diff --git a/src/Bundle/ChillMainBundle/Command/LoadAddressesFRFromBANOCommand.php b/src/Bundle/ChillMainBundle/Command/LoadAddressesFRFromBANOCommand.php
new file mode 100644
index 000000000..96e7023fb
--- /dev/null
+++ b/src/Bundle/ChillMainBundle/Command/LoadAddressesFRFromBANOCommand.php
@@ -0,0 +1,47 @@
+addressReferenceFromBano = $addressReferenceFromBano;
+    }
+
+    /**
+     * Declare the command name, its required list of departement numbers
+     * and its description.
+     */
+    protected function configure()
+    {
+        $this->setName('chill:main:address-ref-from-bano')
+            ->addArgument('departementNo', InputArgument::REQUIRED | InputArgument::IS_ARRAY, 'a list of departement numbers')
+            ->setDescription('Import addresses from bano (see https://bano.openstreetmap.fr)');
+    }
+
+    /**
+     * Import the addresses of every departement number given on the command
+     * line, one after the other, announcing each one on the output.
+     *
+     * @return int always 0; a failing import surfaces as an exception
+     */
+    protected function execute(InputInterface $input, OutputInterface $output): int
+    {
+        foreach ($input->getArgument('departementNo') as $departementNo) {
+            $output->writeln('Import addresses for ' . $departementNo);
+
+            $this->addressReferenceFromBano->import($departementNo);
+        }
+
+        return 0;
+    }
+}
diff --git a/src/Bundle/ChillMainBundle/Entity/AddressReference.php b/src/Bundle/ChillMainBundle/Entity/AddressReference.php
index fc4339fe0..5a6b176c2 100644
--- a/src/Bundle/ChillMainBundle/Entity/AddressReference.php
+++ b/src/Bundle/ChillMainBundle/Entity/AddressReference.php
@@ -20,6 +20,9 @@ use Symfony\Component\Serializer\Annotation\Groups;
  * @ORM\Entity
  * @ORM\Table(name="chill_main_address_reference", indexes={
  *     @ORM\Index(name="address_refid", columns={"refId"})
+ * },
+ * uniqueConstraints={
+ *     @ORM\UniqueConstraint(name="chill_main_address_reference_unicity", columns={"refId", "source"})
  * })
  * @ORM\HasLifecycleCallbacks
  */
