chill-bundles/src/Bundle/ChillMainBundle/Service/Import/AddressReferenceBaseImporter.php

261 lines
9.4 KiB
PHP

<?php
declare(strict_types=1);
/*
* Chill is a software for social workers
*
* For the full copyright and license information, please view
* the LICENSE file that was distributed with this source code.
*/
namespace Chill\MainBundle\Service\Import;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Statement;
use Psr\Log\LoggerInterface;
/**
* Import addresses into the database.
*
* This importer do some optimization about the import, ensuring that adresses are inserted and reconciled with
* the existing one on a optimized way.
*/
final class AddressReferenceBaseImporter
{
private const INSERT = <<<'SQL'
INSERT INTO reference_address_temp
(postcode_id, refid, street, streetnumber, municipalitycode, source, point)
SELECT
cmpc.id, i.refid, i.street, i.streetnumber, i.refpostalcode, i.source,
CASE WHEN (i.lon::float != 0.0 AND i.lat::float != 0.0) THEN ST_Transform(ST_setSrid(ST_point(i.lon::float, i.lat::float), i.srid::int), 4326) ELSE NULL END
FROM
(VALUES
{{ values }}
) AS i (refid, refpostalcode, postalcode, street, streetnumber, source, lat, lon, srid)
JOIN chill_main_postal_code cmpc ON cmpc.refpostalcodeid = i.refpostalcode and cmpc.code = i.postalcode
SQL;
private const LOG_PREFIX = '[AddressReferenceImporter] ';
private const VALUE = '(?, ?, ?, ?, ?, ?, ?, ?, ?)';
/**
* @var array<int, Statement>
*/
private array $cachingStatements = [];
private ?string $currentSource = null;
private bool $isInitialized = false;
private array $waitingForInsert = [];
public function __construct(private readonly Connection $defaultConnection, private readonly LoggerInterface $logger) {}
/**
* Finalize the import process and make reconciliation with addresses.
*
* @param bool $allowRemoveDoubleRefId if true, allow the importer to remove automatically addresses with same refid
*
* @throws \Exception
*/
public function finalize(bool $allowRemoveDoubleRefId = false): void
{
$this->doInsertPending();
$this->updateAddressReferenceTable($allowRemoveDoubleRefId);
$this->deleteTemporaryTable();
$this->currentSource = null;
$this->isInitialized = false;
}
/**
* Do import a single address.
*
* @throws \Exception
*/
public function importAddress(
string $refAddress,
?string $refPostalCode,
string $postalCode,
string $street,
string $streetNumber,
string $source,
?float $lat = null,
?float $lon = null,
?int $srid = null,
): void {
if (!$this->isInitialized) {
$this->initialize($source);
}
if ($this->currentSource !== $source) {
throw new \LogicException('Cannot store addresses from different sources during same import. Execute finalize to commit inserts before changing the source');
}
$this->waitingForInsert[] = [
$refAddress,
$refPostalCode,
$postalCode,
$street,
$streetNumber,
$source,
$lat,
$lon,
$srid,
];
if (100 <= \count($this->waitingForInsert)) {
$this->doInsertPending();
}
}
private function createTemporaryTable(): void
{
$this->defaultConnection->executeStatement('CREATE TEMPORARY TABLE reference_address_temp (
postcode_id INT,
refid VARCHAR(255),
street VARCHAR(255),
streetnumber VARCHAR(255),
municipalitycode VARCHAR(255),
source VARCHAR(255),
point GEOMETRY
);
');
$this->defaultConnection->executeStatement('SET work_mem TO \'50MB\'');
}
private function deleteTemporaryTable(): void
{
$this->defaultConnection->executeStatement('DROP TABLE IF EXISTS reference_address_temp');
}
private function doInsertPending(): void
{
if (!\array_key_exists($forNumber = \count($this->waitingForInsert), $this->cachingStatements)) {
$sql = strtr(self::INSERT, [
'{{ values }}' => implode(
', ',
array_fill(0, $forNumber, self::VALUE)
),
]);
$this->logger->debug(self::LOG_PREFIX.' generated sql for insert', [
'sql' => $sql,
'forNumber' => $forNumber,
]);
$this->cachingStatements[$forNumber] = $this->defaultConnection->prepare($sql);
}
if (0 === $forNumber) {
return;
}
$this->logger->debug(self::LOG_PREFIX.' inserting pending addresses', [
'number' => $forNumber,
'first' => $this->waitingForInsert[0] ?? null,
]);
$statement = $this->cachingStatements[$forNumber];
try {
$affected = $statement->executeStatement(array_merge(...$this->waitingForInsert));
if (0 === $affected) {
throw new \RuntimeException('no row affected');
}
} catch (\Exception $e) {
// in some case, we can add debug code here
// dump($this->waitingForInsert);
throw $e;
} finally {
$this->waitingForInsert = [];
}
}
private function initialize(string $source): void
{
$this->currentSource = $source;
$this->deleteTemporaryTable();
$this->createTemporaryTable();
$this->isInitialized = true;
}
private function updateAddressReferenceTable(bool $allowRemoveDoubleRefId): void
{
$this->defaultConnection->executeStatement(
'CREATE INDEX idx_ref_add_temp ON reference_address_temp (refid)'
);
// 0) detect for doublon in current temporary table
$results = $this->defaultConnection->executeQuery(
'SELECT COUNT(*) AS nb_appearance, refid FROM reference_address_temp GROUP BY refid HAVING count(*) > 1'
);
$hasDouble = false;
foreach ($results->iterateAssociative() as $result) {
$this->logger->error(self::LOG_PREFIX.'Some reference id are present more than one time', ['nb_apparearance' => $result['nb_appearance'], 'refid' => $result['refid']]);
$hasDouble = true;
}
if ($hasDouble) {
if ($allowRemoveDoubleRefId) {
$this->logger->alert(self::LOG_PREFIX.'We are going to remove the addresses which are present more than once in the table');
$this->defaultConnection->executeStatement('ALTER TABLE reference_address_temp ADD COLUMN gid SERIAL');
$removed = $this->defaultConnection->executeStatement(<<<'SQL'
WITH ordering AS (
SELECT gid, rank() over (PARTITION BY refid ORDER BY gid DESC) AS ranking
FROM reference_address_temp
WHERE refid IN (SELECT refid FROM reference_address_temp group by refid having count(*) > 1)
),
keep_last AS (
SELECT gid, ranking FROM ordering where ranking > 1
)
DELETE FROM reference_address_temp WHERE gid IN (SELECT gid FROM keep_last);
SQL);
$this->logger->alert(self::LOG_PREFIX.'addresses with same refid present twice, we removed some double', ['nb_removed', $removed]);
} else {
throw new \RuntimeException('Some addresses are present twice in the database, we cannot process them');
}
}
$this->defaultConnection->transactional(function ($connection): void {
// 1) Add new addresses
$this->logger->info(self::LOG_PREFIX.'upsert new addresses');
$affected = $connection->executeStatement("INSERT INTO chill_main_address_reference
(id, postcode_id, refid, street, streetnumber, municipalitycode, source, point, createdat, deletedat, updatedat)
SELECT
nextval('chill_main_address_reference_id_seq'),
postcode_id,
refid,
street,
streetnumber,
municipalitycode,
source,
point,
NOW(),
null,
NOW()
FROM reference_address_temp
ON CONFLICT (refid, source) DO UPDATE
SET postcode_id = excluded.postcode_id, refid = excluded.refid, street = excluded.street, streetnumber = excluded.streetnumber, municipalitycode = excluded.municipalitycode, source = excluded.source, point = excluded.point, updatedat = NOW(), deletedAt = NULL
");
$this->logger->info(self::LOG_PREFIX.'addresses upserted', ['upserted' => $affected]);
// 3) Delete addresses
$this->logger->info(self::LOG_PREFIX.'soft delete adresses');
$affected = $connection->executeStatement('UPDATE chill_main_address_reference
SET deletedat = NOW()
WHERE
chill_main_address_reference.refid NOT IN (SELECT refid FROM reference_address_temp WHERE source LIKE ?)
AND chill_main_address_reference.source LIKE ?
', [$this->currentSource, $this->currentSource]);
$this->logger->info(self::LOG_PREFIX.'addresses deleted', ['deleted' => $affected]);
});
}
}