process_queue: Recover gracefully after PostgreSQL restart.

- For threaded workers:
Django's autoreloader catches SIGQUIT(3) to reload the program. If
a process being watched by the autoreloader exits with status code 3,
the autoreloader will restart the process. To trigger a reload,
consumers send a SIGUSR1(10) signal to a handler in process_queue,
which then exits with status code 3.

- For single worker per process:
Catch the SIGUSR1 and quit; supervisorctl will restart the worker
automatically.

Fixes #5512
This commit is contained in:
Umair Khan
2017-07-03 15:52:55 +05:00
committed by Tim Abbott
parent 28861c225b
commit 0e8231d0f1
2 changed files with 32 additions and 1 deletions

View File

@@ -9,10 +9,12 @@ from django.core.management import CommandError
from django.conf import settings from django.conf import settings
from django.utils import autoreload from django.utils import autoreload
from zerver.worker.queue_processors import get_worker, get_active_worker_queues from zerver.worker.queue_processors import get_worker, get_active_worker_queues
import os
import sys import sys
import signal import signal
import logging import logging
import threading import threading
import subprocess
class Command(BaseCommand): class Command(BaseCommand):
def add_arguments(self, parser): def add_arguments(self, parser):
@@ -35,6 +37,15 @@ class Command(BaseCommand):
logging.basicConfig() logging.basicConfig()
logger = logging.getLogger('process_queue') logger = logging.getLogger('process_queue')
def exit_with_three(signal, frame):
    # type: (int, FrameType) -> None
    """
    Signal handler: log a warning, then exit with status code 3.

    This process is watched by Django's autoreload, so exiting
    with status code 3 will cause this process to restart.

    Both arguments are the ones handed to every signal handler
    (signal number and current stack frame); neither is used here.
    Note that the `signal` parameter shadows the `signal` module
    inside this function body.
    """
    # Logger.warn is a deprecated alias; Logger.warning is the supported
    # spelling and emits the identical record.
    logger.warning("SIGUSR1 received. Restarting this queue processor.")
    sys.exit(3)
if not settings.USING_RABBITMQ: if not settings.USING_RABBITMQ:
# Make the warning silent when running the tests # Make the warning silent when running the tests
if settings.TEST_SUITE: if settings.TEST_SUITE:
@@ -56,8 +67,10 @@ class Command(BaseCommand):
logger.info('%d queue worker threads were launched' % (cnt,)) logger.info('%d queue worker threads were launched' % (cnt,))
if options['all']: if options['all']:
signal.signal(signal.SIGUSR1, exit_with_three)
autoreload.main(run_threaded_workers, (get_active_worker_queues(), logger)) autoreload.main(run_threaded_workers, (get_active_worker_queues(), logger))
elif options['multi_threaded']: elif options['multi_threaded']:
signal.signal(signal.SIGUSR1, exit_with_three)
queues = options['multi_threaded'] queues = options['multi_threaded']
autoreload.main(run_threaded_workers, (queues, logger)) autoreload.main(run_threaded_workers, (queues, logger))
else: else:
@@ -75,6 +88,7 @@ class Command(BaseCommand):
sys.exit(0) sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
worker.start() worker.start()

View File

@@ -2,12 +2,14 @@
from __future__ import absolute_import from __future__ import absolute_import
from typing import Any, Callable, Dict, List, Mapping, Optional, cast from typing import Any, Callable, Dict, List, Mapping, Optional, cast
import signal
import sys import sys
import os import os
sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../api'))) sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../api')))
from bots_api.bot_lib import ExternalBotHandler, StateHandler from bots_api.bot_lib import ExternalBotHandler, StateHandler
from django.conf import settings from django.conf import settings
from django.db import connection
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
from django.core.handlers.base import BaseHandler from django.core.handlers.base import BaseHandler
from zerver.models import \ from zerver.models import \
@@ -91,6 +93,15 @@ def get_active_worker_queues(queue_type=None):
return list(worker_classes.keys()) return list(worker_classes.keys())
return list(queues[queue_type].keys()) return list(queues[queue_type].keys())
def check_and_send_restart_signal():
    # type: () -> None
    """
    Send SIGUSR1 to the current process if the database connection
    reports itself as unusable.

    Any exception raised while probing the connection or while
    signalling is swallowed, so this check never propagates an error
    to its caller.
    """
    try:
        db_is_usable = connection.is_usable()
        if db_is_usable:
            # Connection is fine; nothing to do.
            return
        logging.warning("*** Sending self SIGUSR1 to trigger a restart.")
        own_pid = os.getpid()
        os.kill(own_pid, signal.SIGUSR1)
    except Exception:
        return
class QueueProcessingWorker(object): class QueueProcessingWorker(object):
queue_name = None # type: str queue_name = None # type: str
@@ -119,7 +130,9 @@ class QueueProcessingWorker(object):
with lockfile(lock_fn): with lockfile(lock_fn):
with open(fn, 'ab') as f: with open(fn, 'ab') as f:
f.write(line.encode('utf-8')) f.write(line.encode('utf-8'))
reset_queries() check_and_send_restart_signal()
finally:
reset_queries()
def _log_problem(self): def _log_problem(self):
# type: () -> None # type: () -> None
@@ -363,6 +376,10 @@ class MessageSenderWorker(QueueProcessingWorker):
server_meta['worker_log_data'] = request._log_data server_meta['worker_log_data'] = request._log_data
resp_content = resp.content.decode('utf-8') resp_content = resp.content.decode('utf-8')
response_data = ujson.loads(resp_content)
if response_data['result'] == 'error':
check_and_send_restart_signal()
result = {'response': ujson.loads(resp_content), 'req_id': event['req_id'], result = {'response': ujson.loads(resp_content), 'req_id': event['req_id'],
'server_meta': server_meta} 'server_meta': server_meta}