mirror of
https://github.com/zulip/zulip.git
synced 2025-11-02 04:53:36 +00:00
queue_processors: Make timer_expired() a method.
This allows specific queue workers to override the defaut behavior and implement their own response to the timer expiring. We will want to use this for embed_links queue at least.
This commit is contained in:
committed by
Tim Abbott
parent
0fab79c027
commit
85f14eb4f7
@@ -197,10 +197,6 @@ def retry_send_email_failures(
|
||||
return wrapper
|
||||
|
||||
|
||||
def timer_expired(limit: int, event_count: int, signal: int, frame: FrameType) -> None:
|
||||
raise WorkerTimeoutException(limit, event_count)
|
||||
|
||||
|
||||
class QueueProcessingWorker(ABC):
|
||||
queue_name: str
|
||||
MAX_CONSUME_SECONDS: Optional[int] = 30
|
||||
@@ -292,7 +288,9 @@ class QueueProcessingWorker(ABC):
|
||||
try:
|
||||
signal.signal(
|
||||
signal.SIGALRM,
|
||||
functools.partial(timer_expired, self.MAX_CONSUME_SECONDS, len(events)),
|
||||
functools.partial(
|
||||
self.timer_expired, self.MAX_CONSUME_SECONDS, len(events)
|
||||
),
|
||||
)
|
||||
try:
|
||||
signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
|
||||
@@ -339,6 +337,9 @@ class QueueProcessingWorker(ABC):
|
||||
consume_func = lambda events: self.consume(events[0])
|
||||
self.do_consume(consume_func, [event])
|
||||
|
||||
def timer_expired(self, limit: int, event_count: int, signal: int, frame: FrameType) -> None:
|
||||
raise WorkerTimeoutException(limit, event_count)
|
||||
|
||||
def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None:
|
||||
with configure_scope() as scope:
|
||||
scope.set_context(
|
||||
|
||||
Reference in New Issue
Block a user