mirror of
https://gitlab.com/Chill-Projet/chill-bundles.git
synced 2025-08-29 19:13:49 +00:00
Merge branch '111_exports_suite' into calendar/finalization
This commit is contained in:
@@ -0,0 +1,103 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use League\Csv\Reader;
|
||||
use League\Csv\Statement;
|
||||
use RuntimeException;
|
||||
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||
|
||||
/**
 * Imports Belgian reference addresses.
 *
 * The addresses are read from the gzipped CSV assets attached to the release
 * described by self::RELEASE and handed over, row by row, to the
 * AddressReferenceBaseImporter.
 */
class AddressReferenceBEFromBestAddress
{
    private const RELEASE = 'https://gitea.champs-libres.be/api/v1/repos/Chill-project/belgian-bestaddresses-transform/releases/tags/v1.0.0';

    private AddressReferenceBaseImporter $baseImporter;

    private HttpClientInterface $client;

    public function __construct(HttpClientInterface $client, AddressReferenceBaseImporter $baseImporter)
    {
        $this->client = $client;
        $this->baseImporter = $baseImporter;
    }

    /**
     * Import every requested list, one after the other.
     *
     * @param string $lang language part of the asset name ("addresses-<list>.<lang>.csv.gz")
     * @param array<string> $lists list part of the asset name; each list is imported and finalized separately
     *
     * @throws RuntimeException when the release, the asset or the CSV cannot be fetched or read
     */
    public function import(string $lang, array $lists): void
    {
        foreach ($lists as $list) {
            $this->importList($lang, $list);
        }
    }

    /**
     * Find the download url of the asset named "addresses-<list>.<lang>.csv.gz".
     *
     * @throws RuntimeException when the release cannot be fetched or holds no such asset
     */
    private function getDownloadUrl(string $lang, string $list): string
    {
        try {
            $release = $this->client->request('GET', self::RELEASE)->toArray();
        } catch (TransportExceptionInterface $e) {
            throw new RuntimeException('could not get the release definition', 0, $e);
        }

        $expectedName = 'addresses-' . $list . '.' . $lang . '.csv.gz';

        foreach ($release['assets'] ?? [] as $asset) {
            if ($expectedName === ($asset['name'] ?? null)) {
                return $asset['browser_download_url'];
            }
        }

        // fail with an explicit message instead of an "undefined offset" warning
        // followed by a TypeError on the return type
        throw new RuntimeException(sprintf('could not find the asset "%s" in the release', $expectedName));
    }

    /**
     * Download one list, uncompress it and push every row to the base importer.
     *
     * @throws RuntimeException when the download fails or the temporary file cannot be handled
     */
    private function importList(string $lang, string $list): void
    {
        $downloadUrl = $this->getDownloadUrl($lang, $list);

        $response = $this->client->request('GET', $downloadUrl);

        if (200 !== $response->getStatusCode()) {
            // RuntimeException is imported by this file; a bare "Exception" is not
            throw new RuntimeException('Could not download CSV: ' . $response->getStatusCode());
        }

        $tmpname = tempnam(sys_get_temp_dir(), 'php-add-' . $list . $lang);

        if (false === $tmpname) {
            throw new RuntimeException('could not create temporary file');
        }

        try {
            $file = fopen($tmpname, 'r+b');

            if (false === $file) {
                throw new RuntimeException('could not open temporary file');
            }

            try {
                foreach ($this->client->stream($response) as $chunk) {
                    fwrite($file, $chunk->getContent());
                }
            } finally {
                fclose($file);
            }

            // the downloaded file is gzipped: read it back through the zlib functions
            $uncompressedStream = gzopen($tmpname, 'r');

            if (false === $uncompressedStream) {
                throw new RuntimeException('could not open the downloaded file');
            }

            try {
                $csv = Reader::createFromStream($uncompressedStream);
                $csv->setDelimiter(',');
                $csv->setHeaderOffset(0);

                $stmt = Statement::create()
                    ->process($csv);

                foreach ($stmt as $record) {
                    $this->baseImporter->importAddress(
                        $record['best_id'],
                        $record['municipality_objectid'],
                        $record['postal_info_objectid'],
                        $record['streetname'],
                        $record['housenumber'] . $record['boxnumber'],
                        'bestaddress.' . $list,
                        // importAddress() expects (lat, lon) and builds ST_point(lon, lat):
                        // Y goes to lat and X to lon, as PostalCodeBEFromBestAddress does
                        (float) $record['Y'],
                        (float) $record['X'],
                        3812
                    );
                }

                $this->baseImporter->finalize();
            } finally {
                gzclose($uncompressedStream);
            }
        } finally {
            // never leave the downloaded file behind, even when the import fails
            unlink($tmpname);
        }
    }
}
|
@@ -0,0 +1,221 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use Doctrine\DBAL\Connection;
|
||||
use Doctrine\DBAL\Statement;
|
||||
use Exception;
|
||||
use LogicException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use function array_key_exists;
|
||||
use function count;
|
||||
|
||||
/**
 * Bulk importer for reference addresses.
 *
 * Addresses given to importAddress() are buffered, inserted by batches into the
 * temporary table "reference_address_temp", then merged into
 * "chill_main_address_reference" when finalize() is called.
 *
 * All the addresses imported between two calls to finalize() must share the
 * same source.
 */
