sentry: Provide more metadata in queue processors.

This allows aggregation by queue, makes the event data more readily
accessible, and clears out the breadcrumbs upon every batch that is
serviced.
This commit is contained in:
Alex Vandiver
2020-09-18 14:13:13 -07:00
committed by Tim Abbott
parent 9c0d6becc5
commit de1db2c838
2 changed files with 32 additions and 15 deletions

View File

@@ -9,6 +9,7 @@ from typing import Any, List
 from django.conf import settings
 from django.core.management.base import BaseCommand, CommandError
 from django.utils import autoreload
+from sentry_sdk import configure_scope

 from zerver.worker.queue_processors import get_active_worker_queues, get_worker
@@ -70,18 +71,21 @@ class Command(BaseCommand):
         queue_name = options['queue_name']
         worker_num = options['worker_num']
-        logger.info("Worker %d connecting to queue %s", worker_num, queue_name)
-        worker = get_worker(queue_name)
-        worker.setup()

         def signal_handler(signal: int, frame: FrameType) -> None:
             logger.info("Worker %d disconnecting from queue %s", worker_num, queue_name)
             worker.stop()
             sys.exit(0)

+        logger.info("Worker %d connecting to queue %s", worker_num, queue_name)
+        worker = get_worker(queue_name)
+        with configure_scope() as scope:
+            scope.set_tag("queue_worker", queue_name)
+            scope.set_tag("worker_num", worker_num)
+        worker.setup()
         signal.signal(signal.SIGTERM, signal_handler)
         signal.signal(signal.SIGINT, signal_handler)
         signal.signal(signal.SIGUSR1, signal_handler)
         worker.start()

 class Threaded_worker(threading.Thread):
@@ -90,6 +94,8 @@ class Threaded_worker(threading.Thread):
         self.worker = get_worker(queue_name)

     def run(self) -> None:
+        with configure_scope() as scope:
+            scope.set_tag("queue_worker", self.worker.queue_name)
         self.worker.setup()
         logging.debug('starting consuming ' + self.worker.queue_name)
         self.worker.start()

View File

@@ -39,6 +39,7 @@ from django.db.models import F
 from django.utils.timezone import now as timezone_now
 from django.utils.translation import override as override_language
 from django.utils.translation import ugettext as _
+from sentry_sdk import add_breadcrumb, configure_scope
 from zulip_bots.lib import ExternalBotHandler, extract_query_without_mention

 from zerver.context_processors import common_context
@@ -223,6 +224,14 @@ class QueueProcessingWorker(ABC):
     def do_consume(self, consume_func: Callable[[List[Dict[str, Any]]], None],
                    events: List[Dict[str, Any]]) -> None:
         consume_time_seconds: Optional[float] = None
+        with configure_scope() as scope:
+            scope.clear_breadcrumbs()
+            add_breadcrumb(
+                type='debug',
+                category='queue_processor',
+                message=f"Consuming {self.queue_name}",
+                data={"events": events, "queue_size": self.get_remaining_queue_size()},
+            )
         try:
             time_start = time.time()
             consume_func(events)
@@ -253,7 +262,12 @@ class QueueProcessingWorker(ABC):
         self.do_consume(consume_func, [data])

     def _handle_consume_exception(self, events: List[Dict[str, Any]]) -> None:
-        self._log_problem()
+        with configure_scope() as scope:
+            scope.set_context("events", {
+                "data": events,
+                "queue_name": self.queue_name,
+            })
+            logging.exception("Problem handling data on queue %s", self.queue_name, stack_info=True)
         if not os.path.exists(settings.QUEUE_ERROR_DIR):
             os.mkdir(settings.QUEUE_ERROR_DIR)  # nocoverage
         # Use 'mark_sanitized' to prevent Pysa from detecting this false positive
@@ -267,9 +281,6 @@ class QueueProcessingWorker(ABC):
                 f.write(line.encode('utf-8'))
         check_and_send_restart_signal()

-    def _log_problem(self) -> None:
-        logging.exception("Problem handling data on queue %s", self.queue_name, stack_info=True)

     def setup(self) -> None:
         self.q = SimpleQueueClient()