mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	Otherwise we get spammed with “Dropped breadcrumb because no client bound” log messages. Signed-off-by: Anders Kaseorg <anders@zulip.com>
		
			
				
	
	
		
			329 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			329 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import signal
 | 
						|
import time
 | 
						|
from abc import ABC, abstractmethod
 | 
						|
from collections import deque
 | 
						|
from collections.abc import Callable, MutableSequence
 | 
						|
from types import FrameType
 | 
						|
from typing import Any, TypeVar
 | 
						|
 | 
						|
import orjson
 | 
						|
import sentry_sdk
 | 
						|
from django.conf import settings
 | 
						|
from django.db import connection
 | 
						|
from typing_extensions import override
 | 
						|
 | 
						|
from zerver.lib.context_managers import lockfile
 | 
						|
from zerver.lib.db_connections import reset_queries
 | 
						|
from zerver.lib.partial import partial
 | 
						|
from zerver.lib.per_request_cache import flush_per_request_caches
 | 
						|
from zerver.lib.pysa import mark_sanitized
 | 
						|
from zerver.lib.queue import SimpleQueueClient
 | 
						|
 | 
						|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class WorkerTimeoutError(Exception):
    """Raised when processing a batch of events exceeds its time budget.

    Attributes:
        queue_name: Name of the queue whose worker timed out.
        limit: Allowed number of seconds per event.
        event_count: Number of events in the batch being processed.
    """

    def __init__(self, queue_name: str, limit: int, event_count: int) -> None:
        self.queue_name, self.limit, self.event_count = queue_name, limit, event_count

    @override
    def __str__(self) -> str:
        # The total budget is the per-event limit multiplied by the batch size.
        total_seconds = self.limit * self.event_count
        return (
            f"Timed out in {self.queue_name} after {total_seconds} seconds "
            f"processing {self.event_count} events"
        )
 | 
						|
 | 
						|
 | 
						|
class InterruptConsumeError(Exception):
    """Signal that processing of the current event should simply stop.

    Raise this from inside an event consume function to abandon the
    current event without any error handling; the queue worker then
    carries on with its normal work.
    """
 | 
						|
 | 
						|
 | 
						|
class WorkerDeclarationError(Exception):
    """Raised when a queue worker class is declared incorrectly."""
 | 
						|
 | 
						|
 | 
						|
# Type variable bound to QueueProcessingWorker, used by assign_queue so the
# decorator is typed as returning the same worker class type it receives.
ConcreteQueueWorker = TypeVar("ConcreteQueueWorker", bound="QueueProcessingWorker")
 | 
						|
 | 
						|
 | 
						|
def assign_queue(
    queue_name: str,
    enabled: bool = True,
    is_test_queue: bool = False,
) -> Callable[[type[ConcreteQueueWorker]], type[ConcreteQueueWorker]]:
    """Build a class decorator that binds a worker class to a queue.

    The returned decorator always sets ``queue_name`` on the decorated
    class.  The class is additionally passed to ``register_worker`` only
    when ``enabled`` is true; ``is_test_queue`` is forwarded to that call.
    The decorated class itself is returned unchanged otherwise.
    """

    def decorate(worker_class: type[ConcreteQueueWorker]) -> type[ConcreteQueueWorker]:
        worker_class.queue_name = queue_name
        if not enabled:
            # Disabled workers still get their queue_name, but are not registered.
            return worker_class
        register_worker(queue_name, worker_class, is_test_queue)
        return worker_class

    return decorate
 | 
						|
 | 
						|
 | 
						|
# Registry mapping queue name -> worker class; populated by register_worker().
worker_classes: dict[str, type["QueueProcessingWorker"]] = {}
 | 
						|
# Names of queues that were registered with is_test_queue=True.
test_queues: set[str] = set()
 | 
						|
 | 
						|
 | 
						|
def register_worker(
    queue_name: str, clazz: type["QueueProcessingWorker"], is_test_queue: bool = False
) -> None:
    """Record ``clazz`` as the worker class for ``queue_name``.

    Any class previously registered under the same name is replaced.
    When ``is_test_queue`` is true, the queue name is also added to the
    module-level ``test_queues`` set.
    """
    if is_test_queue:
        test_queues.add(queue_name)
    worker_classes[queue_name] = clazz
 | 
						|
 | 
						|
 | 
						|