final class AddressReferenceBaseImporter
{
    private const INSERT = <<<'SQL'
        INSERT INTO reference_address_temp
            (postcode_id, refid, street, streetnumber, municipalitycode, source, point)
        SELECT
            cmpc.id, i.refid, i.street, i.streetnumber, i.refpostalcode, i.source,
            CASE WHEN (i.lon::float != 0.0 AND i.lat::float != 0.0) THEN ST_Transform(ST_setSrid(ST_point(i.lon::float, i.lat::float), i.srid::int), 4326) ELSE NULL END
        FROM
            (VALUES
                {{ values }}
            ) AS i (refid, refpostalcode, postalcode, street, streetnumber, source, lat, lon, srid)
            JOIN chill_main_postal_code cmpc ON cmpc.refpostalcodeid = i.refpostalcode and cmpc.code = i.postalcode
        SQL;

    private const LOG_PREFIX = '[AddressReferenceImporter] ';

    private const VALUE = '(?, ?, ?, ?, ?, ?, ?, ?, ?)';

    /**
     * Prepared insert statements, indexed by the number of rows they insert.
     *
     * @var array<int, Statement>
     */
    private array $cachingStatements = [];

    private ?string $currentSource = null;

    private Connection $defaultConnection;

    private bool $isInitialized = false;

    private LoggerInterface $logger;

    /**
     * Rows buffered until the next batch insert; each row is a list of the
     * nine values bound to self::VALUE.
     */
    private array $waitingForInsert = [];

    public function __construct(Connection $defaultConnection, LoggerInterface $logger)
    {
        $this->defaultConnection = $defaultConnection;
        $this->logger = $logger;
    }

    /**
     * Flush the buffered addresses, merge the temporary table into
     * chill_main_address_reference and reset the importer for another source.
     *
     * Does nothing when no address was imported since the last call: the
     * temporary table does not exist in that case.
     */
    public function finalize(): void
    {
        if (!$this->isInitialized) {
            return;
        }

        $this->doInsertPending();

        $this->updateAddressReferenceTable();

        $this->deleteTemporaryTable();

        $this->currentSource = null;
        $this->isInitialized = false;
    }

    /**
     * Buffer one address; the buffer is flushed every 100 addresses.
     *
     * @param string $refAddress identifier of the address in the source
     * @param string|null $refPostalCode matched against chill_main_postal_code.refpostalcodeid
     * @param string $postalCode matched against chill_main_postal_code.code
     * @param string $source name of the source; must stay the same until finalize() is called
     * @param float|null $lat second coordinate given to ST_point (y)
     * @param float|null $lon first coordinate given to ST_point (x)
     * @param int|null $srid SRID of the given coordinates, which are transformed to 4326
     *
     * @throws LogicException when the source differs from the one of the pending import
     */
    public function importAddress(
        string $refAddress,
        ?string $refPostalCode,
        string $postalCode,
        string $street,
        string $streetNumber,
        string $source,
        ?float $lat = null,
        ?float $lon = null,
        ?int $srid = null
    ): void {
        if (!$this->isInitialized) {
            $this->initialize($source);
        }

        if ($this->currentSource !== $source) {
            throw new LogicException('Cannot store addresses from different sources during same import. Execute finalize to commit inserts before changing the source');
        }

        $this->waitingForInsert[] = [
            $refAddress,
            $refPostalCode,
            $postalCode,
            $street,
            $streetNumber,
            $source,
            $lat,
            $lon,
            $srid,
        ];

        if (100 <= count($this->waitingForInsert)) {
            $this->doInsertPending();
        }
    }

