From 940cf9db3b42ed41ededa3d580370f5ef0214a51 Mon Sep 17 00:00:00 2001 From: rht Date: Sun, 28 May 2017 08:17:29 +0200 Subject: [PATCH] Run queue processors multithreaded in production if system memory <3.5GB. While running queue processors multithreaded will limit the performance available to very small systems, it's easy to fix that by adding more RAM, and previously, Zulip didn't work on such systems at all, so this is unambiguously an improvement there. Fixes #32. Fixes #34. (Commit message expanded significantly by tabbott.) --- puppet/zulip/manifests/app_frontend_base.pp | 4 ++++ .../supervisor/zulip.conf.template.erb | 20 +++++++++++++++++++ zerver/management/commands/process_queue.py | 18 ++++++++++++----- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/puppet/zulip/manifests/app_frontend_base.pp b/puppet/zulip/manifests/app_frontend_base.pp index 6a071a3aa4..c4cb3d3c9a 100644 --- a/puppet/zulip/manifests/app_frontend_base.pp +++ b/puppet/zulip/manifests/app_frontend_base.pp @@ -58,6 +58,10 @@ class zulip::app_frontend_base { } } + # This determines whether we run queue processors multithreaded or + # multiprocess. Multiprocess scales much better, but requires more + # RAM; we just auto-detect based on available system RAM. + $queues_multiprocess = $::memorysize_mb > 3500 $queues = $zulip::base::normal_queues file { "/etc/supervisor/conf.d/zulip.conf": require => Package[supervisor], diff --git a/puppet/zulip/templates/supervisor/zulip.conf.template.erb b/puppet/zulip/templates/supervisor/zulip.conf.template.erb index 2227f3740b..902ea405de 100644 --- a/puppet/zulip/templates/supervisor/zulip.conf.template.erb +++ b/puppet/zulip/templates/supervisor/zulip.conf.template.erb @@ -37,6 +37,7 @@ stdout_logfile_maxbytes=1GB ; max # logfile bytes b4 rotation (default 50MB) stdout_logfile_backups=10 ; # of stdout logfile backups (default 10) directory=/home/zulip/deployments/current/ +<% if @queues_multiprocess %> <% @queues.each do |queue| -%> [program:zulip_events_<%= queue %>] command=/home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> @@ -52,6 +53,21 @@ stdout_logfile_maxbytes=1GB ; max # logfile bytes b4 rotation (default 50MB) stdout_logfile_backups=10 ; # of stdout logfile backups (default 10) directory=/home/zulip/deployments/current/ <% end -%> +<% else %> +[program:zulip_events] +command=/home/zulip/deployments/current/manage.py process_queue --multi_threaded <%= @queues.join(' ') %> +priority=300 ; the relative start priority (default 999) +autostart=true ; start at supervisord start (default: true) +autorestart=true ; whether/when to restart (default: unexpected) +stopsignal=TERM ; signal used to kill process (default TERM) +stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10) +user=zulip ; setuid to this UNIX account to run the program +redirect_stderr=true ; redirect proc stderr to stdout (default false) +stdout_logfile=/var/log/zulip/events.log ; stdout log path, NONE for none; default AUTO +stdout_logfile_maxbytes=1GB ; max # logfile bytes b4 rotation (default 50MB) +stdout_logfile_backups=10 ; # of stdout logfile backups (default 10) +directory=/home/zulip/deployments/current/ +<% end %> [program:zulip_deliver_enqueued_emails] command=/home/zulip/deployments/current/manage.py deliver_email @@ -88,8 +104,12 @@ numprocs=5 ; process groups. [group:zulip-workers] +<% if @queues_multiprocess %> ; each refers to 'x' in [program:x] definitions programs=zulip_deliver_enqueued_emails, <% @queues.each_with_index do |queue, i| -%>zulip_events_<%= queue %><%= ',' if i < (@queues.size - 1) %> <% end -%> +<% else %> +programs=zulip_deliver_enqueued_emails, zulip_events +<% end %> [group:zulip-senders] programs=zulip_events_message_sender diff --git a/zerver/management/commands/process_queue.py b/zerver/management/commands/process_queue.py index 0093d30896..23dd24fece 100644 --- a/zerver/management/commands/process_queue.py +++ b/zerver/management/commands/process_queue.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from types import FrameType -from typing import Any +from typing import Any, List from argparse import ArgumentParser from django.core.management.base import BaseCommand @@ -23,6 +23,10 @@ class Command(BaseCommand): help="worker label") parser.add_argument('--all', dest="all", action="store_true", default=False, help="run all queues") + parser.add_argument('--multi_threaded', nargs='+', + metavar='', + type=str, required=False, + help="list of queue to process") help = "Runs a queue processing worker" @@ -39,19 +43,23 @@ class Command(BaseCommand): logger.error("Cannot run a queue processor when USING_RABBITMQ is False!") sys.exit(1) - def run_threaded_workers(logger): - # type: (logging.Logger) -> None + def run_threaded_workers(queues, logger): + # type: (List[str], logging.Logger) -> None cnt = 0 - for queue_name in get_active_worker_queues(): + for queue_name in queues: if not settings.DEVELOPMENT: logger.info('launching queue worker thread ' + queue_name) cnt += 1 td = Threaded_worker(queue_name) td.start() + assert len(queues) == cnt logger.info('%d queue worker threads were launched' % (cnt,)) if options['all']: - autoreload.main(run_threaded_workers, (logger,)) + autoreload.main(run_threaded_workers, (get_active_worker_queues(), logger)) + elif options['multi_threaded']: + queues = options['multi_threaded'] + autoreload.main(run_threaded_workers, (queues, logger)) else: queue_name = options['queue_name'] worker_num = options['worker_num']