sign-pdf-worker/pythonProject/worker.py

90 lines
4.0 KiB
Python
Raw Normal View History

import base64
import io
import json
import logging
import os
import pika
import sign
# Log record layout: level, timestamp, logger name, function name and line
# number, then the message. The "-<width>" specifiers left-justify and pad the
# leading fields so that records line up in columns.
LOG_FORMAT = (
    '%(levelname) -10s '
    '%(asctime)s '
    '%(name) -30s '
    '%(funcName) -35s '
    '%(lineno) -5d: %(message)s'
)

# The root logger stays at INFO; only this module's logger follows LOG_LEVEL
# (falling back to INFO when the variable is not set).
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
LOGGER = logging.getLogger(__name__)
_requested_level = os.environ.get('LOG_LEVEL', logging.INFO)
LOGGER.setLevel(_requested_level)
# Every setting below is mandatory: abort start-up on the first one that is
# absent from the environment, naming it both in the log and in the exception.
for v in ('AMQP_URL', 'PKCS12_PATH', 'TIMESTAMP_URL', 'QUEUE_IN', 'EXCHANGE_OUT', 'OUT_ROUTING_KEY'):
    if v in os.environ:
        continue
    LOGGER.error('Missing environment variable: %s', v)
    raise ValueError('Missing environment variable: ' + v)

# Presence was verified by the loop above, so direct indexing cannot fail here.
DSN = os.environ['AMQP_URL']                    # handed to pika.URLParameters
PKCS12_PATH = os.environ['PKCS12_PATH']         # handed to sign.SignOrchestrator
TIMESTAMP_URL = os.environ['TIMESTAMP_URL']     # handed to sign.SignOrchestrator
QUEUE_IN = os.environ['QUEUE_IN']               # queue the worker consumes from
EXCHANGE_OUT = os.environ['EXCHANGE_OUT']       # exchange signed documents are published to
OUT_ROUTING_KEY = os.environ['OUT_ROUTING_KEY'] # routing key used when publishing
# Signing back-end shared by every message. PKCS12_PASSWORD is optional:
# None is passed through when the variable is not set.
orchestrator = sign.SignOrchestrator(
    PKCS12_PATH,
    TIMESTAMP_URL,
    pkcs12_password=os.environ.get('PKCS12_PASSWORD'),
)

# One blocking AMQP connection and one channel are opened at import time and
# reused for both consuming and publishing.
parameters = pika.URLParameters(DSN)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Turn on delivery confirmation for messages published on this channel.
channel.confirm_delivery()
def on_message(channel, method_frame, header_frame, body):
    """Sign the PDF carried by one AMQP message and publish the signed result.

    The body is expected to be a JSON object with the keys ``signatureId``,
    ``reason``, ``signatureZoneIndex``, ``signerText``, ``content`` (the
    base64-encoded document) and ``signatureZone`` (``x``, ``y``, ``width``,
    ``height`` and ``PDFPage.index``).

    Outcomes:
      * success: a JSON message holding ``signatureId`` and the base64-encoded
        signed document is published to ``EXCHANGE_OUT`` with
        ``OUT_ROUTING_KEY``, then the incoming message is acknowledged;
      * body is not valid JSON or has no ``signatureId``: the message is
        rejected without requeue, since retrying cannot succeed;
      * any other failure on first delivery: the message is rejected with
        requeue so that it is tried once more;
      * any other failure on a redelivered message: the message is rejected
        without requeue, which ends the retry cycle.

    Args:
        channel: channel the message arrived on; used to publish and to
            acknowledge or reject.
        method_frame: delivery metadata; ``delivery_tag``, ``consumer_tag``
            and ``redelivered`` are read.
        header_frame: message properties (unused).
        body: raw message payload.
    """
    LOGGER.debug("receiving a message")

    # Parsing used to sit outside any handler: one malformed message raised
    # out of the callback and took the whole consumer down.
    try:
        body_content = json.loads(body)
        signature_id = body_content['signatureId']
    except (ValueError, KeyError, TypeError) as e:
        LOGGER.warning(f"discarding malformed message: {e}")
        channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
        return

    LOGGER.info(f"request to add a signature, signatureId: {signature_id}")

    try:
        zone = body_content['signatureZone']
        # Rectangle as (left, bottom/top, right, top/bottom) built from origin + size.
        box_place = (zone['x'], zone['y'],
                     zone['x'] + zone['width'],
                     zone['y'] + zone['height'])

        LOGGER.debug("will try signature")
        signed = orchestrator.sign(reason=body_content['reason'],
                                   signature_index=body_content['signatureZoneIndex'],
                                   box_place=box_place,
                                   on_page=zone['PDFPage']['index'],
                                   signer_text=body_content['signerText'],
                                   input_content=base64.b64decode(body_content['content']))
        LOGGER.info(f"signature obtained, signatureId: {signature_id}")

        # Read the stream once and reuse the bytes, so no rewind is needed
        # between the optional debug dump and the publish.
        signed_content = signed.read()

        # bool('false') is True: the flag has to be parsed, not just tested
        # for being a non-empty string.
        debug_enabled = os.environ.get('DEBUG', 'false').strip().lower() in ('1', 'true', 'yes', 'on')
        if debug_enabled:
            with open(f"./assets/new.{method_frame.consumer_tag}.{method_frame.delivery_tag}.pdf", 'wb') as f:
                f.write(signed_content)
            LOGGER.debug("signed file saved")

        channel.basic_publish(exchange=EXCHANGE_OUT,
                              body=json.dumps({'signatureId': signature_id,
                                               'content': base64.b64encode(signed_content).decode('utf-8')}),
                              properties=pika.BasicProperties(content_type='application/json',
                                                              delivery_mode=pika.DeliveryMode.Transient),
                              routing_key=OUT_ROUTING_KEY)
        LOGGER.debug("signed file resend to amqp")

        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except Exception as e:
        LOGGER.warning(f"error encountered while signing: {e}")

        if method_frame.redelivered:
            # Second failure: give up. Requeueing here would redeliver the
            # same failing message forever.
            LOGGER.warning(
                f"stopping handling this message, because the message is already redelivered, signatureId: {signature_id}")
            channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
        else:
            # First failure: hand the message back to the broker for one
            # retry. Acknowledging it here would silently drop the request.
            LOGGER.warning(f"first try failed, signatureId: {signature_id}")
            channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
# Script entry point: register the consumer on QUEUE_IN and process messages
# until interrupted.
if __name__ == '__main__':
    LOGGER.info('starting worker')
    channel.basic_consume(QUEUE_IN, on_message)

    try:
        LOGGER.info("start consuming")
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the worker: stop consuming cleanly.
        LOGGER.info("keyboard interrupt")
        channel.stop_consuming()
    finally:
        # Close the connection however the loop ended. The is_open guard
        # avoids raising a second error (masking the first) when the
        # connection was already lost.
        if connection.is_open:
            connection.close()