    /**
     * Create the temporary table which receives the batches, and raise work_mem.
     */
    private function createTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement('CREATE TEMPORARY TABLE reference_address_temp (
            postcode_id INT,
            refid VARCHAR(255),
            street VARCHAR(255),
            streetnumber VARCHAR(255),
            municipalitycode VARCHAR(255),
            source VARCHAR(255),
            point GEOMETRY
        );
        ');
        $this->defaultConnection->executeStatement('SET work_mem TO \'50MB\'');
    }

    private function deleteTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement('DROP TABLE IF EXISTS reference_address_temp');
    }

    /**
     * Insert the buffered addresses into the temporary table and empty the buffer.
     *
     * @throws RuntimeException when the insert does not affect any row
     */
    private function doInsertPending(): void
    {
        $forNumber = count($this->waitingForInsert);

        // nothing to insert: do not prepare a statement with an empty VALUES list
        if (0 === $forNumber) {
            return;
        }

        if (!array_key_exists($forNumber, $this->cachingStatements)) {
            $sql = strtr(self::INSERT, [
                '{{ values }}' => implode(
                    ', ',
                    array_fill(0, $forNumber, self::VALUE)
                ),
            ]);

            $this->logger->debug(self::LOG_PREFIX . ' generated sql for insert', [
                'sql' => $sql,
                'forNumber' => $forNumber,
            ]);

            $this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
        }

        $this->logger->debug(self::LOG_PREFIX . ' inserting pending addresses', [
            'number' => $forNumber,
            'first' => $this->waitingForInsert[0] ?? null,
        ]);

        $statement = $this->cachingStatements[$forNumber];

        try {
            // flatten the rows: the statement expects one flat list of parameters
            $affected = $statement->executeStatement(array_merge(...$this->waitingForInsert));

            if (0 === $affected) {
                throw new RuntimeException('no row affected');
            }
        } finally {
            // the buffer is emptied even on failure
            $this->waitingForInsert = [];
        }
    }

    /**
     * Start an import for the given source with a fresh temporary table.
     */
    private function initialize(string $source): void
    {
        $this->currentSource = $source;
        $this->deleteTemporaryTable();
        $this->createTemporaryTable();
        $this->isInitialized = true;
    }

    /**
     * Merge the temporary table into chill_main_address_reference: upsert the
     * imported addresses, then mark as deleted the addresses of the current
     * source which are absent from the import.
     */
    private function updateAddressReferenceTable(): void
    {
        $this->defaultConnection->executeStatement(
            'CREATE INDEX idx_ref_add_temp ON reference_address_temp (refid)'
        );

        // 1) Add new addresses, update the existing ones
        $this->logger->info(self::LOG_PREFIX . 'upsert new addresses');
        $affected = $this->defaultConnection->executeStatement("INSERT INTO chill_main_address_reference
            (id, postcode_id, refid, street, streetnumber, municipalitycode, source, point, createdat, deletedat, updatedat)
        SELECT
            nextval('chill_main_address_reference_id_seq'),
            postcode_id,
            refid,
            street,
            streetnumber,
            municipalitycode,
            source,
            point,
            NOW(),
            null,
            NOW()
        FROM reference_address_temp
        ON CONFLICT (refid, source) DO UPDATE
            SET postcode_id = excluded.postcode_id, refid = excluded.refid, street = excluded.street, streetnumber = excluded.streetnumber, municipalitycode = excluded.municipalitycode, source = excluded.source, point = excluded.point, updatedat = NOW(), deletedAt = NULL
        ");
        $this->logger->info(self::LOG_PREFIX . 'addresses upserted', ['upserted' => $affected]);

        // 2) Soft delete the addresses of this source which are not in the import
        $this->logger->info(self::LOG_PREFIX . 'soft delete adresses');
        $affected = $this->defaultConnection->executeStatement('UPDATE chill_main_address_reference
            SET deletedat = NOW()
        WHERE
            chill_main_address_reference.refid NOT IN (SELECT refid FROM reference_address_temp WHERE source LIKE ?)
            AND chill_main_address_reference.source LIKE ?
        ', [$this->currentSource, $this->currentSource]);
        $this->logger->info(self::LOG_PREFIX . 'addresses deleted', ['deleted' => $affected]);
    }
}
|
@@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use Exception;
|
||||
use League\Csv\Reader;
|
||||
use League\Csv\Statement;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||
use UnexpectedValueException;
|
||||
use function is_int;
|
||||
|
||||
/**
 * Imports French reference addresses from the BANO CSV files published on
 * bano.openstreetmap.fr, one department at a time.
 */
class AddressReferenceFromBano
{
    private AddressReferenceBaseImporter $baseImporter;

    private HttpClientInterface $client;

    public function __construct(HttpClientInterface $client, AddressReferenceBaseImporter $baseImporter)
    {
        $this->client = $client;
        $this->baseImporter = $baseImporter;
    }

    /**
     * Download the CSV of one department and import every address it contains.
     *
     * @param string $departementNo department number, digits only, as used in the file name "bano-<no>.csv"
     *
     * @throws UnexpectedValueException when the department number is not made of digits only
     * @throws Exception when the CSV cannot be downloaded or buffered
     */
    public function import(string $departementNo): void
    {
        // The value is interpolated into the url: accept digits only.
        // NOTE(review): non-numeric department codes (e.g. "2A") were already
        // rejected by the previous is_numeric() check and still are.
        if (!ctype_digit($departementNo)) {
            throw new UnexpectedValueException('Could not parse this department number');
        }

        $url = "https://bano.openstreetmap.fr/data/bano-{$departementNo}.csv";

        $response = $this->client->request('GET', $url);

        if (200 !== $response->getStatusCode()) {
            throw new Exception('Could not download CSV: ' . $response->getStatusCode());
        }

        $file = tmpfile();

        if (false === $file) {
            throw new \RuntimeException('could not create temporary file');
        }

        try {
            foreach ($this->client->stream($response) as $chunk) {
                fwrite($file, $chunk->getContent());
            }

            fseek($file, 0);

            $csv = Reader::createFromStream($file);
            $csv->setDelimiter(',');
            // the names given to process() are used as keys of each record
            $stmt = Statement::create()
                ->process($csv, [
                    'refId',
                    'streetNumber',
                    'street',
                    'postcode',
                    'city',
                    '_o',
                    'lat',
                    'lon',
                ]);

            foreach ($stmt as $record) {
                $this->baseImporter->importAddress(
                    $record['refId'],
                    substr($record['refId'], 0, 5), // extract insee from reference
                    $record['postcode'],
                    $record['street'],
                    $record['streetNumber'],
                    'BANO.' . $departementNo,
                    (float) $record['lat'],
                    (float) $record['lon'],
                    4326
                );
            }

            $this->baseImporter->finalize();
        } finally {
            // release the temporary file even when the import fails
            fclose($file);
        }
    }
}
|
@@ -0,0 +1,236 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use Doctrine\DBAL\Connection;
|
||||
use Doctrine\DBAL\Statement;
|
||||
use Doctrine\DBAL\Types\Types;
|
||||
use Exception;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use function array_key_exists;
|
||||
use function count;
|
||||
|
||||
/**
 * Bulk importer for geographical units.
 *
 * Units given to importUnit() are buffered, inserted by batches into the
 * temporary table "geographical_unit_temp", then merged into
 * "chill_main_geographical_unit_layer" and "chill_main_geographical_unit"
 * when finalize() is called.
 */
final class GeographicalUnitBaseImporter
{
    private const INSERT = <<<'SQL'
        INSERT INTO geographical_unit_temp
            (layerKey, layerName, unitName, unitKey, geom)
        SELECT
            i.layerKey, i.layerName, i.unitName, i.unitKey,
            ST_Transform(ST_setSrid(ST_GeomFromText(i.wkt), i.srid), 4326)
        FROM
            (VALUES
                {{ values }}
            ) AS i (layerKey, layerName, unitName, unitKey, wkt, srid)
        SQL;

    private const LOG_PREFIX = '[GeographicalUnitBAseImporter] ';

    private const VALUE = '(?, ?::jsonb, ?, ?, ?, ?::int)';

    /**
     * Prepared insert statements, indexed by the number of rows they insert.
     *
     * @var array<int, Statement>
     */
    private array $cachingStatements = [];

    private Connection $defaultConnection;

    private bool $isInitialized = false;

    private LoggerInterface $logger;

    /**
     * Units buffered until the next batch insert, as arrays with the keys
     * layerKey, layerName, unitName, unitKey, geomAsWKT and srid.
     */
    private array $waitingForInsert = [];

    public function __construct(Connection $defaultConnection, LoggerInterface $logger)
    {
        $this->defaultConnection = $defaultConnection;
        $this->logger = $logger;
    }

    /**
     * Flush the buffered units, merge the temporary table into the layer and
     * unit tables, and reset the importer.
     *
     * Does nothing when no unit was imported since the last call: the
     * temporary table does not exist in that case.
     */
    public function finalize(): void
    {
        if (!$this->isInitialized) {
            return;
        }

        $this->doInsertPending();

        $this->prepareForFinalize();

        $this->updateGeographicalUnitTable();

        $this->deleteTemporaryTable();

        $this->isInitialized = false;
    }

    /**
     * Buffer one unit; the buffer is flushed every 100 units.
     *
     * @param string $layerKey reference of the layer, stored as chill_main_geographical_unit_layer.refid
     * @param array $layerName name of the layer, bound as json
     * @param string $unitKey reference of the unit, stored as chill_main_geographical_unit.unitrefid
     * @param string $geomAsWKT geometry as WKT, parsed with ST_GeomFromText
     * @param int|null $srid SRID of the given geometry, which is transformed to 4326
     */
    public function importUnit(
        string $layerKey,
        array $layerName,
        string $unitName,
        string $unitKey,
        string $geomAsWKT,
        ?int $srid = null
    ): void {
        $this->initialize();

        $this->waitingForInsert[] = [
            'layerKey' => $layerKey,
            'layerName' => $layerName,
            'unitName' => $unitName,
            'unitKey' => $unitKey,
            'geomAsWKT' => $geomAsWKT,
            'srid' => $srid,
        ];

        if (100 <= count($this->waitingForInsert)) {
            $this->doInsertPending();
        }
    }

    /**
     * Create the temporary table which receives the batches, and raise work_mem.
     */
    private function createTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement("CREATE TEMPORARY TABLE geographical_unit_temp (
            layerKey TEXT DEFAULT '' NOT NULL,
            layerName JSONB DEFAULT '[]'::jsonb NOT NULL,
            unitName TEXT default '' NOT NULL,
            unitKey TEXT default '' NOT NULL,
            geom GEOMETRY(MULTIPOLYGON, 4326)
        )");

        $this->defaultConnection->executeStatement('SET work_mem TO \'50MB\'');
    }

    private function deleteTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement('DROP TABLE IF EXISTS geographical_unit_temp');
    }

    /**
     * Insert the buffered units into the temporary table and empty the buffer.
     *
     * @throws RuntimeException when the insert does not affect any row
     */
    private function doInsertPending(): void
    {
        $forNumber = count($this->waitingForInsert);

        if (0 === $forNumber) {
            return;
        }

        if (!array_key_exists($forNumber, $this->cachingStatements)) {
            $sql = strtr(self::INSERT, [
                '{{ values }}' => implode(
                    ', ',
                    array_fill(0, $forNumber, self::VALUE)
                ),
            ]);

            $this->logger->debug(self::LOG_PREFIX . ' generated sql for insert', [
                'sql' => $sql,
                'forNumber' => $forNumber,
            ]);

            $this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
        }

        $statement = $this->cachingStatements[$forNumber];

        try {
            // positional parameters are 1-indexed and run across all the rows
            $i = 0;

            foreach ($this->waitingForInsert as $insert) {
                $statement->bindValue(++$i, $insert['layerKey'], Types::STRING);
                $statement->bindValue(++$i, $insert['layerName'], Types::JSON);
                $statement->bindValue(++$i, $insert['unitName'], Types::STRING);
                $statement->bindValue(++$i, $insert['unitKey'], Types::STRING);
                $statement->bindValue(++$i, $insert['geomAsWKT'], Types::STRING);
                $statement->bindValue(++$i, $insert['srid'], Types::INTEGER);
            }

            $affected = $statement->executeStatement();

            if (0 === $affected) {
                throw new RuntimeException('no row affected');
            }
        } finally {
            // the buffer is emptied even on failure
            $this->waitingForInsert = [];
        }
    }

    /**
     * Create a fresh temporary table, unless an import is already pending.
     */
    private function initialize(): void
    {
        if ($this->isInitialized) {
            return;
        }

        $this->deleteTemporaryTable();
        $this->createTemporaryTable();
        $this->isInitialized = true;
    }

    /**
     * Index the temporary table before it is merged.
     */
    private function prepareForFinalize(): void
    {
        $this->defaultConnection->executeStatement(
            'CREATE INDEX idx_ref_add_temp ON geographical_unit_temp (unitKey)'
        );
    }

    /**
     * Merge the temporary table within a single transaction: upsert the layers,
     * upsert the units, then delete the units which are absent from the import.
     */
    private function updateGeographicalUnitTable(): void
    {
        $this->defaultConnection->transactional(
            function () {
                // 0) create new layers
                $this->defaultConnection->executeStatement(
                    "
                        WITH unique_layers AS (
                            SELECT DISTINCT layerKey, layerName FROM geographical_unit_temp
                        )
                        INSERT INTO chill_main_geographical_unit_layer (id, name, refid)
                        SELECT
                            nextval('chill_main_geographical_unit_layer_id_seq'),
                            layerName,
                            layerKey
                        FROM unique_layers
                        ON CONFLICT (refid)
                        DO UPDATE SET name=EXCLUDED.name
                    "
                );

                // 1) Add new units, update the existing ones
                $this->logger->info(self::LOG_PREFIX . 'upsert new units');
                $affected = $this->defaultConnection->executeStatement("INSERT INTO chill_main_geographical_unit
                    (id, geom, unitname, layer_id, unitrefid)
                    SELECT
                        nextval('chill_main_geographical_unit_id_seq'),
                        geom,
                        unitName,
                        layer.id,
                        unitKey
                    FROM geographical_unit_temp JOIN chill_main_geographical_unit_layer AS layer ON layer.refid = layerKey
                    ON CONFLICT (layer_id, unitrefid)
                    DO UPDATE
                        SET geom = EXCLUDED.geom, unitname = EXCLUDED.unitname
                    ");
                $this->logger->info(self::LOG_PREFIX . 'units upserted', ['upserted' => $affected]);

                // 2) Delete units
                // Despite its name, the "to_delete" CTE lists the units which are
                // present in the import, i.e. the ones to keep.
                // NOTE(review): the DELETE has no filter on the layer, so units of
                // layers which are absent from this import are removed too -
                // confirm this is intended.
                $this->logger->info(self::LOG_PREFIX . 'delete units');
                $affected = $this->defaultConnection->executeStatement('WITH to_delete AS (
                        SELECT cmgu.id
                        FROM chill_main_geographical_unit AS cmgu
                        JOIN chill_main_geographical_unit_layer AS cmgul ON cmgul.id = cmgu.layer_id
                        JOIN geographical_unit_temp AS gut ON cmgul.refid = gut.layerKey AND cmgu.unitrefid = gut.unitKey
                    )
                    DELETE FROM chill_main_geographical_unit
                    WHERE id NOT IN (SELECT id FROM to_delete)
                    ');
                $this->logger->info(self::LOG_PREFIX . 'units deleted', ['deleted' => $affected]);
            }
        );
    }
}
|
@@ -0,0 +1,105 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use League\Csv\Reader;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||
|
||||
/**
 * Imports Belgian postal codes from the gzipped CSV asset "postals.<lang>.csv.gz"
 * attached to the release described by self::RELEASE.
 */