def check_and_send_restart_signal() -> None:
    """Send SIGUSR1 to this process if the database connection is unusable.

    This is best-effort: any exception raised while checking the
    connection or sending the signal is deliberately ignored.
    """
    try:
        if connection.is_usable():
            return
        logging.warning("*** Sending self SIGUSR1 to trigger a restart.")
        os.kill(os.getpid(), signal.SIGUSR1)
    except Exception:
        # Intentionally swallowed; a failure here must not propagate to the caller.
        pass
 | 
						|
 | 
						|
 | 
						|
class QueueProcessingWorker(ABC):
    """Base class for workers that consume events from a single named queue.

    Subclasses implement ``consume`` and normally receive their
    ``queue_name`` from the ``assign_queue`` decorator.  ``do_consume``
    wraps each call into the subclass with a Sentry transaction, an
    optional SIGALRM-based timeout, error handling, and bookkeeping of the
    statistics written by ``update_statistics``.
    """

    # Name of the queue this worker consumes; __init__ raises if it is unset.
    queue_name: str
    # Time budget in seconds per event; a falsy value disables the timeout.
    MAX_CONSUME_SECONDS: int | None = 30
    # While the local queue is non-empty, the stats file is rewritten once
    # this many do_consume calls have happened, or once this many seconds
    # have passed since the last write, whichever comes first.
    CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50
    MAX_SECONDS_BEFORE_UPDATE_STATS = 30

    # How many un-acknowledged events the worker should have on hand,
    # fetched from the rabbitmq server.  Larger values may be more
    # performant, but if queues are large, cause more network IO at
    # startup and steady-state memory.
    PREFETCH = 100

    def __init__(
        self,
        threaded: bool = False,
        disable_timeout: bool = False,
        worker_num: int | None = None,
    ) -> None:
        """Create the worker and write an initial statistics file.

        Args:
            threaded: When true, do_consume does not install the SIGALRM
                timeout.
            disable_timeout: When true, do_consume does not install the
                SIGALRM timeout.
            worker_num: Stored on the instance; not otherwise used here.

        Raises:
            WorkerDeclarationError: If the class has no ``queue_name``.
        """
        # The queue client is created later, by setup().
        self.q: SimpleQueueClient | None = None
        self.threaded = threaded
        self.disable_timeout = disable_timeout
        self.worker_num = worker_num
        if not hasattr(self, "queue_name"):
            raise WorkerDeclarationError("Queue worker declared without queue_name")

        self.initialize_statistics()

    def initialize_statistics(self) -> None:
        """Reset all statistics counters and write a fresh stats file."""
        self.queue_last_emptied_timestamp = time.time()
        self.consumed_since_last_emptied = 0
        # Each entry is (number of events, seconds taken) for one do_consume
        # call; only the 50 most recent entries are kept.
        self.recent_consume_times: MutableSequence[tuple[int, float]] = deque(maxlen=50)
        self.consume_iteration_counter = 0
        self.idle = True
        self.last_statistics_update_time = 0.0

        self.update_statistics()

    @sentry_sdk.trace
    def update_statistics(self) -> None:
        """Write the current statistics to ``<queue_name>.stats`` as JSON.

        The file lives in ``settings.QUEUE_STATS_DIR`` (created if needed).
        While holding a lock file, the data is written to a ``.tmp`` file
        which is then renamed over the final name, rather than being
        written in place.
        """
        total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
        total_events = sum(events_number for events_number, _ in self.recent_consume_times)
        if total_events == 0:
            # No recent samples, so there is no meaningful average.
            recent_average_consume_time = None
        else:
            recent_average_consume_time = total_seconds / total_events
        stats_dict = dict(
            update_time=time.time(),
            recent_average_consume_time=recent_average_consume_time,
            queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
            consumed_since_last_emptied=self.consumed_since_last_emptied,
        )

        os.makedirs(settings.QUEUE_STATS_DIR, exist_ok=True)

        fname = f"{self.queue_name}.stats"
        fn = os.path.join(settings.QUEUE_STATS_DIR, fname)
        with lockfile(fn + ".lock"):
            tmp_fn = fn + ".tmp"
            with open(tmp_fn, "wb") as f:
                f.write(
                    orjson.dumps(stats_dict, option=orjson.OPT_APPEND_NEWLINE | orjson.OPT_INDENT_2)
                )
            os.rename(tmp_fn, fn)
        self.last_statistics_update_time = time.time()

    def get_remaining_local_queue_size(self) -> int:
        """Return the queue client's local queue size, or 0 without a client."""
        if self.q is not None:
            return self.q.local_queue_size()
        else:
            # This is a special case that will happen if we're operating without
            # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of
            # and the only reasonable size to return is 0.
            return 0

    @abstractmethod
    def consume(self, data: dict[str, Any]) -> None:
        """Process a single event; must be implemented by subclasses."""
        pass

    def do_consume(
        self, consume_func: Callable[[list[dict[str, Any]]], None], events: list[dict[str, Any]]
    ) -> None:
        """Run ``consume_func(events)`` with timeout, error handling and stats.

        Exceptions raised by ``consume_func`` (including the
        WorkerTimeoutError raised by the alarm handler) are passed to
        ``_handle_consume_exception`` instead of propagating.  Per-request
        caches and query logs are always reset afterwards, and the
        statistics are updated.

        Args:
            consume_func: Callable that processes the whole ``events`` list.
            events: The batch of events to process.
        """
        # Stays None if consume_func raised, so failed batches are not
        # counted in recent_consume_times.
        consume_time_seconds: float | None = None
        with sentry_sdk.start_transaction(
            op="task",
            name=f"consume {self.queue_name}",
            custom_sampling_context={"queue": self.queue_name},
        ):
            # Only add the breadcrumb when Sentry is initialized; otherwise
            # we get spammed with "Dropped breadcrumb because no client
            # bound" log messages.
            if sentry_sdk.is_initialized():
                sentry_sdk.add_breadcrumb(
                    type="debug",
                    category="queue_processor",
                    message=f"Consuming {self.queue_name}",
                    data={
                        "events": events,
                        "local_queue_size": self.get_remaining_local_queue_size(),
                    },
                )
            try:
                if self.idle:
                    # We're reactivating after having gone idle due to emptying the queue.
                    # We should update the stats file to keep it fresh and to make it clear
                    # that the queue started processing, in case the event we're about to process
                    # makes us freeze.
                    self.idle = False
                    self.update_statistics()

                time_start = time.time()
                if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout:
                    try:
                        signal.signal(
                            signal.SIGALRM,
                            partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
                        )
                        try:
                            # The budget scales with the batch size.
                            signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
                            consume_func(events)
                        finally:
                            # Cancel any pending alarm, whether or not we succeeded.
                            signal.alarm(0)
                    finally:
                        # Restore the default SIGALRM disposition.
                        signal.signal(signal.SIGALRM, signal.SIG_DFL)
                else:
                    consume_func(events)
                consume_time_seconds = time.time() - time_start
                self.consumed_since_last_emptied += len(events)
            except Exception as e:
                self._handle_consume_exception(events, e)
            finally:
                flush_per_request_caches()
                reset_queries()

                with sentry_sdk.start_span(name="statistics"):
                    if consume_time_seconds is not None:
                        self.recent_consume_times.append((len(events), consume_time_seconds))

                    remaining_local_queue_size = self.get_remaining_local_queue_size()
                    if remaining_local_queue_size == 0:
                        self.queue_last_emptied_timestamp = time.time()
                        self.consumed_since_last_emptied = 0
                        # We've cleared all the events from the queue, so we don't
                        # need to worry about the small overhead of doing a disk write.
                        # We take advantage of this to update the stats file to keep it fresh,
                        # especially since the queue might go idle until new events come in.
                        self.update_statistics()
                        self.idle = True
                    else:
                        self.consume_iteration_counter += 1
                        if (
                            self.consume_iteration_counter
                            >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
                            or time.time() - self.last_statistics_update_time
                            >= self.MAX_SECONDS_BEFORE_UPDATE_STATS
                        ):
                            self.consume_iteration_counter = 0
                            self.update_statistics()

    def consume_single_event(self, event: dict[str, Any]) -> None:
        """Process one event through ``do_consume`` using ``consume``."""
        consume_func = lambda events: self.consume(events[0])
        self.do_consume(consume_func, [event])

    def timer_expired(
        self, limit: int, events: list[dict[str, Any]], signal: int, frame: FrameType | None
    ) -> None:
        """SIGALRM handler installed by do_consume; always raises.

        ``limit`` and ``events`` are pre-bound by do_consume; ``signal`` and
        ``frame`` are the standard signal-handler arguments and are unused.

        Raises:
            WorkerTimeoutError: Always.
        """
        raise WorkerTimeoutError(self.queue_name, limit, len(events))

    def _handle_consume_exception(self, events: list[dict[str, Any]], exception: Exception) -> None:
        """Log a consume failure and record the failed events on disk.

        InterruptConsumeError is ignored entirely.  Any other exception is
        logged (with the events attached as Sentry context), the events are
        appended to ``<queue_name>.errors`` in ``settings.QUEUE_ERROR_DIR``,
        and ``check_and_send_restart_signal`` is called.
        """
        if isinstance(exception, InterruptConsumeError):
            # The exception signals that no further error handling
            # is needed and the worker can proceed.
            return

        with sentry_sdk.new_scope() as scope:
            scope.set_context(
                "events",
                {
                    "data": events,
                    "queue_name": self.queue_name,
                },
            )
            if isinstance(exception, WorkerTimeoutError):
                # Fingerprint timeouts by queue name rather than by traceback.
                scope.fingerprint = ["worker-timeout", self.queue_name]
                logging.exception(exception, stack_info=True)
            else:
                logging.exception(
                    "Problem handling data on queue %s", self.queue_name, stack_info=True
                )
        # Create the error directory idempotently: a separate
        # exists-then-mkdir check can race with another worker process
        # creating the same directory, and fails if a parent is missing.
        os.makedirs(settings.QUEUE_ERROR_DIR, exist_ok=True)
        # Use 'mark_sanitized' to prevent Pysa from detecting this false positive
        # flow. 'queue_name' is always a constant string.
        fname = mark_sanitized(f"{self.queue_name}.errors")
        fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
        # One line per failure: timestamp, a tab, then the JSON-encoded events.
        line = f"{time.asctime()}\t{orjson.dumps(events).decode()}\n"
        lock_fn = fn + ".lock"
        with lockfile(lock_fn), open(fn, "a") as f:
            f.write(line)
        check_and_send_restart_signal()

    def setup(self) -> None:
        """Create the queue client, using ``PREFETCH`` as its prefetch value."""
        self.q = SimpleQueueClient(prefetch=self.PREFETCH)

    def start(self) -> None:
        """Reset statistics and start consuming, one event per callback.

        Requires ``setup`` to have been called first.
        """
        assert self.q is not None
        self.initialize_statistics()
        self.q.start_json_consumer(
            self.queue_name,
            lambda events: self.consume_single_event(events[0]),
        )

    def stop(self) -> None:  # nocoverage
        """Ask the queue client to stop consuming."""
        assert self.q is not None
        self.q.stop_consuming()
 | 
						|
 | 
						|
 | 
						|
