mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	https://docs.sentry.io/platforms/python/migration/1.x-to-2.x#scope-configuring https://github.com/getsentry/sentry-python/releases/2.0.0 https://github.com/getsentry/sentry-python/releases/2.15.0 Signed-off-by: Anders Kaseorg <anders@zulip.com>
		
			
				
	
	
		
			135 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
 | 
						|
import os
 | 
						|
import signal
 | 
						|
import sys
 | 
						|
import threading
 | 
						|
from argparse import ArgumentParser
 | 
						|
from collections.abc import Iterator
 | 
						|
from contextlib import contextmanager
 | 
						|
from types import FrameType
 | 
						|
from typing import Any
 | 
						|
 | 
						|
import sentry_sdk
 | 
						|
from django.conf import settings
 | 
						|
from django.core.management.base import CommandError
 | 
						|
from django.utils import autoreload
 | 
						|
from typing_extensions import override
 | 
						|
 | 
						|
from zerver.lib.management import ZulipBaseCommand
 | 
						|
from zerver.worker.queue_processors import get_active_worker_queues, get_worker
 | 
						|
 | 
						|
 | 
						|
@contextmanager
 | 
						|
def log_and_exit_if_exception(
 | 
						|
    logger: logging.Logger, queue_name: str, threaded: bool
 | 
						|
) -> Iterator[None]:
 | 
						|
    try:
 | 
						|
        yield
 | 
						|
    except Exception:
 | 
						|
        logger.exception("Unhandled exception from queue: %s", queue_name, stack_info=True)
 | 
						|
        if threaded:
 | 
						|
            # Sending SIGUSR1 is the right way to exit - triggering the
 | 
						|
            # exit_with_three signal handler, causing exit and reload.
 | 
						|
            os.kill(os.getpid(), signal.SIGUSR1)
 | 
						|
        else:
 | 
						|
            sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
class Command(ZulipBaseCommand):
 | 
						|
    @override
 | 
						|
    def add_arguments(self, parser: ArgumentParser) -> None:
 | 
						|
        parser.add_argument("--queue_name", metavar="<queue name>", help="queue to process")
 | 
						|
        parser.add_argument(
 | 
						|
            "--worker_num", metavar="<worker number>", type=int, default=0, help="worker label"
 | 
						|
        )
 | 
						|
        parser.add_argument("--all", action="store_true", help="run all queues")
 | 
						|
        parser.add_argument(
 | 
						|
            "--multi_threaded",
 | 
						|
            nargs="+",
 | 
						|
            metavar="<list of queue name>",
 | 
						|
            required=False,
 | 
						|
            help="list of queue to process",
 | 
						|
        )
 | 
						|
 | 
						|
    help = "Runs a queue processing worker"
 | 
						|
 | 
						|
    @override
 | 
						|
    def handle(self, *args: Any, **options: Any) -> None:
 | 
						|
        logging.basicConfig()
 | 
						|
        logger = logging.getLogger("process_queue")
 | 
						|
 | 
						|
        def exit_with_three(signal: int, frame: FrameType | None) -> None:
 | 
						|
            """
 | 
						|
            This process is watched by Django's autoreload, so exiting
 | 
						|
            with status code 3 will cause this process to restart.
 | 
						|
            """
 | 
						|
            logger.warning("SIGUSR1 received. Restarting this queue processor.")
 | 
						|
            sys.exit(3)
 | 
						|
 | 
						|
        if not settings.USING_RABBITMQ:
 | 
						|
            # Make the warning silent when running the tests
 | 
						|
            if settings.TEST_SUITE:
 | 
						|
                logger.info("Not using RabbitMQ queue workers in the test suite.")
 | 
						|
            else:
 | 
						|
                logger.error("Cannot run a queue processor when USING_RABBITMQ is False!")
 | 
						|
            raise CommandError
 | 
						|
 | 
						|
        def run_threaded_workers(queues: list[str], logger: logging.Logger) -> None:
 | 
						|
            cnt = 0
 | 
						|
            for queue_name in queues:
 | 
						|
                if not settings.DEVELOPMENT:
 | 
						|
                    logger.info("launching queue worker thread %s", queue_name)
 | 
						|
                cnt += 1
 | 
						|
                td = ThreadedWorker(queue_name, logger)
 | 
						|
                td.start()
 | 
						|
            assert len(queues) == cnt
 | 
						|
            logger.info("%d queue worker threads were launched", cnt)
 | 
						|
 | 
						|
        if options["all"]:
 | 
						|
            signal.signal(signal.SIGUSR1, exit_with_three)
 | 
						|
            autoreload.run_with_reloader(run_threaded_workers, get_active_worker_queues(), logger)
 | 
						|
        elif options["multi_threaded"]:
 | 
						|
            signal.signal(signal.SIGUSR1, exit_with_three)
 | 
						|
            queues = options["multi_threaded"]
 | 
						|
            autoreload.run_with_reloader(run_threaded_workers, queues, logger)
 | 
						|
        else:
 | 
						|
            queue_name = options["queue_name"]
 | 
						|
            worker_num = options["worker_num"]
 | 
						|
 | 
						|
            def signal_handler(signal: int, frame: FrameType | None) -> None:
 | 
						|
                logger.info("Worker %d disconnecting from queue %s", worker_num, queue_name)
 | 
						|
                worker.stop()
 | 
						|
                sys.exit(0)
 | 
						|
 | 
						|
            logger.info("Worker %d connecting to queue %s", worker_num, queue_name)
 | 
						|
            with log_and_exit_if_exception(logger, queue_name, threaded=False):
 | 
						|
                worker = get_worker(queue_name, worker_num=worker_num)
 | 
						|
                with sentry_sdk.isolation_scope() as scope:
 | 
						|
                    scope.set_tag("queue_worker", queue_name)
 | 
						|
                    scope.set_tag("worker_num", worker_num)
 | 
						|
 | 
						|
                    worker.setup()
 | 
						|
                    signal.signal(signal.SIGTERM, signal_handler)
 | 
						|
                    signal.signal(signal.SIGINT, signal_handler)
 | 
						|
                    signal.signal(signal.SIGUSR1, signal_handler)
 | 
						|
                    worker.start()
 | 
						|
 | 
						|
 | 
						|
class ThreadedWorker(threading.Thread):
 | 
						|
    def __init__(self, queue_name: str, logger: logging.Logger) -> None:
 | 
						|
        threading.Thread.__init__(self)
 | 
						|
        self.logger = logger
 | 
						|
        self.queue_name = queue_name
 | 
						|
 | 
						|
    @override
 | 
						|
    def run(self) -> None:
 | 
						|
        with (
 | 
						|
            sentry_sdk.isolation_scope() as scope,
 | 
						|
            log_and_exit_if_exception(self.logger, self.queue_name, threaded=True),
 | 
						|
        ):
 | 
						|
            scope.set_tag("queue_worker", self.queue_name)
 | 
						|
            worker = get_worker(self.queue_name, threaded=True)
 | 
						|
            worker.setup()
 | 
						|
            logging.debug("starting consuming %s", self.queue_name)
 | 
						|
            worker.start()
 |