import asyncio
import logging
import os
import subprocess
import tempfile

from asn1crypto import tsp
from asn1crypto.tsp import TimeStampResp
from pyhanko.sign.timestamps import TimeStamper

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(os.environ.get('LOG_LEVEL', logging.DEBUG))


class LocalOpensslTimestamp(TimeStamper):
    """
    Apply a timestamp using a local openssl cli.

    The class provides methods to request a timestamp response from a local
    OpenSSL TSA cli (``openssl ts -reply``).
    """

    # Maximum number of seconds the openssl process may run per request.
    OPENSSL_TIMEOUT_SECONDS = 10

    def __init__(self, path_config: str, password: str, path_chain: str):
        """
        Store the settings handed to ``openssl ts -reply``.

        :param path_config: value passed to openssl's ``-config`` option.
        :param password: pass phrase passed to openssl's ``-passin`` option.
        :param path_chain: value passed to openssl's ``-chain`` option.
        """
        super().__init__()
        self.path_conf = path_config
        self.password = password
        self.path_chain = path_chain

    def request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp:
        """
        Produce a timestamp response for ``req`` by invoking ``openssl ts``.

        The request is dumped to a file, openssl writes its reply to a second
        file, and that reply is parsed and returned. Both files live in a
        private, per-call temporary directory that is removed on every exit
        path, so concurrent calls cannot overwrite each other's files.

        :param req: the timestamp request to answer.
        :return: the timestamp response parsed from openssl's output file.
        :raises Exception: if openssl exits with a non-zero status.
        :raises subprocess.TimeoutExpired: if openssl does not finish within
            ``OPENSSL_TIMEOUT_SECONDS`` seconds.
        """
        with tempfile.TemporaryDirectory(prefix='openssl-ts-') as workdir:
            query_path = os.path.join(workdir, 'request.tsq')
            reply_path = os.path.join(workdir, 'response.tsr')

            with open(query_path, 'wb') as query_file:
                query_file.write(req.dump())

            # NOTE(review): the pass phrase is given on the command line and
            # is therefore part of the process arguments (and of the
            # subprocess exceptions' ``cmd``). Consider another openssl
            # pass phrase source if that is a concern.
            cmd = [
                'openssl', 'ts', '-reply',
                '-config', self.path_conf,
                '-queryfile', query_path,
                '-chain', self.path_chain,
                '-out', reply_path,
                '-passin', f'pass:{self.password}',
            ]

            try:
                subprocess.run(
                    cmd,
                    capture_output=True,
                    check=True,
                    timeout=self.OPENSSL_TIMEOUT_SECONDS,
                )
            except subprocess.CalledProcessError as e:
                LOGGER.error("Could not generate a timestamp")
                LOGGER.error("response code: %s", e.returncode)
                LOGGER.error("stderr: %s", e.stderr)
                LOGGER.error("stdout: %s", e.stdout)
                LOGGER.error("error: %s", e.output)
                raise Exception("could not generate a timestamp") from e
            except subprocess.TimeoutExpired as e:
                LOGGER.error("timeout expired")
                LOGGER.error("stderr: %s", e.stderr)
                LOGGER.error("stdout: %s", e.stdout)
                LOGGER.error("error: %s", e.output)
                raise

            with open(reply_path, mode='rb') as reply_file:
                # Named ``response`` so the imported ``tsp`` module is not
                # shadowed inside this method.
                response = TimeStampResp.load(reply_file.read())

        return response

    async def async_request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp:
        """
        Asynchronous variant of :meth:`request_tsa_response`.

        The blocking openssl invocation is run in the event loop's default
        executor so the loop is not stalled while openssl runs.

        :param req: the timestamp request to answer.
        :return: the timestamp response parsed from openssl's output file.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.request_tsa_response, req)