class PostalCodeBEFromBestAddress
{
    private const RELEASE = 'https://gitea.champs-libres.be/api/v1/repos/Chill-project/belgian-bestaddresses-transform/releases/tags/v1.0.0';

    private PostalCodeBaseImporter $baseImporter;

    private HttpClientInterface $client;

    private LoggerInterface $logger;

    public function __construct(PostalCodeBaseImporter $baseImporter, HttpClientInterface $client, LoggerInterface $logger)
    {
        $this->baseImporter = $baseImporter;
        $this->client = $client;
        $this->logger = $logger;
    }

    /**
     * Download the postal codes for the given language and import them.
     *
     * @param string $lang language part of the asset name
     *
     * @throws RuntimeException when the release, the asset or the CSV cannot be fetched or read
     */
    public function import(string $lang = 'fr'): void
    {
        $fileDownloadUrl = $this->getFileDownloadUrl($lang);

        $response = $this->client->request('GET', $fileDownloadUrl);

        if (200 !== $response->getStatusCode()) {
            throw new RuntimeException('Could not download CSV: ' . $response->getStatusCode());
        }

        $tmpname = tempnam(sys_get_temp_dir(), 'postalcodes');

        if (false === $tmpname) {
            throw new RuntimeException('could not create temporary file');
        }

        try {
            $tmpfile = fopen($tmpname, 'r+b');

            if (false === $tmpfile) {
                throw new RuntimeException('could not create temporary file');
            }

            try {
                foreach ($this->client->stream($response) as $chunk) {
                    fwrite($tmpfile, $chunk->getContent());
                }
            } finally {
                fclose($tmpfile);
            }

            // the downloaded file is gzipped: read it back through the zlib functions
            $uncompressedStream = gzopen($tmpname, 'r');

            if (false === $uncompressedStream) {
                throw new RuntimeException('could not open the downloaded file');
            }

            try {
                $csv = Reader::createFromStream($uncompressedStream);
                $csv->setDelimiter(',');
                $csv->setHeaderOffset(0);

                foreach ($csv as $offset => $record) {
                    $this->handleRecord($record);
                }
            } finally {
                gzclose($uncompressedStream);
            }
        } finally {
            // never leave the downloaded file behind, even when the import fails
            unlink($tmpname);
        }

        $this->logger->info(__CLASS__ . ' list of postal code downloaded');

        $this->baseImporter->finalize();

        // $offset is not set when the CSV does not contain any record
        $this->logger->info(__CLASS__ . ' postal code fetched', ['offset' => $offset ?? 0]);
    }

