queue: Allow sharding user_activity worker.

This follows the existing patterns for the sharded mobile
notifications worker.
This commit is contained in:
Tim Abbott
2025-06-05 23:28:55 -07:00
parent 9b43e5e3b3
commit 0ec07fe4c8
7 changed files with 33 additions and 1 deletions

View File

@@ -188,6 +188,7 @@ class zulip::app_frontend_base {
[regsubst($key, '_workers$', ''), Integer(zulipconf('application_server', $key, 1))]
})
$mobile_notification_shards = Integer(zulipconf('application_server', 'mobile_notification_shards', 1))
$user_activity_shards = Integer(zulipconf('application_server', 'user_activity_shards', 1))
$tornado_ports = $zulip::tornado_sharding::tornado_ports
$proxy_host = zulipconf('http_proxy', 'host', 'localhost')

View File

@@ -84,6 +84,9 @@ directory=/home/zulip/deployments/current/
if queue == "missedmessage_mobile_notifications"
numprocs = @mobile_notification_shards
term = "shard"
elsif queue == "user_activity"
numprocs = @user_activity_shards
term = "shard"
elsif @worker_counts.has_key?(queue)
numprocs = @worker_counts[queue]
end -%>

View File

@@ -31,6 +31,9 @@ normal_queues = [
mobile_notification_shards = int(
get_config(get_config_file(), "application_server", "mobile_notification_shards", "1")
)
user_activity_shards = int(
get_config(get_config_file(), "application_server", "user_activity_shards", "1")
)
OK = 0
WARNING = 1
@@ -175,6 +178,8 @@ def check_rabbitmq_queues() -> None:
f"missedmessage_mobile_notifications_shard{d}"
for d in range(1, mobile_notification_shards + 1)
]
if user_activity_shards > 1:
check_queues += [f"user_activity_shard{d}" for d in range(1, user_activity_shards + 1)]
queues_to_check = set(check_queues).intersection(set(queues_with_consumers))
for queue in queues_to_check:

View File

@@ -62,6 +62,10 @@ for queue_name, count in consumers.items():
target_count = int(
get_config(config_file, "application_server", "mobile_notification_shards", "1")
)
elif queue_name == "user_activity":
target_count = int(
get_config(config_file, "application_server", "user_activity_shards", "1")
)
else:
target_count = int(
get_config(config_file, "application_server", f"{queue_name}_workers", "1")

View File

@@ -91,7 +91,13 @@ def update_user_activity(
"time": datetime_to_timestamp(timezone_now()),
"client_id": request_notes.client.id,
}
queue_json_publish_rollback_unsafe("user_activity", event, lambda event: None)
queue_name = "user_activity"
if settings.USER_ACTIVITY_SHARDS > 1: # nocoverage
shard_id = user_profile.id % settings.USER_ACTIVITY_SHARDS + 1
queue_name = f"user_activity_shard{shard_id}"
queue_json_publish_rollback_unsafe(queue_name, event, lambda event: None)
# Based on django.views.decorators.http.require_http_methods

View File

@@ -2,6 +2,7 @@
import logging
from typing import Any
from django.conf import settings
from django.db import connection
from psycopg2.sql import SQL, Literal
from typing_extensions import override
@@ -33,6 +34,17 @@ class UserActivityWorker(LoopQueueProcessingWorker):
client_id_map: dict[str, int] = {}
@override
def __init__(
self,
threaded: bool = False,
disable_timeout: bool = False,
worker_num: int | None = None,
) -> None:
if settings.USER_ACTIVITY_SHARDS > 1 and worker_num is not None: # nocoverage
self.queue_name += f"_shard{worker_num}"
super().__init__(threaded, disable_timeout, worker_num)
@override
def start(self) -> None:
# For our unit tests to make sense, we need to clear this on startup.

View File

@@ -1279,6 +1279,7 @@ CROSS_REALM_BOT_EMAILS = {
MOBILE_NOTIFICATIONS_SHARDS = int(
get_config("application_server", "mobile_notification_shards", "1")
)
USER_ACTIVITY_SHARDS = int(get_config("application_server", "user_activity_shards", "1"))
TWO_FACTOR_PATCH_ADMIN = False