"""AMQP worker that adds signatures to documents.

The worker consumes JSON requests from ``QUEUE_IN``, hands the decoded
document to ``sign.SignOrchestrator`` and publishes the signed result, as a
base64-encoded JSON payload, to ``EXCHANGE_OUT`` with ``OUT_ROUTING_KEY``.

Configuration is read from environment variables; see ``REQUIRED_ENV_VARS``
for the mandatory ones. ``PKCS12_PASSWORD``, ``LOG_LEVEL`` and ``DEBUG`` are
optional.
"""
import base64
import io
import json
import logging
import os

import pika

import sign

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(os.environ.get('LOG_LEVEL', logging.INFO))

# Environment variables without which the worker cannot start.
REQUIRED_ENV_VARS = ('AMQP_URL', 'PKCS12_PATH', 'TIMESTAMP_URL', 'QUEUE_IN',
                     'EXCHANGE_OUT', 'OUT_ROUTING_KEY')

# Values of the DEBUG environment variable that switch debug dumps on.
_TRUTHY_VALUES = frozenset({'1', 'true', 'yes', 'on'})

# Fail fast, before any connection is opened, when configuration is incomplete.
for v in REQUIRED_ENV_VARS:
    if v not in os.environ:
        LOGGER.error('Missing environment variable: %s', v)
        raise ValueError('Missing environment variable: ' + v)

DSN = os.environ.get('AMQP_URL')
PKCS12_PATH = os.environ.get('PKCS12_PATH')
TIMESTAMP_URL = os.environ.get('TIMESTAMP_URL')
QUEUE_IN = os.environ.get('QUEUE_IN')
EXCHANGE_OUT = os.environ.get('EXCHANGE_OUT')
OUT_ROUTING_KEY = os.environ.get('OUT_ROUTING_KEY')

orchestrator = sign.SignOrchestrator(
    PKCS12_PATH,
    TIMESTAMP_URL,
    pkcs12_password=os.environ.get('PKCS12_PASSWORD', None),
)

parameters = pika.URLParameters(DSN)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Publisher confirms: basic_publish raises when the broker does not accept
# the message, so a lost result is handled like any other signing failure.
channel.confirm_delivery()


def _env_flag(name):
    """Return True when environment variable *name* holds a truthy value.

    ``bool()`` on the raw string is not usable here: every non-empty string,
    including ``'false'``, is truthy.
    """
    return os.environ.get(name, 'false').strip().lower() in _TRUTHY_VALUES


def _signature_box(zone):
    """Convert a zone given as x/y/width/height into an (x1, y1, x2, y2) tuple."""
    return (zone['x'],
            zone['y'],
            zone['x'] + zone['width'],
            zone['y'] + zone['height'])


def on_message(channel, method_frame, header_frame, body):
    """Sign the document carried by one AMQP message and publish the result.

    Args:
        channel: channel the message was delivered on; used to publish the
            result and to ack/reject the delivery.
        method_frame: delivery metadata (``delivery_tag``, ``consumer_tag``,
            ``redelivered``).
        header_frame: message properties (unused).
        body: raw message body, expected to be a JSON object with the keys
            ``signatureId``, ``signatureZone``, ``signatureZoneIndex``,
            ``reason``, ``signerText`` and ``content`` (base64).

    Acknowledgement policy:
        * success: the message is acked after the result is published;
        * body is not valid JSON: rejected without requeue, since a retry
          cannot succeed;
        * any other failure: requeued once; if the message was already
          redelivered it is rejected without requeue, so a poisonous message
          cannot loop forever.
    """
    LOGGER.debug("receiving a message")
    delivery_tag = method_frame.delivery_tag

    try:
        body_content = json.loads(body)
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
        LOGGER.warning("discarding message, body is not valid JSON: %s", e)
        channel.basic_reject(delivery_tag=delivery_tag, requeue=False)
        return

    # Captured once so the error path can log it without touching the
    # (possibly malformed) payload again.
    signature_id = None
    try:
        signature_id = body_content['signatureId']
        LOGGER.info("request to add a signature, signatureId: %s", signature_id)

        zone = body_content['signatureZone']
        LOGGER.debug("will try signature")
        signed = orchestrator.sign(
            reason=body_content['reason'],
            signature_index=body_content['signatureZoneIndex'],
            box_place=_signature_box(zone),
            on_page=zone['PDFPage']['index'],
            signer_text=body_content['signerText'],
            input_content=base64.b64decode(body_content['content']),
        )
        LOGGER.info("signature obtained, signatureId: %s", signature_id)

        # Read the buffer once and reuse the bytes for both the optional
        # debug dump and the outgoing message.
        signed_bytes = signed.read()

        if _env_flag('DEBUG'):
            debug_path = (f"./assets/new.{method_frame.consumer_tag}"
                          f".{method_frame.delivery_tag}.pdf")
            with open(debug_path, 'wb') as f:
                f.write(signed_bytes)
            LOGGER.debug("signed file saved")

        channel.basic_publish(
            exchange=EXCHANGE_OUT,
            body=json.dumps({
                'signatureId': signature_id,
                'content': base64.b64encode(signed_bytes).decode('utf-8'),
            }),
            properties=pika.BasicProperties(
                content_type='application/json',
                delivery_mode=pika.DeliveryMode.Transient,
            ),
            routing_key=OUT_ROUTING_KEY,
        )
        LOGGER.debug("signed file resend to amqp")

        channel.basic_ack(delivery_tag=delivery_tag)
    except Exception as e:
        # Top-level boundary of the consumer callback: nothing may escape,
        # otherwise the consuming loop stops.
        LOGGER.warning("error encountered while signing: %s", e)

        if method_frame.redelivered:
            LOGGER.warning(
                "stopping handling this message, because the message is "
                "already redelivered, signatureId: %s", signature_id)
            channel.basic_reject(delivery_tag=delivery_tag, requeue=False)
        else:
            LOGGER.warning("first try failed, signatureId: %s", signature_id)
            channel.basic_reject(delivery_tag=delivery_tag, requeue=True)


if __name__ == '__main__':
    LOGGER.info('starting worker')
    channel.basic_consume(QUEUE_IN, on_message)
    try:
        LOGGER.info("start consuming")
        channel.start_consuming()
    except KeyboardInterrupt:
        LOGGER.info("keyboard interrupt")
        channel.stop_consuming()
    connection.close()