*/
    private array $cachingStatements = [];

    // source of the import in progress; every importAddress() call must use it until finalize()
    private ?string $currentSource = null;

    // true once the temporary table has been (re)created for the import in progress
    private bool $isInitialized = false;

    /**
     * Rows waiting to be written in the temporary table. Each row holds the values given to
     * importAddress(), in parameter order.
     *
     * @var list<list<mixed>>
     */
    private array $waitingForInsert = [];

    public function __construct(
        private readonly Connection $defaultConnection,
        private readonly LoggerInterface $logger,
        private readonly MailerInterface $mailer,
    ) {}

    /**
     * Finalize the import process and make reconciliation with addresses.
     *
     * Writes the rows still buffered and reconciles the temporary table with
     * chill_main_address_reference. Then it drops the temporary table.
     *
     * The importer state (current source, initialization flag, buffer) is reset even when an
     * error occurs. The next call to importAddress() therefore starts a fresh import. Any
     * temporary table left behind by a failure is dropped by initialize() at that point.
     *
     * @param bool        $allowRemoveDoubleRefId   if true, allow the importer to remove automatically addresses with same refid
     * @param string|null $sendAddressReportToEmail if not null, the report of addresses without postal code is sent by email to this address
     *
     * @throws \Exception
     */
    public function finalize(bool $allowRemoveDoubleRefId = false, ?string $sendAddressReportToEmail = null): void
    {
        try {
            $this->doInsertPending();
            $this->updateAddressReferenceTable($allowRemoveDoubleRefId, $sendAddressReportToEmail);
            $this->deleteTemporaryTable();
        } finally {
            $this->waitingForInsert = [];
            $this->currentSource = null;
            $this->isInitialized = false;
        }
    }

    /**
     * Do import a single address.
     *
     * The first call of an import initializes it for $source. The row is buffered and
     * written to the temporary table once 100 rows are waiting. The remaining rows are
     * written by finalize().
     *
     * @throws \LogicException if $source differs from the source of the import in progress
     * @throws \Exception
     */
    public function importAddress(
        string $refAddress,
        ?string $refPostalCode,
        string $postalCode,
        string $street,
        string $streetNumber,
        string $source,
        ?float $lat = null,
        ?float $lon = null,
        ?int $srid = null,
    ): void {
        if (!$this->isInitialized) {
            $this->initialize($source);
        }

        if ($this->currentSource !== $source) {
            throw new \LogicException("Cannot store addresses from different sources during same import. \nExecute finalize to commit inserts before changing the source");
        }

        $this->waitingForInsert[] = [
            $refAddress,
            $refPostalCode,
            $postalCode,
            $street,
            $streetNumber,
            $source,
            $lat,
            $lon,
            $srid,
        ];

        if (100 <= \count($this->waitingForInsert)) {
            $this->doInsertPending();
        }
    }

    /**
     * Create the temporary table which receives the imported rows, and raise work_mem for the session.
     */
    private function createTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement(<<<'SQL'
            CREATE TEMPORARY TABLE reference_address_temp (
                postcode_id INT DEFAULT NULL,
                postalcode TEXT DEFAULT '',
                refid VARCHAR(255),
                street VARCHAR(255),
                streetnumber VARCHAR(255),
                municipalitycode VARCHAR(255),
                source VARCHAR(255),
                point GEOMETRY
            );
            SQL);

        $this->defaultConnection->executeStatement('SET work_mem TO \'50MB\'');
    }

    /**
     * Drop the temporary table, if it exists.
     */
    private function deleteTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement('DROP TABLE IF EXISTS reference_address_temp');
    }

    /**
     * Write the buffered rows to the temporary table with a single multi-row INSERT.
     *
     * Prepared statements are cached by number of rows, so every full batch reuses the same
     * statement. Once the statement has been executed, successfully or not, the buffer is
     * emptied.
     *
     * @throws \RuntimeException if the insert did not affect any row
     */
    private function doInsertPending(): void
    {
        $forNumber = \count($this->waitingForInsert);

        // nothing to write: an INSERT with an empty VALUES list is not valid SQL, so it
        // must not be prepared (this happens in finalize() right after a full batch)
        if (0 === $forNumber) {
            return;
        }

        if (!\array_key_exists($forNumber, $this->cachingStatements)) {
            $sql = strtr(self::INSERT, [
                '{{ values }}' => implode(', ', array_fill(0, $forNumber, self::VALUE)),
            ]);

            $this->logger->debug(self::LOG_PREFIX.' generated sql for insert', [
                'sql' => $sql,
                'forNumber' => $forNumber,
            ]);

            $this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
        }

        $this->logger->debug(self::LOG_PREFIX.' inserting pending addresses', [
            'number' => $forNumber,
            'first' => $this->waitingForInsert[0],
        ]);

        try {
            // each row is a list of values: flatten them to match the placeholders in order
            $affected = $this->cachingStatements[$forNumber]->executeStatement(array_merge(...$this->waitingForInsert));

            if (0 === $affected) {
                throw new \RuntimeException('no row affected');
            }
        } finally {
            $this->waitingForInsert = [];
        }
    }

    /**
     * Start an import for $source on a fresh temporary table.
     */
    private function initialize(string $source): void
    {
        $this->currentSource = $source;
        $this->deleteTemporaryTable();
        $this->createTemporaryTable();
        $this->isInitialized = true;
    }

    /**
     * Reconcile the temporary table with chill_main_address_reference.
     *
     * Duplicated refid are handled first. Then addresses are upserted and the missing ones
     * soft-deleted, in one transaction. Finally, a report of the rows without postal code
     * is written and optionally sent by email.
     *
     * @throws \RuntimeException if some refid appear more than once and $allowRemoveDoubleRefId is false
     */
    private function updateAddressReferenceTable(bool $allowRemoveDoubleRefId, ?string $sendAddressReportToEmail = null): void
    {
        $this->defaultConnection->executeStatement(
            'CREATE INDEX idx_ref_add_temp ON reference_address_temp (refid) WHERE postcode_id IS NOT NULL'
        );

        // 0) detect for doublon in current temporary table
        $this->removeOrRejectDuplicatedRefIds($allowRemoveDoubleRefId);

        $this->upsertAndSoftDeleteAddresses();

        $this->reportAddressesWithoutPostalCode($sendAddressReportToEmail);
    }

    /**
     * Detect refid present more than once among the temporary rows having a postal code.
     *
     * Each duplicated refid is logged. Duplicates are then either removed (keeping the row
     * with the highest gid) or rejected.
     *
     * @throws \RuntimeException if duplicates exist and $allowRemoveDoubleRefId is false
     */
    private function removeOrRejectDuplicatedRefIds(bool $allowRemoveDoubleRefId): void
    {
        $results = $this->defaultConnection->executeQuery(
            'SELECT COUNT(*) AS nb_appearance, refid FROM reference_address_temp WHERE postcode_id IS NOT NULL GROUP BY refid HAVING count(*) > 1'
        );

        $hasDouble = false;
        foreach ($results->iterateAssociative() as $result) {
            $this->logger->error(self::LOG_PREFIX.'Some reference id are present more than one time', ['nb_appearance' => $result['nb_appearance'], 'refid' => $result['refid']]);
            $hasDouble = true;
        }

        if (!$hasDouble) {
            return;
        }

        if (!$allowRemoveDoubleRefId) {
            throw new \RuntimeException('Some addresses are present twice in the database, we cannot process them');
        }

        $this->logger->alert(self::LOG_PREFIX.'We are going to remove the addresses which are present more than once in the table');

        // gid numbers the existing rows (presumably in insertion order, which PostgreSQL does
        // not guarantee); for each duplicated refid, every row except the highest gid is deleted
        $this->defaultConnection->executeStatement('ALTER TABLE reference_address_temp ADD COLUMN gid SERIAL');

        $removed = $this->defaultConnection->executeStatement(<<<'SQL'
            WITH ordering AS (
                SELECT gid, rank() over (PARTITION BY refid ORDER BY gid DESC) AS ranking
                FROM reference_address_temp
                WHERE postcode_id IS NOT NULL
                    AND refid IN (SELECT refid FROM reference_address_temp WHERE postcode_id IS NOT NULL group by refid having count(*) > 1)
            ),
            keep_last AS (
                SELECT gid, ranking FROM ordering where ranking > 1
            )
            DELETE FROM reference_address_temp WHERE gid IN (SELECT gid FROM keep_last);
            SQL);

        $this->logger->alert(self::LOG_PREFIX.'addresses with same refid present twice, we removed some double', ['nb_removed' => $removed]);
    }

    /**
     * In a single transaction, upsert the temporary rows having a postal code. Then soft-delete
     * the addresses of the current source which are absent from the temporary table.
     */
    private function upsertAndSoftDeleteAddresses(): void
    {
        $this->defaultConnection->transactional(function ($connection): void {
            // 1) Add new addresses
            $this->logger->info(self::LOG_PREFIX.'upsert new addresses');
            $affected = $connection->executeStatement(<<<'SQL'
                INSERT INTO chill_main_address_reference
                    (id, postcode_id, refid, street, streetnumber, municipalitycode, source, point, createdat, deletedat, updatedat)
                SELECT nextval('chill_main_address_reference_id_seq'), postcode_id, refid, street, streetnumber, municipalitycode, source, point, NOW(), null, NOW()
                FROM reference_address_temp
                WHERE postcode_id IS NOT NULL
                ON CONFLICT (refid, source) DO UPDATE
                SET postcode_id = excluded.postcode_id,
                    refid = excluded.refid,
                    street = excluded.street,
                    streetnumber = excluded.streetnumber,
                    municipalitycode = excluded.municipalitycode,
                    source = excluded.source,
                    point = excluded.point,
                    updatedat = NOW(),
                    deletedAt = NULL
                SQL);
            $this->logger->info(self::LOG_PREFIX.'addresses upserted', ['upserted' => $affected]);

            // 2) Delete addresses
            $this->logger->info(self::LOG_PREFIX.'soft delete adresses');
            $affected = $connection->executeStatement(<<<'SQL'
                UPDATE chill_main_address_reference
                SET deletedat = NOW()
                WHERE chill_main_address_reference.refid NOT IN (
                        SELECT refid FROM reference_address_temp WHERE source LIKE ? AND postcode_id IS NOT NULL
                    )
                    AND chill_main_address_reference.source LIKE ?
                SQL, [$this->currentSource, $this->currentSource]);
            $this->logger->info(self::LOG_PREFIX.'addresses deleted', ['deleted' => $affected]);
        });
    }

    /**
     * Write the temporary rows without postal code to a CSV file in the system temporary
     * directory. If $sendAddressReportToEmail is not null, also send it by email.
     */
    private function reportAddressesWithoutPostalCode(?string $sendAddressReportToEmail): void
    {
        $results = $this->defaultConnection->executeQuery('SELECT postalcode, refid, street, streetnumber, municipalitycode, source, ST_AsText(point) FROM reference_address_temp WHERE postcode_id IS NULL');
        $count = $results->rowCount();

        if ($count <= 0) {
            return;
        }

        $this->logger->warning(self::LOG_PREFIX.'There are addresses that could not be associated with a postal code', ['nb' => $count]);

        $filename = sprintf('%s-%s.csv', (new \DateTimeImmutable())->format('Ymd-His'), uniqid());
        $path = Path::normalize(sprintf('%s%s%s', sys_get_temp_dir(), DIRECTORY_SEPARATOR, $filename));

        $writer = Writer::from($path, 'w+');
        // insert headers
        $writer->insertOne([
            'postalcode',
            'refid',
            'street',
            'streetnumber',
            'municipalitycode',
            'source',
            'point',
        ]);
        $writer->insertAll($results->iterateAssociative());

        $this->logger->info(sprintf(self::LOG_PREFIX.'The addresses that could not be inserted within the database are registered at path %s', $path));

        if (null !== $sendAddressReportToEmail) {
            $this->sendAddressReport($path, $filename, $sendAddressReportToEmail);
        }
    }

    /**
     * Gzip the CSV report at $csvPath and send it by email to $recipient.
     *
     * The import is already committed at this stage. Compression and mail transport failures
     * are therefore logged instead of thrown. The compressed copy is always removed; the CSV
     * file itself is kept.
     */
    private function sendAddressReport(string $csvPath, string $filename, string $recipient): void
    {
        // the csv file can be quite big: it is compressed before being attached
        $attachmentPath = sprintf('%s.gz', $csvPath);

        try {
            $content = $this->compressFile($csvPath, $attachmentPath) ? file_get_contents($attachmentPath) : false;

            if (false === $content) {
                $this->logger->error(self::LOG_PREFIX.'Could not compress the file with addresses that could not be registered', ['path' => $csvPath]);

                return;
            }

            $email = (new Email())
                ->addTo($recipient)
                ->subject('Addresses that could not be imported')
                // name the attachment after the file only, not after its location on the server
                ->attach($content, sprintf('%s.gz', $filename));

            try {
                $this->mailer->send($email);
            } catch (TransportExceptionInterface $e) {
                $this->logger->error(self::LOG_PREFIX.'Could not send an email with addresses that could not be registered', ['exception' => $e]);
            }
        } finally {
            if (is_file($attachmentPath)) {
                unlink($attachmentPath);
            }
        }
    }

    /**
     * Gzip-compress $source into $target. The source is read by chunks to keep memory usage low.
     *
     * @return bool false if a file could not be opened, read or written
     */
    private function compressFile(string $source, string $target): bool
    {
        $input = fopen($source, 'rb');
        if (false === $input) {
            return false;
        }

        $output = gzopen($target, 'w9');
        if (false === $output) {
            fclose($input);

            return false;
        }

        $success = true;
        try {
            while ($success && !feof($input)) {
                $chunk = fread($input, 1024 * 1024);
                $success = false !== $chunk && ('' === $chunk || false !== gzwrite($output, $chunk));
            }
        } finally {
            fclose($input);
            gzclose($output);
        }

        return $success;
    }
}