class LoopQueueProcessingWorker(QueueProcessingWorker):
    """Queue worker variant that processes events in batches.

    Subclasses implement ``consume_batch``; the single-event ``consume``
    simply delegates to it with a one-element list.
    """

    # Passed to the queue client's consumer as its ``timeout``.
    sleep_delay = 1
    # Passed to the queue client's consumer as its ``batch_size``.
    batch_size = 100

    @override
    def setup(self) -> None:
        """Create the queue client with a prefetch of at least ``batch_size``."""
        prefetch = max(self.PREFETCH, self.batch_size)
        self.q = SimpleQueueClient(prefetch=prefetch)

    @override
    def start(self) -> None:  # nocoverage
        """Reset statistics and start consuming events in batches.

        Requires ``setup`` to have been called first.
        """
        assert self.q is not None
        self.initialize_statistics()

        def handle_batch(events: list[dict[str, Any]]) -> None:
            self.do_consume(self.consume_batch, events)

        self.q.start_json_consumer(
            self.queue_name,
            handle_batch,
            batch_size=self.batch_size,
            timeout=self.sleep_delay,
        )

    @abstractmethod
    def consume_batch(self, events: list[dict[str, Any]]) -> None:
        """Process a batch of events; must be implemented by subclasses."""
        pass

    @override
    def consume(self, event: dict[str, Any]) -> None:
        """In LoopQueueProcessingWorker, consume is used just for automated tests"""
        single_event_batch = [event]
        self.consume_batch(single_event_batch)
 |