Fix infinite retry loop on signing errors in worker
Catch SigningError separately (logged at ERROR level) and fix the ack/nack logic so failed messages are requeued once then discarded, instead of being requeued indefinitely or lost. Remove the bare `raise e` that crashed the worker after each error. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+15
-8
@@ -5,6 +5,7 @@ import os
|
|||||||
|
|
||||||
import pika
|
import pika
|
||||||
import sign
|
import sign
|
||||||
|
from pyhanko.sign.general import SigningError
|
||||||
|
|
||||||
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
|
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
|
||||||
'-35s %(lineno) -5d: %(message)s')
|
'-35s %(lineno) -5d: %(message)s')
|
||||||
@@ -67,16 +68,22 @@ def on_message(channel, method_frame, header_frame, body):
|
|||||||
LOGGER.debug("signed file resend to amqp")
|
LOGGER.debug("signed file resend to amqp")
|
||||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||||
|
|
||||||
except Exception as e:
|
except SigningError as e:
|
||||||
LOGGER.warning(f"error encountered while signing: {e}")
|
LOGGER.error(f"signing error for signatureId {body_content['signatureId']}: {e}", exc_info=True)
|
||||||
if method_frame.redelivered:
|
if method_frame.redelivered:
|
||||||
LOGGER.warning(
|
LOGGER.error(f"message already redelivered, discarding signatureId: {body_content['signatureId']}")
|
||||||
f"stopping handling this message, because the message is already redelivered, signatureId: {body_content['signatureId']}")
|
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
|
||||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
|
|
||||||
else:
|
else:
|
||||||
LOGGER.warning(f"first try failed, signatureId: {body_content['signatureId']}")
|
LOGGER.warning(f"first try failed, requeueing for one retry, signatureId: {body_content['signatureId']}")
|
||||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
|
||||||
raise e
|
except Exception as e:
|
||||||
|
LOGGER.warning(f"error encountered while signing, signatureId: {body_content['signatureId']}: {e}", exc_info=True)
|
||||||
|
if method_frame.redelivered:
|
||||||
|
LOGGER.warning(f"message already redelivered, discarding signatureId: {body_content['signatureId']}")
|
||||||
|
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
|
||||||
|
else:
|
||||||
|
LOGGER.warning(f"first try failed, requeueing for one retry, signatureId: {body_content['signatureId']}")
|
||||||
|
channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
Reference in New Issue
Block a user