diff --git a/puppet/zulip/manifests/app_frontend_base.pp b/puppet/zulip/manifests/app_frontend_base.pp index 8532a0dc06..0d9b35600b 100644 --- a/puppet/zulip/manifests/app_frontend_base.pp +++ b/puppet/zulip/manifests/app_frontend_base.pp @@ -188,6 +188,7 @@ class zulip::app_frontend_base { [regsubst($key, '_workers$', ''), Integer(zulipconf('application_server', $key, 1))] }) $mobile_notification_shards = Integer(zulipconf('application_server', 'mobile_notification_shards', 1)) + $user_activity_shards = Integer(zulipconf('application_server', 'user_activity_shards', 1)) $tornado_ports = $zulip::tornado_sharding::tornado_ports $proxy_host = zulipconf('http_proxy', 'host', 'localhost') diff --git a/puppet/zulip/templates/supervisor/zulip.conf.template.erb b/puppet/zulip/templates/supervisor/zulip.conf.template.erb index 337f6486f5..96612944c0 100644 --- a/puppet/zulip/templates/supervisor/zulip.conf.template.erb +++ b/puppet/zulip/templates/supervisor/zulip.conf.template.erb @@ -84,6 +84,9 @@ directory=/home/zulip/deployments/current/ if queue == "missedmessage_mobile_notifications" numprocs = @mobile_notification_shards term = "shard" + elsif queue == "user_activity" + numprocs = @user_activity_shards + term = "shard" elsif @worker_counts.has_key?(queue) numprocs = @worker_counts[queue] end -%> diff --git a/scripts/lib/check_rabbitmq_queue.py b/scripts/lib/check_rabbitmq_queue.py index fca71f0a8d..3cc6e3dbea 100644 --- a/scripts/lib/check_rabbitmq_queue.py +++ b/scripts/lib/check_rabbitmq_queue.py @@ -31,6 +31,9 @@ normal_queues = [ mobile_notification_shards = int( get_config(get_config_file(), "application_server", "mobile_notification_shards", "1") ) +user_activity_shards = int( + get_config(get_config_file(), "application_server", "user_activity_shards", "1") +) OK = 0 WARNING = 1 @@ -175,6 +178,8 @@ def check_rabbitmq_queues() -> None: f"missedmessage_mobile_notifications_shard{d}" for d in range(1, mobile_notification_shards + 1) ] + if user_activity_shards > 1: + check_queues += [f"user_activity_shard{d}" for d in range(1, user_activity_shards + 1)] queues_to_check = set(check_queues).intersection(set(queues_with_consumers)) for queue in queues_to_check: diff --git a/scripts/nagios/check-rabbitmq-consumers b/scripts/nagios/check-rabbitmq-consumers index ff465716a1..529e75aee3 100755 --- a/scripts/nagios/check-rabbitmq-consumers +++ b/scripts/nagios/check-rabbitmq-consumers @@ -62,6 +62,10 @@ for queue_name, count in consumers.items(): target_count = int( get_config(config_file, "application_server", "mobile_notification_shards", "1") ) + elif queue_name == "user_activity": + target_count = int( + get_config(config_file, "application_server", "user_activity_shards", "1") + ) else: target_count = int( get_config(config_file, "application_server", f"{queue_name}_workers", "1") diff --git a/zerver/decorator.py b/zerver/decorator.py index a12a434f9d..0733210bd0 100644 --- a/zerver/decorator.py +++ b/zerver/decorator.py @@ -91,7 +91,13 @@ def update_user_activity( "time": datetime_to_timestamp(timezone_now()), "client_id": request_notes.client.id, } - queue_json_publish_rollback_unsafe("user_activity", event, lambda event: None) + + queue_name = "user_activity" + if settings.USER_ACTIVITY_SHARDS > 1: # nocoverage + shard_id = user_profile.id % settings.USER_ACTIVITY_SHARDS + 1 + queue_name = f"user_activity_shard{shard_id}" + + queue_json_publish_rollback_unsafe(queue_name, event, lambda event: None) # Based on django.views.decorators.http.require_http_methods diff --git a/zerver/worker/user_activity.py b/zerver/worker/user_activity.py index 36c2acceba..417b1ef5ff 100644 --- a/zerver/worker/user_activity.py +++ b/zerver/worker/user_activity.py @@ -2,6 +2,7 @@ import logging from typing import Any +from django.conf import settings from django.db import connection from psycopg2.sql import SQL, Literal from typing_extensions import override @@ -33,6 +34,17 @@ class UserActivityWorker(LoopQueueProcessingWorker): client_id_map: dict[str, int] = {} + @override + def __init__( + self, + threaded: bool = False, + disable_timeout: bool = False, + worker_num: int | None = None, + ) -> None: + if settings.USER_ACTIVITY_SHARDS > 1 and worker_num is not None: # nocoverage + self.queue_name += f"_shard{worker_num}" + super().__init__(threaded, disable_timeout, worker_num) + @override def start(self) -> None: # For our unit tests to make sense, we need to clear this on startup. diff --git a/zproject/computed_settings.py b/zproject/computed_settings.py index 2815949255..74fbaf149b 100644 --- a/zproject/computed_settings.py +++ b/zproject/computed_settings.py @@ -1279,6 +1279,7 @@ CROSS_REALM_BOT_EMAILS = { MOBILE_NOTIFICATIONS_SHARDS = int( get_config("application_server", "mobile_notification_shards", "1") ) +USER_ACTIVITY_SHARDS = int(get_config("application_server", "user_activity_shards", "1")) TWO_FACTOR_PATCH_ADMIN = False