    /**
     * Find the download url of the asset named "postals.<lang>.csv.gz".
     *
     * @throws RuntimeException when the release cannot be fetched or holds no such asset
     */
    private function getFileDownloadUrl(string $lang): string
    {
        try {
            $release = $this->client->request('GET', self::RELEASE)->toArray();
        } catch (TransportExceptionInterface $e) {
            throw new RuntimeException('could not get the release definition', 0, $e);
        }

        $expectedName = 'postals.' . $lang . '.csv.gz';

        foreach ($release['assets'] ?? [] as $asset) {
            if ($expectedName === ($asset['name'] ?? null)) {
                return $asset['browser_download_url'];
            }
        }

        // fail with an explicit message instead of an "undefined offset" warning
        // followed by a TypeError on the return type
        throw new RuntimeException(sprintf('could not find the asset "%s" in the release', $expectedName));
    }

    /**
     * Push one CSV record to the base importer.
     */
    private function handleRecord(array $record): void
    {
        $this->baseImporter->importCode(
            'BE',
            trim($record['municipality_name']),
            trim($record['postal_info_objectid']),
            $record['municipality_objectid'],
            'bestaddress',
            // CSV values are strings: importCode() requires floats and this
            // file declares strict_types
            (float) $record['Y'],
            (float) $record['X'],
            3812
        );
    }
}
|
@@ -0,0 +1,124 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use Doctrine\DBAL\Connection;
|
||||
use Doctrine\DBAL\Statement;
|
||||
use Exception;
|
||||
use function array_key_exists;
|
||||
use function count;
|
||||
|
||||
/**
 * Optimized way to load postal code into database.
 *
 * Postal codes given to importCode() are buffered and upserted by batches
 * into "chill_main_postal_code"; finalize() must be called at the end of the
 * import to flush the last batch.
 */
