diff --git a/tools/run-dev-queue-processors b/tools/run-dev-queue-processors index 4172ff3d33..1add2ddd81 100755 --- a/tools/run-dev-queue-processors +++ b/tools/run-dev-queue-processors @@ -16,6 +16,5 @@ from zerver.worker.queue_processors import get_active_worker_queues queues = get_active_worker_queues() args = sys.argv[1:] -for queue in queues: - subprocess.Popen(['python', 'manage.py', 'process_queue'] + args + [queue], - stderr=subprocess.STDOUT) +subprocess.Popen(['python', 'manage.py', 'process_queue', '--all'] + args, + stderr=subprocess.STDOUT) diff --git a/zerver/lib/db.py b/zerver/lib/db.py index 3aad593bbb..ec5e6112b9 100644 --- a/zerver/lib/db.py +++ b/zerver/lib/db.py @@ -41,4 +41,5 @@ class TimeTrackingConnection(connection): def reset_queries(): from django.db import connections for conn in connections.all(): - conn.connection.queries = [] + if conn.connection is not None: + conn.connection.queries = [] diff --git a/zerver/management/commands/process_queue.py b/zerver/management/commands/process_queue.py index 8141c6fefe..32b37a148a 100644 --- a/zerver/management/commands/process_queue.py +++ b/zerver/management/commands/process_queue.py @@ -3,39 +3,57 @@ from __future__ import absolute_import from django.core.management.base import BaseCommand from django.core.management import CommandError from django.conf import settings -from zerver.worker.queue_processors import get_worker +from zerver.worker.queue_processors import get_worker, get_active_worker_queues import sys import signal import logging +import threading class Command(BaseCommand): def add_arguments(self, parser): - parser.add_argument('queue_name', metavar='', type=str, + parser.add_argument('--queue_name', metavar='', type=str, help="queue to process") - parser.add_argument('worker_num', metavar='', type=int, nargs='?', default=0, + parser.add_argument('--worker_num', metavar='', type=int, nargs='?', default=0, help="worker label") + parser.add_argument('--all', dest="all", action="store_true", default=False, + help="run all queues") help = "Runs a queue processing worker" def handle(self, *args, **options): logging.basicConfig() logger = logging.getLogger('process_queue') - queue_name = options['queue_name'] - worker_num = options['worker_num'] - if not settings.USING_RABBITMQ: logger.error("Cannot run a queue processor when USING_RABBITMQ is False!") sys.exit(1) - logger.info("Worker %d connecting to queue %s" % (worker_num, queue_name)) - worker = get_worker(queue_name) + if options['all']: + for queue_name in get_active_worker_queues(): + logger.info('launching queue worker thread ' + queue_name) + td = Threaded_worker(queue_name) + td.start() + else: + queue_name = options['queue_name'] + worker_num = options['worker_num'] - def signal_handler(signal, frame): - logger.info("Worker %d disconnecting from queue %s" % (worker_num, queue_name)) - worker.stop() - sys.exit(0) - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) + logger.info("Worker %d connecting to queue %s" % (worker_num, queue_name)) + worker = get_worker(queue_name) - worker.start() + def signal_handler(signal, frame): + logger.info("Worker %d disconnecting from queue %s" % (worker_num, queue_name)) + worker.stop() + sys.exit(0) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + worker.start() + +class Threaded_worker(threading.Thread): + def __init__(self, queue_name): + threading.Thread.__init__(self) + self.worker = get_worker(queue_name) + + def run(self): + self.worker.setup() + logging.debug('starting consuming ' + self.worker.queue_name) + self.worker.start() diff --git a/zerver/tests.py b/zerver/tests.py index 0fab24285c..12f163092f 100644 --- a/zerver/tests.py +++ b/zerver/tests.py @@ -232,6 +232,7 @@ class WorkerTest(TestCase): with simulated_queue_client(lambda: fake_client): worker = queue_processors.UserActivityWorker() + worker.setup() worker.start() activity_records = UserActivity.objects.filter( user_profile = user.id, @@ -266,6 +267,7 @@ class WorkerTest(TestCase): with simulated_queue_client(lambda: fake_client): worker = UnreliableWorker() + worker.setup() worker.start() self.assertEqual(processed, ['good', 'fine', 'back to normal']) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 0a24459313..2dc332b8bb 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -59,7 +59,7 @@ class QueueProcessingWorker(object): queue_name = None def __init__(self): - self.q = SimpleQueueClient() + self.q = None if self.queue_name is None: raise WorkerDeclarationException("Queue worker declared without queue_name") @@ -85,6 +85,9 @@ class QueueProcessingWorker(object): def _log_problem(self): logging.exception("Problem handling data on queue %s" % (self.queue_name,)) + def setup(self): + self.q = SimpleQueueClient() + def start(self): self.q.register_json_consumer(self.queue_name, self.consume_wrapper) self.q.start_consuming()