mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			278 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			278 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
 | 
						|
import logging
 | 
						|
import threading
 | 
						|
from collections import defaultdict
 | 
						|
from datetime import timedelta
 | 
						|
from typing import Any
 | 
						|
 | 
						|
import sentry_sdk
 | 
						|
from django.conf import settings
 | 
						|
from django.db import transaction
 | 
						|
from django.db.utils import IntegrityError
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
from typing_extensions import override
 | 
						|
 | 
						|
from zerver.lib.db_connections import reset_queries
 | 
						|
from zerver.lib.email_notifications import MissedMessageData, handle_missedmessage_emails
 | 
						|
from zerver.lib.per_request_cache import flush_per_request_caches
 | 
						|
from zerver.models import ScheduledMessageNotificationEmail
 | 
						|
from zerver.models.users import get_user_profile_by_id
 | 
						|
from zerver.worker.base import QueueProcessingWorker, assign_queue
 | 
						|
 | 
						|
# Module-level logger named after this module.  NOTE(review): the worker
# class in this file logs through the root `logging` functions
# (logging.debug / logging.info / logging.exception) rather than through
# this logger, so this name is currently unused here.
logger: logging.Logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
@assign_queue("missedmessage_emails")
class MissedMessageWorker(QueueProcessingWorker):
    """Batch missed-message notification emails before sending them.

    The main thread consumes ``missedmessage_emails`` queue events and
    persists each one as a ScheduledMessageNotificationEmail row.  A
    separate worker thread (see ``work``) periodically sends every row
    whose scheduled_timestamp has passed, grouped per recipient, and then
    deletes exactly the rows it handled.
    """

    # Aggregate all messages received over the last several seconds
    # (configurable by each recipient) to let someone finish sending a
    # batch of messages and/or editing them before they are sent out
    # as emails to recipients.
    #
    # The batch interval is best-effort -- we poll at most every
    # CHECK_FREQUENCY_SECONDS, to avoid excessive activity.
    CHECK_FREQUENCY_SECONDS = 5

    worker_thread: threading.Thread | None = None

    # This condition variable mediates the stopping and has_timeout
    # pieces of state, below it.
    cv = threading.Condition()
    stopping = False
    has_timeout = False

    # The main thread, which handles the RabbitMQ connection and creates
    # database rows from them.
    @override
    @sentry_sdk.trace
    def consume(self, event: dict[str, Any]) -> None:
        """Persist one missed-message event as a ScheduledMessageNotificationEmail.

        If the user already has pending rows, the new row reuses their
        scheduled timestamp so it joins the same batch; otherwise it is
        scheduled for now plus the user's batching period.  Wakes the
        worker thread when it is sleeping without a timeout.  Events whose
        row cannot be inserted (IntegrityError) are logged and skipped.
        """
        logging.debug("Processing missedmessage_emails event: %s", event)
        # When we consume an event, check if there are existing pending emails
        # for that user, and if so use the same scheduled timestamp.

        user_profile_id: int = event["user_profile_id"]
        user_profile = get_user_profile_by_id(user_profile_id)
        batch_duration_seconds = user_profile.email_notifications_batching_period_seconds
        batch_duration = timedelta(seconds=batch_duration_seconds)

        # .first() orders by primary key when no ordering is set, so this
        # deterministically picks the oldest pending row for the user.
        pending_email = ScheduledMessageNotificationEmail.objects.filter(
            user_profile_id=user_profile_id
        ).first()
        if pending_email is not None:
            scheduled_timestamp = pending_email.scheduled_timestamp
        else:
            scheduled_timestamp = timezone_now() + batch_duration

        with self.cv:
            # We now hold the lock, so there are three places the
            # worker thread can be:
            #
            #  1. In maybe_send_batched_emails, and will have to take
            #     the lock (and thus block insertions of new rows
            #     here) to decide if there are any rows and if it thus
            #     needs a timeout.
            #
            #  2. In the cv.wait_for with a timeout because there were
            #     rows already.  There's nothing for us to do, since
            #     the newly-inserted row will get checked upon that
            #     timeout.
            #
            #  3. In the cv.wait_for without a timeout, because there
            #     weren't any rows (which we're about to change).
            #
            # Notifying in (1) is irrelevant, since the thread is not
            # waiting.  If we over-notify by doing so for both (2) and
            # (3), the behaviour is correct but slightly inefficient,
            # as the thread will be needlessly awoken and will just
            # re-wait.  However, if we fail to awake case (3), the
            # worker thread will never wake up, and the
            # ScheduledMessageNotificationEmail internal queue will
            # back up.
            #
            # Use the self.has_timeout property (which is protected by
            # the lock) to determine which of cases (2) or (3) we are
            # in, and as such if we need to notify after making the
            # row.
            try:
                ScheduledMessageNotificationEmail.objects.create(
                    user_profile_id=user_profile_id,
                    message_id=event["message_id"],
                    trigger=event["trigger"],
                    scheduled_timestamp=scheduled_timestamp,
                    mentioned_user_group_id=event.get("mentioned_user_group_id"),
                )
                if not self.has_timeout:
                    self.cv.notify()
            except IntegrityError:
                logging.debug(
                    "ScheduledMessageNotificationEmail row could not be created. The message may have been deleted. Skipping event."
                )

    @override
    def start(self) -> None:
        """Start the worker thread, then the base queue consumer.

        On staging servers no events are processed; this instead blocks
        until stop() sets ``stopping``.
        """
        with self.cv:
            self.stopping = False

        # Do nothing to process events on staging servers, since we do
        # not support running multiple copies of this worker.
        if settings.STAGING:
            with self.cv:
                # Wait on the predicate rather than a bare wait(): if
                # stop() runs between the two lock acquisitions above,
                # its notify() reaches nobody, and a bare wait() would
                # then block forever.
                self.cv.wait_for(lambda: self.stopping)
            return

        self.worker_thread = threading.Thread(target=self.work)
        self.worker_thread.start()
        super().start()

    def work(self) -> None:
        """Worker-thread entry point: run background_loop until it reports done.

        Exceptions from background_loop are logged and retried after an
        exponential backoff (1s doubling up to 30s), which is cut short
        when stop() is requested.
        """
        backoff = 1
        while True:
            with sentry_sdk.start_transaction(
                op="task",
                name=f"{self.queue_name} worker thread",
                custom_sampling_context={"queue": self.queue_name},
            ):
                flush_per_request_caches()
                reset_queries()
                try:
                    finished = self.background_loop()
                    if finished:
                        break
                    # Success running the background loop; reset our backoff
                    backoff = 1
                except Exception:
                    logging.exception(
                        "Exception in MissedMessage background worker; restarting the loop",
                        stack_info=True,
                    )

                    # We want to sleep, with backoff, before retrying
                    # the background loop; there may be
                    # non-recoverable errors which cause immediate
                    # exceptions, and we should avoid fast
                    # crash-looping.  Instead of using time.sleep,
                    # which would block this thread and delay attempts
                    # to exit, we wait on the condition variable.
                    # With has_timeout set, this will only be notified
                    # by .stop(), below.  Waiting on the `stopping`
                    # predicate also covers a stop() that was requested
                    # while background_loop was running (its notify()
                    # found no waiter), so shutdown is not delayed by
                    # up to a full backoff period.
                    #
                    # Generally, delays in this background process are
                    # acceptable, so long as they at least
                    # occasionally retry.
                    with self.cv:
                        self.has_timeout = True
                        self.cv.wait_for(lambda: self.stopping, timeout=backoff)
                    backoff = min(30, backoff * 2)

    def background_loop(self) -> bool:
        """Run one wait/send iteration of the worker thread.

        Returns True when the worker has been asked to stop, else False.
        """
        with self.cv:
            if self.stopping:
                return True
            # There are three conditions which we wait for:
            #
            #  1. We are being explicitly asked to stop; see the
            #     notify() call in stop()
            #
            #  2. We have no ScheduledMessageNotificationEmail
            #     objects currently (has_timeout = False) and the
            #     first one was just enqueued; see the notify()
            #     call in consume().  We break out so that we can
            #     come back around the loop and re-wait with a
            #     timeout (see next condition).
            #
            #  3. One or more ScheduledMessageNotificationEmail
            #     exist in the database, so we need to re-check
            #     them regularly; this happens by hitting the
            #     timeout and calling maybe_send_batched_emails().
            #     There is no explicit notify() for this.
            timeout: int | None = None
            if ScheduledMessageNotificationEmail.objects.exists():
                timeout = self.CHECK_FREQUENCY_SECONDS
            self.has_timeout = timeout is not None

            def wait_condition() -> bool:
                if self.stopping:
                    # Condition (1)
                    return True
                if timeout is None:
                    # Condition (2).  We went to sleep with no
                    # ScheduledMessageNotificationEmail existing,
                    # and one has just been made.  We re-check
                    # that is still true now that we have the
                    # lock, and if we see it, we stop waiting.
                    return ScheduledMessageNotificationEmail.objects.exists()
                # This should only happen at the start or end of
                # the wait, when we haven't been notified, but are
                # re-checking the condition.
                return False

            with sentry_sdk.start_span(name="condvar wait") as span:
                span.set_data("timeout", timeout)
                was_notified = self.cv.wait_for(wait_condition, timeout=timeout)
                span.set_data("was_notified", was_notified)

        # Being notified means that we are in conditions (1) or
        # (2), above.  In neither case do we need to look at if
        # there are batches to send -- (2) means that the
        # ScheduledMessageNotificationEmail was _just_ created, so
        # there is no need to check it now.
        if not was_notified:
            self.maybe_send_batched_emails()

        return False

    @sentry_sdk.trace
    def maybe_send_batched_emails(self) -> None:
        """Send and delete every ScheduledMessageNotificationEmail that is due.

        Due rows (scheduled_timestamp <= now) are locked, grouped per
        recipient, and handed to handle_missedmessage_emails one user at a
        time; a failure for one user is logged and does not block the
        others.  Only the rows read here are deleted afterwards.
        """
        current_time = timezone_now()

        with transaction.atomic(durable=True):
            events_to_process = ScheduledMessageNotificationEmail.objects.filter(
                scheduled_timestamp__lte=current_time
            ).select_for_update()

            # Batch the entries by user, remembering exactly which rows
            # were read so that only those are deleted below.
            processed_ids: list[int] = []
            events_by_recipient: dict[int, dict[int, MissedMessageData]] = defaultdict(dict)
            for event in events_to_process:
                processed_ids.append(event.pk)
                events_by_recipient[event.user_profile_id][event.message_id] = MissedMessageData(
                    trigger=event.trigger, mentioned_user_group_id=event.mentioned_user_group_id
                )

            for user_profile_id, events in events_by_recipient.items():
                logging.info(
                    "Batch-processing %s missedmessage_emails events for user %s",
                    len(events),
                    user_profile_id,
                )
                with sentry_sdk.start_span(name="sending missedmessage_emails to user") as span:
                    span.set_data("user_profile_id", user_profile_id)
                    span.set_data("event_count", len(events))
                    try:
                        # Because we process events in batches, an
                        # escaped exception here would lead to
                        # duplicate messages being sent for other
                        # users in the same events_to_process batch,
                        # and no guarantee of forward progress.
                        handle_missedmessage_emails(user_profile_id, events)
                    except Exception:
                        logging.exception(
                            "Failed to process %d missedmessage_emails for user %s",
                            len(events),
                            user_profile_id,
                            stack_info=True,
                        )

            # Delete only the rows loaded above.  Re-running the
            # timestamp filter as a fresh DELETE would also remove rows
            # that consume() committed after the SELECT.  consume() reuses
            # a pending row's already-past scheduled_timestamp, so those
            # rows match the filter, and they would be dropped without
            # ever being sent.
            if processed_ids:
                ScheduledMessageNotificationEmail.objects.filter(pk__in=processed_ids).delete()

    @override
    def stop(self) -> None:
        """Signal the worker thread to exit, wait for it, then stop the base consumer."""
        with self.cv:
            self.stopping = True
            self.cv.notify()
        if self.worker_thread is not None:
            self.worker_thread.join()

        if settings.STAGING:
            return

        super().stop()
 |