Julien Fastré
97a2385167
All checks were successful
Build image and push it to registry / build (push) Successful in 1m27s
The 'TIMESTAMP_URL' environment variable check was removed from the worker script as it is no longer required. This simplifies the code by eliminating unnecessary validation.
92 lines
4.3 KiB
Python
92 lines
4.3 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
|
|
import pika
|
|
import sign
|
|
|
|
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
|
|
'-35s %(lineno) -5d: %(message)s')
|
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
|
LOGGER = logging.getLogger(__name__)
|
|
LOGGER.setLevel(os.environ.get('LOG_LEVEL', logging.INFO))
|
|
|
|
for v in ['AMQP_URL', 'PKCS12_PATH', 'QUEUE_IN', 'EXCHANGE_OUT', 'OUT_ROUTING_KEY', 'TSA_CONFIG_PATH', 'TSA_CERT_CHAIN', 'TSA_KEY_PASSWORD']:
|
|
if v not in os.environ:
|
|
LOGGER.error('Missing environment variable: %s', v)
|
|
raise ValueError('Missing environment variable: ' + v)
|
|
|
|
DSN = os.environ.get('AMQP_URL')
|
|
PKCS12_PATH = os.environ.get('PKCS12_PATH')
|
|
QUEUE_IN = os.environ.get('QUEUE_IN')
|
|
EXCHANGE_OUT = os.environ.get('EXCHANGE_OUT')
|
|
OUT_ROUTING_KEY = os.environ.get('OUT_ROUTING_KEY')
|
|
TSA_CONFIG_PATH = os.environ.get('TSA_CONFIG_PATH')
|
|
TSA_CERT_CHAIN = os.environ.get('TSA_CERT_CHAIN')
|
|
TSA_KEY_PASSWORD = os.environ.get('TSA_KEY_PASSWORD')
|
|
|
|
orchestrator = sign.SignOrchestrator(PKCS12_PATH, TSA_CONFIG_PATH, TSA_KEY_PASSWORD, TSA_CERT_CHAIN, pkcs12_password=os.environ.get('PKCS12_PASSWORD', None))
|
|
|
|
parameters = pika.URLParameters(DSN)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
channel.confirm_delivery()
|
|
|
|
|
|
def on_message(channel, method_frame, header_frame, body):
|
|
LOGGER.debug("receiving a message")
|
|
body_content = json.loads(body)
|
|
LOGGER.info(f"request to add a signature, signatureId: {body_content['signatureId']}")
|
|
|
|
try:
|
|
box_place = (body_content['signatureZone']['x'], body_content['signatureZone']['y'],
|
|
body_content['signatureZone']['x'] + body_content['signatureZone']['width'],
|
|
body_content['signatureZone']['y'] - body_content['signatureZone']['height'])
|
|
LOGGER.debug("will try signature")
|
|
signed = orchestrator.sign(reason=body_content['reason'], signature_index=body_content['signatureZoneIndex'],
|
|
box_place=box_place, on_page=body_content['signatureZone']['PDFPage']['index'],
|
|
signer_text=body_content['signerText'],
|
|
input_content=base64.b64decode(body_content['content']))
|
|
LOGGER.info(f"signature obtained, signatureId: {body_content['signatureId']}")
|
|
|
|
if bool(os.environ.get('DEBUG', 'false')):
|
|
with open(f"./assets/new.{method_frame.consumer_tag}.{method_frame.delivery_tag}.pdf", 'wb') as f:
|
|
f.write(signed.read())
|
|
LOGGER.debug("signed file saved")
|
|
# because we consumed the buffer to write a file, we have to rewind it
|
|
signed.seek(0)
|
|
|
|
channel.basic_publish(exchange=EXCHANGE_OUT,
|
|
body=json.dumps({'signatureId': body_content['signatureId'],
|
|
'signatureZoneIndex': body_content['signatureZoneIndex'],
|
|
'content': base64.b64encode(signed.read()).decode('utf-8')}),
|
|
properties=pika.BasicProperties(content_type='application/json',
|
|
delivery_mode=pika.DeliveryMode.Transient),
|
|
routing_key=OUT_ROUTING_KEY)
|
|
LOGGER.debug("signed file resend to amqp")
|
|
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
|
|
|
except Exception as e:
|
|
LOGGER.warning(f"error encountered while signing: {e}")
|
|
if method_frame.redelivered:
|
|
LOGGER.warning(
|
|
f"stopping handling this message, because the message is already redelivered, signatureId: {body_content['signatureId']}")
|
|
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
|
|
else:
|
|
LOGGER.warning(f"first try failed, signatureId: {body_content['signatureId']}")
|
|
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
|
raise e
|
|
|
|
|
|
if __name__ == '__main__':
|
|
LOGGER.info('starting worker')
|
|
channel.basic_consume(QUEUE_IN, on_message)
|
|
try:
|
|
LOGGER.info("start consuming")
|
|
channel.start_consuming()
|
|
except KeyboardInterrupt:
|
|
LOGGER.info("keyboard interrupt")
|
|
channel.stop_consuming()
|
|
connection.close()
|