diff --git a/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceBaseImporter.php b/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceBaseImporter.php
new file mode 100644
index 000000000..87659e65d
--- /dev/null
+++ b/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceBaseImporter.php
@@ -0,0 +1,207 @@
+ */
+    private array $cachingStatements = [];
+
+    // Source of the import in progress; null when no import is running.
+    private ?string $currentSource = null;
+
+    private Connection $defaultConnection;
+
+    // True between the first importAddress() call and the next finalize().
+    private bool $isInitialized = false;
+
+    private LoggerInterface $logger;
+
+    // Rows queued by importAddress() and not yet written to the temporary table.
+    private array $waitingForInsert = [];
+
+    public function __construct(Connection $defaultConnection, LoggerInterface $logger)
+    {
+        $this->defaultConnection = $defaultConnection;
+        $this->logger = $logger;
+    }
+
+    /**
+     * Commit the import in progress: flush the queued rows, upsert them into
+     * chill_main_address_reference, soft delete the addresses of the same
+     * source which are absent from this import, then drop the temporary
+     * table and reset the importer so that another source can be imported.
+     *
+     * Does nothing when no address was imported since the last call: the
+     * temporary table does not exist in that case, so there is nothing to
+     * commit and running the statements would only fail.
+     */
+    public function finalize(): void
+    {
+        if (!$this->isInitialized) {
+            $this->logger->info(self::LOG_PREFIX . ' nothing to finalize, no address was imported');
+
+            return;
+        }
+
+        $this->doInsertPending();
+
+        $this->updateAddressReferenceTable();
+
+        $this->deleteTemporaryTable();
+
+        $this->currentSource = null;
+        $this->isInitialized = false;
+    }
+
+    /**
+     * Queue one address for import.
+     *
+     * The first call of an import creates the temporary table and records
+     * $source as the source of the import. Queued rows are written to the
+     * temporary table each time 100 of them are waiting; finalize() must be
+     * called to write the remaining ones and to update the reference table.
+     *
+     * @throws LogicException when $source differs from the source of the import in progress
+     */
+    public function importAddress(
+        string $refAddress,
+        string $refPostalCode,
+        string $postalCode,
+        string $street,
+        string $streetNumber,
+        string $source,
+        ?float $lat = null,
+        ?float $lon = null,
+        ?int $srid = null
+    ): void {
+        if (!$this->isInitialized) {
+            $this->initialize($source);
+        }
+
+        if ($this->currentSource !== $source) {
+            throw new LogicException('Cannot store addresses from different sources during same import. Execute finalize to commit inserts before changing the source');
+        }
+
+        // the order of the values must match the placeholders of the insert statement
+        $this->waitingForInsert[] = [
+            $refAddress,
+            $refPostalCode,
+            $postalCode,
+            $street,
+            $streetNumber,
+            $source,
+            $lat,
+            $lon,
+            $srid,
+        ];
+
+        if (100 <= count($this->waitingForInsert)) {
+            $this->doInsertPending();
+        }
+    }
+
+    /**
+     * Create the temporary table which receives the imported rows, and raise
+     * work_mem to 50MB on the connection.
+     */
+    private function createTemporaryTable(): void
+    {
+        $this->defaultConnection->executeStatement('CREATE TEMPORARY TABLE reference_address_temp (
+            postcode_id INT,
+            refid VARCHAR(255),
+            street VARCHAR(255),
+            streetnumber VARCHAR(255),
+            municipalitycode VARCHAR(255),
+            source VARCHAR(255),
+            point GEOMETRY
+        );
+        ');
+        $this->defaultConnection->executeStatement('SET work_mem TO \'50MB\'');
+    }
+
+    /**
+     * Drop the temporary table, if it exists.
+     */
+    private function deleteTemporaryTable(): void
+    {
+        $this->defaultConnection->executeStatement('DROP TABLE IF EXISTS reference_address_temp');
+    }
+
+    /**
+     * Write the queued rows into the temporary table with a single
+     * multi-row insert, then empty the queue.
+     *
+     * One statement is prepared per distinct number of rows and kept for
+     * the next batches of the same size.
+     */
+    private function doInsertPending(): void
+    {
+        $forNumber = count($this->waitingForInsert);
+
+        // finalize() calls this method unconditionally: without pending rows,
+        // the list of values would be empty and the statement invalid
+        if (0 === $forNumber) {
+            return;
+        }
+
+        if (!array_key_exists($forNumber, $this->cachingStatements)) {
+            $sql = strtr(self::INSERT, [
+                '{{ values }}' => implode(
+                    ', ',
+                    array_fill(0, $forNumber, self::VALUE)
+                ),
+            ]);
+
+            $this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
+        }
+
+        $this->logger->debug(self::LOG_PREFIX . ' inserting pending addresses', [
+            'number' => $forNumber,
+            'first' => $this->waitingForInsert[0] ?? null,
+        ]);
+
+        $statement = $this->cachingStatements[$forNumber];
+
+        try {
+            // flatten the rows into one list of parameters
+            $statement->executeStatement(array_merge(...$this->waitingForInsert));
+        } finally {
+            // the queue is emptied even when the statement fails
+            $this->waitingForInsert = [];
+        }
+    }
+
+    /**
+     * Start an import for $source: record the source and (re)create an empty
+     * temporary table.
+     */
+    private function initialize(string $source): void
+    {
+        $this->currentSource = $source;
+        $this->deleteTemporaryTable();
+        $this->createTemporaryTable();
+        $this->isInitialized = true;
+    }
+
+    /**
+     * Synchronise chill_main_address_reference with the temporary table:
+     * upsert every imported row on (refid, source), then soft delete the
+     * rows of the current source whose refid was not imported.
+     */
+    private function updateAddressReferenceTable(): void
+    {
+        $this->defaultConnection->executeStatement(
+            'CREATE INDEX idx_ref_add_temp ON reference_address_temp (refid)'
+        );
+
+        //1) Add new addresses
+        $this->logger->info(self::LOG_PREFIX . 'upsert new addresses');
+        $affected = $this->defaultConnection->executeStatement("INSERT INTO chill_main_address_reference
+            (id, postcode_id, refid, street, streetnumber, municipalitycode, source, point, createdat, deletedat, updatedat)
+        SELECT
+            nextval('chill_main_address_reference_id_seq'),
+            postcode_id,
+            refid,
+            street,
+            streetnumber,
+            municipalitycode,
+            source,
+            point,
+            NOW(),
+            null,
+            NOW()
+        FROM reference_address_temp
+        ON CONFLICT (refid, source) DO UPDATE
+            SET postcode_id = excluded.postcode_id, refid = excluded.refid, street = excluded.street, streetnumber = excluded.streetnumber, municipalitycode = excluded.municipalitycode, source = excluded.source, point = excluded.point, updatedat = NOW(), deletedAt = NULL
+        ");
+        $this->logger->info(self::LOG_PREFIX . 'addresses upserted', ['upserted' => $affected]);
+
+        //2) Delete addresses
+        $this->logger->info(self::LOG_PREFIX . 'soft delete adresses');
+        $affected = $this->defaultConnection->executeStatement('UPDATE chill_main_address_reference
+            SET deletedat = NOW()
+        WHERE
+            chill_main_address_reference.refid NOT IN (SELECT refid FROM reference_address_temp WHERE source LIKE ?)
+            AND chill_main_address_reference.source LIKE ?
+        ', [$this->currentSource, $this->currentSource]);
+        $this->logger->info(self::LOG_PREFIX . 'addresses deleted', ['deleted' => $affected]);
+    }
+}
diff --git a/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceFromBano.php b/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceFromBano.php
new file mode 100644
index 000000000..c30b7c4f2
--- /dev/null
+++ b/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceFromBano.php
@@ -0,0 +1,87 @@
+client = $client;
+        $this->baseImporter = $baseImporter;
+    }
+
+    /**
+     * Download the BANO csv file of one departement and import every address
+     * it contains under the source "BANO.<departementNo>".
+     *
+     * The file is buffered into a temporary file, read as a comma separated
+     * csv, and each record is forwarded to the base importer with its
+     * coordinates in SRID 4326. The import is finalized once all records
+     * have been forwarded.
+     *
+     * @param string $departementNo the departement number, digits only
+     *
+     * @throws UnexpectedValueException when $departementNo is not made of digits only
+     * @throws Exception when the file cannot be downloaded or buffered
+     */
+    public function import(string $departementNo): void
+    {
+        // The value is interpolated into the url: accept digits only. The
+        // former is_numeric() check also let "1.5", "1e3" or "-1" through.
+        // NOTE(review): "2A" and "2B" were rejected before and still are;
+        // confirm whether those files should be importable.
+        if (1 !== preg_match('/^\d+$/D', $departementNo)) {
+            throw new UnexpectedValueException('Could not parse this department number');
+        }
+
+        $url = "https://bano.openstreetmap.fr/data/bano-{$departementNo}.csv";
+
+        $response = $this->client->request('GET', $url);
+
+        if (200 !== $response->getStatusCode()) {
+            throw new Exception('Could not download CSV: ' . $response->getStatusCode());
+        }
+
+        $file = tmpfile();
+
+        if (false === $file) {
+            throw new Exception('Could not create a temporary file to store the CSV');
+        }
+
+        try {
+            foreach ($this->client->stream($response) as $chunk) {
+                fwrite($file, $chunk->getContent());
+            }
+
+            // rewind before handing the stream to the csv reader
+            fseek($file, 0);
+
+            $csv = Reader::createFromStream($file);
+            $csv->setDelimiter(',');
+            // the column names are given here, they are not read from the file
+            $stmt = Statement::create()
+                ->process($csv, [
+                    'refId',
+                    'streetNumber',
+                    'street',
+                    'postcode',
+                    'city',
+                    '_o',
+                    'lat',
+                    'lon',
+                ]);
+
+            foreach ($stmt as $record) {
+                $this->baseImporter->importAddress(
+                    $record['refId'],
+                    substr($record['refId'], 0, 5), // extract insee from reference
+                    $record['postcode'],
+                    $record['street'],
+                    $record['streetNumber'],
+                    'BANO.' . $departementNo,
+                    (float) $record['lat'],
+                    (float) $record['lon'],
+                    4326
+                );
+            }
+
+            $this->baseImporter->finalize();
+        } finally {
+            // release the temporary file even when the import fails
+            fclose($file);
+        }
+    }
+}
diff --git a/src/Bundle/ChillMainBundle/config/services/command.yaml b/src/Bundle/ChillMainBundle/config/services/command.yaml
index 07b3c1721..8c1d9b94e 100644
--- a/src/Bundle/ChillMainBundle/config/services/command.yaml
+++ b/src/Bundle/ChillMainBundle/config/services/command.yaml
@@ -44,6 +44,12 @@ services:
         tags:
             - { name: console.command }
 
+    Chill\MainBundle\Command\LoadAddressesFRFromBANOCommand:
+        autoconfigure: true
+        autowire: true
+        tags:
+            - { name: console.command }
+
     Chill\MainBundle\Command\LoadPostalCodeFR:
         autoconfigure: true
         autowire: true
diff --git a/src/Bundle/ChillMainBundle/migrations/Version20220730204216.php b/src/Bundle/ChillMainBundle/migrations/Version20220730204216.php
new file mode 100644
index 000000000..4ed1b2918
--- /dev/null
+++ b/src/Bundle/ChillMainBundle/migrations/Version20220730204216.php
@@ -0,0 +1,26 @@
+addSql('CREATE UNIQUE INDEX chill_main_address_reference_unicity ON chill_main_address_reference (refId, source)');
+    }
+
+    /**
+     * Drop the unique index on (refId, source).
+     */
+    public function down(Schema $schema): void
+    {
+        $this->addSql('DROP INDEX chill_main_address_reference_unicity');
+    }
+}