diff --git a/zerver/management/commands/process_queue.py b/zerver/management/commands/process_queue.py index 23dd24fece..3ee3f7145d 100644 --- a/zerver/management/commands/process_queue.py +++ b/zerver/management/commands/process_queue.py @@ -9,10 +9,12 @@ from django.core.management import CommandError from django.conf import settings from django.utils import autoreload from zerver.worker.queue_processors import get_worker, get_active_worker_queues +import os import sys import signal import logging import threading +import subprocess class Command(BaseCommand): def add_arguments(self, parser): @@ -35,6 +37,15 @@ class Command(BaseCommand): logging.basicConfig() logger = logging.getLogger('process_queue') + def exit_with_three(signal, frame): + # type: (int, FrameType) -> None + """ + This process is watched by Django's autoreload, so exiting + with status code 3 will cause this process to restart. + """ + logger.warn("SIGUSR1 received. Restarting this queue processor.") + sys.exit(3) + if not settings.USING_RABBITMQ: # Make the warning silent when running the tests if settings.TEST_SUITE: @@ -56,8 +67,10 @@ class Command(BaseCommand): logger.info('%d queue worker threads were launched' % (cnt,)) if options['all']: + signal.signal(signal.SIGUSR1, exit_with_three) autoreload.main(run_threaded_workers, (get_active_worker_queues(), logger)) elif options['multi_threaded']: + signal.signal(signal.SIGUSR1, exit_with_three) queues = options['multi_threaded'] autoreload.main(run_threaded_workers, (queues, logger)) else: @@ -75,6 +88,7 @@ class Command(BaseCommand): sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGUSR1, signal_handler) worker.start() diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 2476760b41..09cc18136c 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -2,12 +2,14 @@ from __future__ import absolute_import from typing import Any, Callable, Dict, List, Mapping, Optional, cast +import signal import sys import os sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../api'))) from bots_api.bot_lib import ExternalBotHandler, StateHandler from django.conf import settings +from django.db import connection from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.base import BaseHandler from zerver.models import \ @@ -91,6 +93,15 @@ def get_active_worker_queues(queue_type=None): return list(worker_classes.keys()) return list(queues[queue_type].keys()) +def check_and_send_restart_signal(): + # type: () -> None + try: + if not connection.is_usable(): + logging.warning("*** Sending self SIGUSR1 to trigger a restart.") + os.kill(os.getpid(), signal.SIGUSR1) + except Exception: + pass + class QueueProcessingWorker(object): queue_name = None # type: str @@ -119,7 +130,9 @@ class QueueProcessingWorker(object): with lockfile(lock_fn): with open(fn, 'ab') as f: f.write(line.encode('utf-8')) - reset_queries() + check_and_send_restart_signal() + finally: + reset_queries() def _log_problem(self): # type: () -> None @@ -363,6 +376,10 @@ class MessageSenderWorker(QueueProcessingWorker): server_meta['worker_log_data'] = request._log_data resp_content = resp.content.decode('utf-8') + response_data = ujson.loads(resp_content) + if response_data['result'] == 'error': + check_and_send_restart_signal() + result = {'response': ujson.loads(resp_content), 'req_id': event['req_id'], 'server_meta': server_meta}