class PostalCodeBaseImporter
{
    private const QUERY = <<<'SQL'
        WITH g AS (
            SELECT DISTINCT
                country.id AS country_id,
                g.*
            FROM (VALUES
                {{ values }}
            ) AS g (countrycode, label, code, refpostalcodeid, postalcodeSource, lon, lat, srid)
            JOIN country ON country.countrycode = g.countrycode
        )
        INSERT INTO chill_main_postal_code (id, country_id, label, code, origin, refpostalcodeid, postalcodeSource, center, createdAt, updatedAt)
        SELECT
            nextval('chill_main_postal_code_id_seq'),
            g.country_id,
            g.label AS glabel,
            g.code,
            0,
            g.refpostalcodeid,
            g.postalcodeSource,
            CASE WHEN (g.lon::float != 0.0 AND g.lat::float != 0.0) THEN ST_Transform(ST_setSrid(ST_point(g.lon::float, g.lat::float), g.srid::int), 4326) ELSE NULL END,
            NOW(),
            NOW()
        FROM g
        ON CONFLICT (code, refpostalcodeid, postalcodeSource) WHERE refpostalcodeid IS NOT NULL DO UPDATE SET label = excluded.label, center = excluded.center, updatedAt = NOW()
        SQL;

    private const VALUE = '(?, ?, ?, ?, ?, ?, ?, ?)';

