diff --git a/puppet/zulip/templates/supervisor/zulip.conf.template.erb b/puppet/zulip/templates/supervisor/zulip.conf.template.erb index 10f978f4e9..4bfa8a1b65 100644 --- a/puppet/zulip/templates/supervisor/zulip.conf.template.erb +++ b/puppet/zulip/templates/supervisor/zulip.conf.template.erb @@ -85,6 +85,20 @@ stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB) stdout_logfile_backups=3 ; # of stdout logfile backups (default 10) directory=/home/zulip/deployments/current/ +[program:zulip_deliver_scheduled_messages] +command=/home/zulip/deployments/current/manage.py deliver_scheduled_messages +priority=350 ; the relative start priority (default 999) +autostart=true ; start at supervisord start (default: true) +autorestart=true ; whether/when to restart (default: unexpected) +stopsignal=TERM ; signal used to kill process (default TERM) +topwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10) +user=zulip ; setuid to this UNIX account to run the program +redirect_stderr=true ; redirect proc stderr to stdout (default false) +stdout_logfile=/var/log/zulip/scheduled_message_deliverer.log ; stdout log path, NONE for none; default AUTO +stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB) +stdout_logfile_backups=3 ; # of stdout logfile backups (default 10) +directory=/home/zulip/deployments/current/ + [program:zulip_events_message_sender] command=/home/zulip/deployments/current/manage.py process_queue --queue_name=message_sender --worker_num=%(process_num)s process_name=%(program_name)s-%(process_num)s @@ -108,9 +122,9 @@ numprocs=<%= @message_sender_processes %> [group:zulip-workers] <% if @queues_multiprocess %> ; each refers to 'x' in [program:x] definitions -programs=zulip_deliver_enqueued_emails, <% @queues.each_with_index do |queue, i| -%>zulip_events_<%= queue %><%= ',' if i < (@queues.size - 1) %> <% end -%> +programs=zulip_deliver_enqueued_emails, zulip_deliver_scheduled_messages, <% @queues.each_with_index do |queue, i| -%>zulip_events_<%= queue %><%= ',' if i < (@queues.size - 1) %> <% end -%> <% else %> -programs=zulip_deliver_enqueued_emails, zulip_events +programs=zulip_deliver_enqueued_emails, zulip_events, zulip_deliver_scheduled_messages <% end %> [group:zulip-senders] diff --git a/tools/run-dev.py b/tools/run-dev.py index 6a875185f6..731c500b98 100755 --- a/tools/run-dev.py +++ b/tools/run-dev.py @@ -151,7 +151,8 @@ cmds = [['./tools/compile-handlebars-templates', 'forever'], manage_args + ['127.0.0.1:%d' % (tornado_port,)], ['./tools/run-dev-queue-processors'] + manage_args, ['env', 'PGHOST=127.0.0.1', # Force password authentication using .pgpass - './puppet/zulip/files/postgresql/process_fts_updates']] + './puppet/zulip/files/postgresql/process_fts_updates'], + ['./manage.py', 'deliver_scheduled_messages']] if options.test: # Webpack doesn't support 2 copies running on the same system, so # in order to support running the Casper tests while a Zulip diff --git a/zerver/management/commands/deliver_scheduled_messages.py b/zerver/management/commands/deliver_scheduled_messages.py new file mode 100644 index 0000000000..0011d0b9e3 --- /dev/null +++ b/zerver/management/commands/deliver_scheduled_messages.py @@ -0,0 +1,56 @@ +import logging +import time +from typing import Any, Dict +from datetime import timedelta + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.db import transaction +from django.utils.timezone import now as timezone_now + +from zerver.lib.context_managers import lockfile +from zerver.lib.logging_util import log_to_file +from zerver.models import ScheduledMessage, Message +from zerver.lib.actions import do_send_messages +from zerver.lib.addressee import Addressee + +## Setup ## +logger = logging.getLogger(__name__) +log_to_file(logger, settings.SCHEDULED_MESSAGE_DELIVERER_LOG_PATH) + +class Command(BaseCommand): + help = """Deliver scheduled messages from the ScheduledMessage table. +Run this command under supervisor. + +Usage: ./manage.py deliver_scheduled_messages +""" + + def construct_message(self, scheduled_message: ScheduledMessage) -> Dict[str, Any]: + message = Message() + message.sender = scheduled_message.sender + message.content = scheduled_message.content + message.recipient = scheduled_message.recipient + message.subject = scheduled_message.subject + message.pub_date = timezone_now() + message.sending_client = scheduled_message.sending_client + + return {'message': message, 'stream': scheduled_message.stream, + 'realm': scheduled_message.realm} + + def handle(self, *args: Any, **options: Any) -> None: + with lockfile("/tmp/zulip_scheduled_message_deliverer.lockfile"): + while True: + messages_to_deliver = ScheduledMessage.objects.filter( + scheduled_timestamp__lte=timezone_now(), + delivered=False) + if messages_to_deliver: + for message in messages_to_deliver: + with transaction.atomic(): + do_send_messages([self.construct_message(message)]) + message.delivered = True + message.save(update_fields=['delivered']) + + cur_time = timezone_now() + time_next_min = (cur_time + timedelta(minutes=1)).replace(second=0, microsecond=0) + sleep_time = (time_next_min - cur_time).total_seconds() + time.sleep(sleep_time) diff --git a/zproject/settings.py b/zproject/settings.py index 612fa7b752..f29b08c568 100644 --- a/zproject/settings.py +++ b/zproject/settings.py @@ -1203,6 +1203,8 @@ ZULIP_PATHS = [ ("API_KEY_ONLY_WEBHOOK_LOG_PATH", "/var/log/zulip/webhooks_errors.log"), ("SOFT_DEACTIVATION_LOG_PATH", "/var/log/zulip/soft_deactivation.log"), ("TRACEMALLOC_DUMP_DIR", "/var/log/zulip/tracemalloc"), + ("SCHEDULED_MESSAGE_DELIVERER_LOG_PATH", + "/var/log/zulip/scheduled_message_deliverer.log"), ] # The Event log basically logs most significant database changes, @@ -1399,6 +1401,9 @@ LOGGING = { 'zerver.management.commands.enqueue_digest_emails': { 'level': 'DEBUG', }, + 'zerver.management.commands.deliver_scheduled_messages': { + 'level': 'DEBUG', + }, 'zulip.management': { 'handlers': ['file', 'errors_file'], 'propagate': False,