Create a dedicated timestamper which use the openssl cli and custom certificates #2

Merged
julienfastre merged 6 commits from test-ts-sign into main 2024-10-11 14:19:52 +00:00
5 changed files with 91 additions and 14 deletions
Showing only changes of commit c8042a6f84 - Show all commits

View File

@ -7,11 +7,15 @@ from pyhanko.sign import signers, timestamps, fields
from pyhanko_certvalidator import ValidationContext
from typing_extensions import Buffer
from pythonProject.timestamp import LocalOpensslTimestamp
class SignOrchestrator:
"""Orchestrate the signature on document"""
def __init__(self, pkcs12_path: str, timestamp_url: str, pkcs12_password: Optional[bytes] = None):
def __init__(self, pkcs12_path: str,
tsa_config_path: str, tsa_password: str, tsa_cert_chain: str,
pkcs12_password: Optional[bytes] = None):
# Load signer key material from PKCS#12 file
# This assumes that any relevant intermediate certs are also included
# in the PKCS#12 file.
@ -20,9 +24,7 @@ class SignOrchestrator:
)
# Set up a timestamping client to fetch timestamps tokens
self.timestamper = timestamps.HTTPTimeStamper(
url=timestamp_url,
)
self.timestamper = LocalOpensslTimestamp(tsa_config_path, tsa_password, tsa_cert_chain)
self.stamp_style = stamp.TextStampStyle(
stamp_text="Signé par:\n%(signer_text)s\nLe %(ts)s",

View File

@ -1,6 +1,16 @@
from sign import SignOrchestrator
orchestrator = SignOrchestrator('./assets/dummy.p12','http://freetsa.org/tsr', pkcs12_password=None)
"""
This is a script to sign a file with the dummy assets
It is created mainly for testing purpose
"""
orchestrator = SignOrchestrator('./assets/dummy.p12',
'/home/julien/dev/chill/sign-pdf-worker/ts-authority/rootca.conf',
'5678',
'/home/julien/dev/chill/sign-pdf-worker/ts-authority/ca/tsa-chain.pem',
pkcs12_password=None)
with open('./assets/test.pdf', 'rb') as input:
signed_content = orchestrator.sign(reason="first signer", signature_index=0,
@ -11,9 +21,9 @@ with open('./assets/test.pdf', 'rb') as input:
output.write(signed_content.read())
with open('./assets/test_signed_0.pdf', 'rb') as input:
signed_content = orchestrator.sign(reason="second signer", signature_index=1,
input_content=input.read(), box_place=(100, 600, 300, 660), on_page=0,
signer_text="M. Bah Mamadou")
signed_content = orchestrator.sign(reason="second signer", signature_index=1,
input_content=input.read(), box_place=(100, 600, 300, 660), on_page=0,
signer_text="M. Bah Mamadou")
with open('./assets/test_signed_1.pdf', 'wb') as output:
output.write(signed_content.read())
with open('./assets/test_signed_1.pdf', 'wb') as output:
output.write(signed_content.read())

View File

@ -0,0 +1,64 @@
import logging
import os
from asn1crypto import tsp
from asn1crypto.tsp import TimeStampResp
from pyhanko.sign.timestamps import TimeStamper
import subprocess
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(os.environ.get('LOG_LEVEL', logging.DEBUG))
class LocalOpensslTimestamp(TimeStamper):
"""
Apply a timestamp using a local openssl cli.
The class provides methods to request a timestamp response from a local OpenSSL TSA cli.
"""
def __init__(self, path_config: str, password: str, path_chain: str):
super().__init__()
self.path_conf = path_config
self.password = password
self.path_chain = path_chain
def request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp:
with open('/tmp/request.tsq', 'wb') as request:
request.write(req.dump())
cmd = [
'openssl', 'ts', '-reply',
'-config', self.path_conf,
'-queryfile', '/tmp/request.tsq',
'-chain', self.path_chain,
'-out', '/tmp/response.tsr',
'-passin', f'pass:{self.password}'
]
try:
subprocess.run(cmd, capture_output=True, check=True, timeout=10)
except subprocess.CalledProcessError as e:
LOGGER.error("Could not generate a timestamp")
LOGGER.error(f"response code: {e.returncode}")
LOGGER.error(f"stderr: {e.stderr}")
LOGGER.error(f"stdout: {e.stdout}")
LOGGER.error(f"error: {e.output}")
raise Exception("could not generate a timestamp")
except subprocess.TimeoutExpired as e:
LOGGER.error("timeout expired")
LOGGER.error(f"stderr: {e.stderr}")
LOGGER.error(f"stdout: {e.stdout}")
LOGGER.error(f"error: {e.output}")
raise e
with open('/tmp/response.tsr', mode='rb') as f:
tsp = TimeStampResp.load(f.read())
os.unlink('/tmp/response.tsr')
return tsp
async def async_request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp:
return self.request_tsa_response(req)

View File

@ -19,13 +19,14 @@ for v in ['AMQP_URL', 'PKCS12_PATH', 'TIMESTAMP_URL', 'QUEUE_IN', 'EXCHANGE_OUT'
DSN = os.environ.get('AMQP_URL')
PKCS12_PATH = os.environ.get('PKCS12_PATH')
TIMESTAMP_URL = os.environ.get('TIMESTAMP_URL')
QUEUE_IN = os.environ.get('QUEUE_IN')
EXCHANGE_OUT = os.environ.get('EXCHANGE_OUT')
OUT_ROUTING_KEY = os.environ.get('OUT_ROUTING_KEY')
TSA_CONFIG_PATH = os.environ.get('TSA_CONFIG_PATH')
TSA_CERT_CHAIN = os.environ.get('TSA_CERT_CHAIN')
TSA_KEY_PASSWORD = os.environ.get('TSA_KEY_PASSWORD')
orchestrator = sign.SignOrchestrator(PKCS12_PATH, TIMESTAMP_URL, pkcs12_password=os.environ.get('PKCS12_PASSWORD', None))
orchestrator = sign.SignOrchestrator(PKCS12_PATH, TSA_CONFIG_PATH, TSA_KEY_PASSWORD, TSA_CERT_CHAIN, pkcs12_password=os.environ.get('PKCS12_PASSWORD', None))
parameters = pika.URLParameters(DSN)
connection = pika.BlockingConnection(parameters)

View File

@ -138,7 +138,7 @@ subjectKeyIdentifier = hash
default_tsa = tsa_config1
[ tsa_config1 ]
dir = . # TSA root directory, same as root-ca
dir = /home/julien/dev/chill/sign-pdf-worker/ts-authority # TSA root directory, same as root-ca
serial = $dir/ca/tsa_serial # current serial number (mandatory)
signer_cert = $dir/ca/tsa.crt # signing certificate (optional)
certs = $dir/ca/tsa-chain.pem # certification chain (optional)