*/
    private array $cachingStatements = [];

    /**
     * Source of the import currently in progress, or null when no import is running.
     */
    private ?string $currentSource = null;

    private Connection $defaultConnection;

    /**
     * True once the temporary table has been (re)created for the current import.
     */
    private bool $isInitialized = false;

    private LoggerInterface $logger;

    /**
     * Rows buffered by importAddress() and not yet written to the temporary table.
     *
     * Each row is a list of nine values, in the order they are given to importAddress().
     */
    private array $waitingForInsert = [];

    public function __construct(Connection $defaultConnection, LoggerInterface $logger)
    {
        $this->defaultConnection = $defaultConnection;
        $this->logger = $logger;
    }

    /**
     * Flush the buffered rows, merge the temporary table into
     * chill_main_address_reference, then drop the temporary table.
     *
     * Calling this method while no import is in progress does nothing. The
     * importer state is reset even when one of the statements fails, so the
     * next call to importAddress() starts again from a fresh temporary table.
     */
    public function finalize(): void
    {
        if (!$this->isInitialized) {
            // No temporary table was created by this instance: there is nothing
            // to merge, and running the queries below would fail.
            return;
        }

        try {
            $this->doInsertPending();
            $this->updateAddressReferenceTable();
            $this->deleteTemporaryTable();
        } finally {
            $this->currentSource = null;
            $this->isInitialized = false;
        }
    }

    /**
     * Buffer one address for import; the buffer is flushed every 100 rows.
     *
     * The first call of an import creates the temporary table and fixes the
     * source for the whole import.
     *
     * @throws LogicException when $source differs from the source of the import in progress
     */
    public function importAddress(
        string $refAddress,
        string $refPostalCode,
        string $postalCode,
        string $street,
        string $streetNumber,
        string $source,
        ?float $lat = null,
        ?float $lon = null,
        ?int $srid = null
    ): void {
        if (!$this->isInitialized) {
            $this->initialize($source);
        }

        if ($this->currentSource !== $source) {
            throw new LogicException('Cannot store addresses from different sources during same import. Execute finalize to commit inserts before changing the source');
        }

        $this->waitingForInsert[] = [
            $refAddress,
            $refPostalCode,
            $postalCode,
            $street,
            $streetNumber,
            $source,
            $lat,
            $lon,
            $srid,
        ];

        if (100 <= count($this->waitingForInsert)) {
            $this->doInsertPending();
        }
    }

    /**
     * Create the temporary table receiving the imported rows and set work_mem to 50MB.
     */
    private function createTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement(
            'CREATE TEMPORARY TABLE reference_address_temp ( '
            . 'postcode_id INT, '
            . 'refid VARCHAR(255), '
            . 'street VARCHAR(255), '
            . 'streetnumber VARCHAR(255), '
            . 'municipalitycode VARCHAR(255), '
            . 'source VARCHAR(255), '
            . 'point GEOMETRY '
            . '); '
        );

        $this->defaultConnection->executeStatement('SET work_mem TO \'50MB\'');
    }

    /**
     * Drop the temporary table if it exists.
     */
    private function deleteTemporaryTable(): void
    {
        $this->defaultConnection->executeStatement('DROP TABLE IF EXISTS reference_address_temp');
    }

    /**
     * Write every buffered row with a single multi-row INSERT, then empty the buffer.
     *
     * One prepared statement is cached per row count. The buffer is emptied
     * even when the INSERT fails; the exception is propagated unchanged.
     */
    private function doInsertPending(): void
    {
        $forNumber = count($this->waitingForInsert);

        if (0 === $forNumber) {
            // An INSERT with an empty list of values is not valid SQL. This
            // happens when finalize() runs right after a full batch was flushed.
            return;
        }

        if (!array_key_exists($forNumber, $this->cachingStatements)) {
            $sql = strtr(self::INSERT, [
                '{{ values }}' => implode(', ', array_fill(0, $forNumber, self::VALUE)),
            ]);

            $this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
        }

        $this->logger->debug(self::LOG_PREFIX . ' inserting pending addresses', [
            'number' => $forNumber,
            'first' => $this->waitingForInsert[0] ?? null,
        ]);

        try {
            // Flatten the rows into one positional parameter list.
            $this->cachingStatements[$forNumber]->executeStatement(array_merge(...$this->waitingForInsert));
        } finally {
            $this->waitingForInsert = [];
        }
    }

    /**
     * Start a new import for the given source on a fresh temporary table.
     */
    private function initialize(string $source): void
    {
        $this->currentSource = $source;
        // Drop first, in case a table of that name is still present.
        $this->deleteTemporaryTable();
        $this->createTemporaryTable();
        $this->isInitialized = true;
    }

    /**
     * Merge the temporary table into chill_main_address_reference.
     *
     * Rows are upserted on (refid, source), which also clears deletedat. Then
     * the rows of the current source whose refid is not in the temporary
     * table get deletedat set to NOW().
     */
    private function updateAddressReferenceTable(): void
    {
        $this->defaultConnection->executeStatement(
            'CREATE INDEX idx_ref_add_temp ON reference_address_temp (refid)'
        );

        // 1) Insert the new addresses and update the existing ones
        $this->logger->info(self::LOG_PREFIX . 'upsert new addresses');

        $affected = $this->defaultConnection->executeStatement(
            'INSERT INTO chill_main_address_reference '
            . '(id, postcode_id, refid, street, streetnumber, municipalitycode, source, point, createdat, deletedat, updatedat) '
            . "SELECT nextval('chill_main_address_reference_id_seq'), "
            . 'postcode_id, refid, street, streetnumber, municipalitycode, source, point, NOW(), null, NOW() '
            . 'FROM reference_address_temp '
            . 'ON CONFLICT (refid, source) DO UPDATE SET '
            . 'postcode_id = excluded.postcode_id, '
            . 'refid = excluded.refid, '
            . 'street = excluded.street, '
            . 'streetnumber = excluded.streetnumber, '
            . 'municipalitycode = excluded.municipalitycode, '
            . 'source = excluded.source, '
            . 'point = excluded.point, '
            . 'updatedat = NOW(), '
            . 'deletedAt = NULL '
        );

        $this->logger->info(self::LOG_PREFIX . 'addresses upserted', ['upserted' => $affected]);

        // 2) Soft delete the addresses of this source which are absent from the import
        // NOTE(review): the source is compared with LIKE, so "%" and "_" inside
        // a source name act as wildcards; confirm whether "=" was intended.
        $this->logger->info(self::LOG_PREFIX . 'soft delete adresses');

        $affected = $this->defaultConnection->executeStatement(
            'UPDATE chill_main_address_reference SET deletedat = NOW() '
            . 'WHERE chill_main_address_reference.refid NOT IN '
            . '(SELECT refid FROM reference_address_temp WHERE source LIKE ?) '
            . 'AND chill_main_address_reference.source LIKE ? ',
            [$this->currentSource, $this->currentSource]
        );

        $this->logger->info(self::LOG_PREFIX . 'addresses deleted', ['deleted' => $affected]);
    }
}