    /**
     * Prepared statements, indexed by the number of rows they insert.
     *
     * @var array<int, Statement>
     */
    private array $cachingStatements = [];

    private Connection $defaultConnection;

    /**
     * Rows buffered until the next batch insert; each row is a list of the
     * eight values bound to self::VALUE.
     */
    private array $waitingForInsert = [];

    public function __construct(
        Connection $defaultConnection
    ) {
        $this->defaultConnection = $defaultConnection;
    }

    /**
     * Flush the postal codes which are still buffered.
     */
    public function finalize(): void
    {
        $this->doInsertPending();
    }

    /**
     * Buffer one postal code; the buffer is flushed every 100 postal codes.
     *
     * @param string $countryCode matched against country.countrycode
     * @param float $centerLat second coordinate given to ST_point (y)
     * @param float $centerLon first coordinate given to ST_point (x)
     * @param int $centerSRID SRID of the given coordinates, which are transformed to 4326
     */
    public function importCode(
        string $countryCode,
        string $label,
        string $code,
        string $refPostalCodeId,
        string $refPostalCodeSource,
        float $centerLat,
        float $centerLon,
        int $centerSRID
    ): void {
        // the order of the values follows the columns declared in self::QUERY:
        // lon comes before lat
        $this->waitingForInsert[] = [
            $countryCode,
            $label,
            $code,
            $refPostalCodeId,
            $refPostalCodeSource,
            $centerLon,
            $centerLat,
            $centerSRID,
        ];

        if (100 <= count($this->waitingForInsert)) {
            $this->doInsertPending();
        }
    }

