tornado: Reuse retry_event functions for failures in tornado queues.

We use retry_event in queue_processors.py to handle trying on failures,
without getting stuck in permanent retry loops if the event ends up
leading to failure on every attempt and we just keep sending NACK to
rabbitmq forever (or until the channel crashes). Tornado queues haven't
been using this, but they should.
This commit is contained in:
Mateusz Mandera
2020-04-09 18:31:04 +02:00
committed by Tim Abbott
parent 1939f42af1
commit 4283a513d4
2 changed files with 20 additions and 4 deletions

View File

@@ -23,7 +23,7 @@ from zerver.tornado.application import create_tornado_application, \
setup_tornado_rabbitmq
from zerver.tornado.autoreload import start as zulip_autoreload_start
from zerver.tornado.event_queue import add_client_gc_hook, \
missedmessage_hook, process_notification, setup_event_queue
missedmessage_hook, get_wrapped_process_notification, setup_event_queue
from zerver.tornado.sharding import notify_tornado_queue_name
if settings.USING_RABBITMQ:
@@ -89,8 +89,9 @@ class Command(BaseCommand):
if settings.USING_RABBITMQ:
queue_client = get_queue_client()
# Process notifications received via RabbitMQ
queue_client.register_json_consumer(notify_tornado_queue_name(int(port)),
process_notification)
queue_name = notify_tornado_queue_name(int(port))
queue_client.register_json_consumer(queue_name,
get_wrapped_process_notification(queue_name))
try:
# Application is an instance of Django's standard wsgi handler.

View File

@@ -15,6 +15,7 @@ import requests
import atexit
import sys
import signal
import traceback
import tornado.ioloop
import random
from zerver.models import UserProfile, Client, Realm
@@ -25,7 +26,7 @@ from zerver.lib.utils import statsd
from zerver.middleware import async_request_timer_restart
from zerver.lib.message import MessageDict
from zerver.lib.narrow import build_narrow_filter
from zerver.lib.queue import queue_json_publish
from zerver.lib.queue import queue_json_publish, retry_event
from zerver.lib.request import JsonableError
from zerver.tornado.descriptors import clear_descriptor_by_handler_id, set_descriptor_by_handler_id
from zerver.tornado.exceptions import BadEventQueueIdError
@@ -1084,6 +1085,20 @@ def process_notification(notice: Mapping[str, Any]) -> None:
logging.debug("Tornado: Event %s for %s users took %sms" % (
event['type'], len(users), int(1000 * (time.time() - start_time))))
def get_wrapped_process_notification(queue_name: str) -> Callable[[Dict[str, Any]], None]:
def failure_processor(notice: Dict[str, Any]) -> None:
logging.error(
"Maximum retries exceeded for Tornado notice:%s\nStack trace:\n%s\n" % (
notice, traceback.format_exc()))
def wrapped_process_notification(notice: Dict[str, Any]) -> None:
try:
process_notification(notice)
except Exception:
retry_event(queue_name, notice, failure_processor)
return wrapped_process_notification
# Runs in the Django process to send a notification to Tornado.
#
# We use JSON rather than bare form parameters, so that we can represent