queue_processors: Make timer_expired receive list of events as argument.

This will give queue workers more flexibility when defining their own
override of the method.
This commit is contained in:
Mateusz Mandera
2021-07-06 14:56:23 +02:00
committed by Tim Abbott
parent 85f14eb4f7
commit 36638c95b9

View File

@@ -288,9 +288,7 @@ class QueueProcessingWorker(ABC):
try: try:
signal.signal( signal.signal(
signal.SIGALRM, signal.SIGALRM,
functools.partial( functools.partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
self.timer_expired, self.MAX_CONSUME_SECONDS, len(events)
),
) )
try: try:
signal.alarm(self.MAX_CONSUME_SECONDS * len(events)) signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
@@ -337,8 +335,10 @@ class QueueProcessingWorker(ABC):
consume_func = lambda events: self.consume(events[0]) consume_func = lambda events: self.consume(events[0])
self.do_consume(consume_func, [event]) self.do_consume(consume_func, [event])
def timer_expired(self, limit: int, event_count: int, signal: int, frame: FrameType) -> None: def timer_expired(
raise WorkerTimeoutException(limit, event_count) self, limit: int, events: List[Dict[str, Any]], signal: int, frame: FrameType
) -> None:
raise WorkerTimeoutException(limit, len(events))
def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None: def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None:
with configure_scope() as scope: with configure_scope() as scope: