mirror of
https://github.com/zulip/zulip.git
synced 2025-11-02 21:13:36 +00:00
missedmessage_emails: Add Sentry spans to worker thread.
This commit is contained in:
committed by
Tim Abbott
parent
9451d08bb9
commit
9d8d2d138b
@@ -620,6 +620,7 @@ class MissedMessageWorker(QueueProcessingWorker):
|
|||||||
# The main thread, which handles the RabbitMQ connection and creates
|
# The main thread, which handles the RabbitMQ connection and creates
|
||||||
# database rows from them.
|
# database rows from them.
|
||||||
@override
|
@override
|
||||||
|
@sentry_sdk.trace
|
||||||
def consume(self, event: Dict[str, Any]) -> None:
|
def consume(self, event: Dict[str, Any]) -> None:
|
||||||
logging.debug("Processing missedmessage_emails event: %s", event)
|
logging.debug("Processing missedmessage_emails event: %s", event)
|
||||||
# When we consume an event, check if there are existing pending emails
|
# When we consume an event, check if there are existing pending emails
|
||||||
@@ -693,17 +694,22 @@ class MissedMessageWorker(QueueProcessingWorker):
|
|||||||
|
|
||||||
def work(self) -> None:
|
def work(self) -> None:
|
||||||
while True:
|
while True:
|
||||||
flush_per_request_caches()
|
with sentry_sdk.start_transaction(
|
||||||
reset_queries()
|
op="task",
|
||||||
try:
|
name=f"{self.queue_name} worker thread",
|
||||||
finished = self.background_loop()
|
custom_sampling_context={"queue": self.queue_name},
|
||||||
if finished:
|
):
|
||||||
break
|
flush_per_request_caches()
|
||||||
except Exception:
|
reset_queries()
|
||||||
logging.exception(
|
try:
|
||||||
"Exception in MissedMessage background worker; restarting the loop",
|
finished = self.background_loop()
|
||||||
stack_info=True,
|
if finished:
|
||||||
)
|
break
|
||||||
|
except Exception:
|
||||||
|
logging.exception(
|
||||||
|
"Exception in MissedMessage background worker; restarting the loop",
|
||||||
|
stack_info=True,
|
||||||
|
)
|
||||||
|
|
||||||
def background_loop(self) -> bool:
|
def background_loop(self) -> bool:
|
||||||
with self.cv:
|
with self.cv:
|
||||||
@@ -747,7 +753,10 @@ class MissedMessageWorker(QueueProcessingWorker):
|
|||||||
# re-checking the condition.
|
# re-checking the condition.
|
||||||
return False
|
return False
|
||||||
|
|
||||||
was_notified = self.cv.wait_for(wait_condition, timeout=timeout)
|
with sentry_sdk.start_span(description="condvar wait") as span:
|
||||||
|
span.set_data("timeout", timeout)
|
||||||
|
was_notified = self.cv.wait_for(wait_condition, timeout=timeout)
|
||||||
|
span.set_data("was_notified", was_notified)
|
||||||
|
|
||||||
# Being notified means that we are in conditions (1) or
|
# Being notified means that we are in conditions (1) or
|
||||||
# (2), above. In neither case do we need to look at if
|
# (2), above. In neither case do we need to look at if
|
||||||
@@ -759,6 +768,7 @@ class MissedMessageWorker(QueueProcessingWorker):
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@sentry_sdk.trace
|
||||||
def maybe_send_batched_emails(self) -> None:
|
def maybe_send_batched_emails(self) -> None:
|
||||||
current_time = timezone_now()
|
current_time = timezone_now()
|
||||||
|
|
||||||
@@ -782,20 +792,25 @@ class MissedMessageWorker(QueueProcessingWorker):
|
|||||||
len(events),
|
len(events),
|
||||||
user_profile_id,
|
user_profile_id,
|
||||||
)
|
)
|
||||||
try:
|
with sentry_sdk.start_span(
|
||||||
# Because we process events in batches, an
|
description="sending missedmessage_emails to user"
|
||||||
# escaped exception here would lead to
|
) as span:
|
||||||
# duplicate messages being sent for other
|
span.set_data("user_profile_id", user_profile_id)
|
||||||
# users in the same events_to_process batch,
|
span.set_data("event_count", len(events))
|
||||||
# and no guarantee of forward progress.
|
try:
|
||||||
handle_missedmessage_emails(user_profile_id, events)
|
# Because we process events in batches, an
|
||||||
except Exception:
|
# escaped exception here would lead to
|
||||||
logging.exception(
|
# duplicate messages being sent for other
|
||||||
"Failed to process %d missedmessage_emails for user %s",
|
# users in the same events_to_process batch,
|
||||||
len(events),
|
# and no guarantee of forward progress.
|
||||||
user_profile_id,
|
handle_missedmessage_emails(user_profile_id, events)
|
||||||
stack_info=True,
|
except Exception:
|
||||||
)
|
logging.exception(
|
||||||
|
"Failed to process %d missedmessage_emails for user %s",
|
||||||
|
len(events),
|
||||||
|
user_profile_id,
|
||||||
|
stack_info=True,
|
||||||
|
)
|
||||||
|
|
||||||
events_to_process.delete()
|
events_to_process.delete()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user