Julien Fastré
c8042a6f84
Replaced HTTPTimeStamper with LocalOpensslTimestamp for TSA operations using a local OpenSSL CLI. Updated related configurations and dependencies to support this change, enhancing the timestamping process's reliability and security.
65 lines
2.1 KiB
Python
65 lines
2.1 KiB
Python
import logging
|
|
import os
|
|
|
|
from asn1crypto import tsp
|
|
from asn1crypto.tsp import TimeStampResp
|
|
from pyhanko.sign.timestamps import TimeStamper
|
|
import subprocess
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
LOGGER.setLevel(os.environ.get('LOG_LEVEL', logging.DEBUG))
|
|
|
|
|
|
class LocalOpensslTimestamp(TimeStamper):
|
|
"""
|
|
Apply a timestamp using a local openssl cli.
|
|
|
|
The class provides methods to request a timestamp response from a local OpenSSL TSA cli.
|
|
"""
|
|
def __init__(self, path_config: str, password: str, path_chain: str):
|
|
super().__init__()
|
|
self.path_conf = path_config
|
|
self.password = password
|
|
self.path_chain = path_chain
|
|
|
|
def request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp:
|
|
with open('/tmp/request.tsq', 'wb') as request:
|
|
request.write(req.dump())
|
|
|
|
cmd = [
|
|
'openssl', 'ts', '-reply',
|
|
'-config', self.path_conf,
|
|
'-queryfile', '/tmp/request.tsq',
|
|
'-chain', self.path_chain,
|
|
'-out', '/tmp/response.tsr',
|
|
'-passin', f'pass:{self.password}'
|
|
]
|
|
|
|
try:
|
|
subprocess.run(cmd, capture_output=True, check=True, timeout=10)
|
|
except subprocess.CalledProcessError as e:
|
|
LOGGER.error("Could not generate a timestamp")
|
|
LOGGER.error(f"response code: {e.returncode}")
|
|
LOGGER.error(f"stderr: {e.stderr}")
|
|
LOGGER.error(f"stdout: {e.stdout}")
|
|
LOGGER.error(f"error: {e.output}")
|
|
|
|
raise Exception("could not generate a timestamp")
|
|
except subprocess.TimeoutExpired as e:
|
|
LOGGER.error("timeout expired")
|
|
LOGGER.error(f"stderr: {e.stderr}")
|
|
LOGGER.error(f"stdout: {e.stdout}")
|
|
LOGGER.error(f"error: {e.output}")
|
|
|
|
raise e
|
|
|
|
with open('/tmp/response.tsr', mode='rb') as f:
|
|
tsp = TimeStampResp.load(f.read())
|
|
|
|
os.unlink('/tmp/response.tsr')
|
|
|
|
return tsp
|
|
|
|
async def async_request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp:
|
|
return self.request_tsa_response(req)
|