    /**
     * Upsert the buffered postal codes and empty the buffer.
     */
    private function doInsertPending(): void
    {
        $forNumber = count($this->waitingForInsert);

        // Nothing is pending when finalize() follows an exact multiple of the
        // batch size, or an empty import: a query with an empty VALUES list is
        // not valid SQL, so skip it.
        if (0 === $forNumber) {
            return;
        }

        if (!array_key_exists($forNumber, $this->cachingStatements)) {
            $sql = strtr(self::QUERY, [
                '{{ values }}' => implode(
                    ', ',
                    array_fill(0, $forNumber, self::VALUE)
                ),
            ]);

            $this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
        }

        $statement = $this->cachingStatements[$forNumber];

        try {
            // flatten the rows: the statement expects one flat list of parameters
            $statement->executeStatement(array_merge(...$this->waitingForInsert));
        } finally {
            // the buffer is emptied even on failure
            $this->waitingForInsert = [];
        }
    }
}
|
@@ -0,0 +1,105 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Chill is a software for social workers
|
||||
*
|
||||
* For the full copyright and license information, please view
|
||||
* the LICENSE file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Chill\MainBundle\Service\Import;
|
||||
|
||||
use League\Csv\Reader;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use RuntimeException;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||
|
||||
/**
 * Loads the French postal codes from an open data CSV.
 *
 * The CSV is the "laposte_hexasmal" dataset published by datanova / la poste:
 * https://datanova.legroupe.laposte.fr/explore/dataset/laposte_hexasmal/information/
 */
class PostalCodeFRFromOpenData
{
    private const CSV = 'https://datanova.legroupe.laposte.fr/explore/dataset/laposte_hexasmal/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B';

    private PostalCodeBaseImporter $baseImporter;

    private HttpClientInterface $client;

    private LoggerInterface $logger;

    public function __construct(
        PostalCodeBaseImporter $baseImporter,
        HttpClientInterface $client,
        LoggerInterface $logger
    ) {
        $this->baseImporter = $baseImporter;
        $this->client = $client;
        $this->logger = $logger;
    }

    /**
     * Download the CSV into a temporary file, then import each of its records.
     *
     * @throws RuntimeException when the CSV cannot be downloaded or buffered
     */
    public function import(): void
    {
        $download = $this->client->request('GET', self::CSV);

        if (200 !== $download->getStatusCode()) {
            throw new RuntimeException('could not download CSV');
        }

        $buffer = tmpfile();

        if (false === $buffer) {
            throw new RuntimeException('could not create temporary file');
        }

        foreach ($this->client->stream($download) as $part) {
            fwrite($buffer, $part->getContent());
        }

        // go back to the start of the buffer before reading it as CSV
        rewind($buffer);

        $reader = Reader::createFromStream($buffer);
        $reader->setDelimiter(';');
        $reader->setHeaderOffset(0);

        foreach ($reader as $lastOffset => $row) {
            $this->handleRecord($row);
        }

        $this->baseImporter->finalize();
        fclose($buffer);

        // $lastOffset is not set when the CSV does not contain any record
        $this->logger->info(__CLASS__ . ' postal code fetched', ['offset' => $lastOffset ?? 0]);
    }

    /**
     * Push one CSV record to the base importer.
     *
     * The coordinates are read as "lat,lon" and default to 0.0 when the
     * column is blank.
     */
    private function handleRecord(array $record): void
    {
        $lat = $lon = 0.0;

        if ('' !== trim($record['coordonnees_gps'])) {
            $toFloat = static fn ($el) => (float) trim($el);
            [$lat, $lon] = array_map($toFloat, explode(',', $record['coordonnees_gps']));
        }

        $ref = trim($record['Code_commune_INSEE']);

        // some differences in French Polynesia (references starting with 987):
        // the label is appended to the reference
        if (0 === strpos($ref, '987')) {
            $ref = $ref . '.' . trim($record['Libellé_d_acheminement']);
        }

        $label = trim($record['Libellé_d_acheminement']);
        $code = trim($record['Code_postal']);

        $this->baseImporter->importCode('FR', $label, $code, $ref, 'INSEE', $lat, $lon, 4326);
    }
}
|
Reference in New Issue
Block a user