Files
zulip/zerver/management/commands/update_subscriber_counts.py
Anders Kaseorg 4569dd5a29 update_subscriber_counts: Fix union typing.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
(cherry picked from commit 7f3bb4523c)
2025-10-22 16:08:09 -04:00

121 lines
4.7 KiB
Python

import argparse
import logging
from datetime import timedelta
from typing import Any
from django.conf import settings
from django.db import transaction
from django.db.models import F, QuerySet
from django.utils.timezone import now as timezone_now
from typing_extensions import override
from zerver.lib.logging_util import log_to_file
from zerver.lib.management import ZulipBaseCommand
from zerver.models import RealmAuditLog, Stream, Subscription
from zerver.models.realm_audit_logs import AuditLogEventType
## Logging setup ##
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Presumably attaches a file handler so this logger's records land in the
# file at settings.DIGEST_LOG_PATH -- confirm against zerver.lib.logging_util.
# NOTE(review): the setting name refers to digests; verify that sharing that
# log file with this command is intentional.
log_to_file(logger, settings.DIGEST_LOG_PATH)
class Command(ZulipBaseCommand):
    """Management command that recomputes `Stream.subscriber_count`.

    For every selected stream, the number of active subscriptions held by
    active users is counted and, if it differs from the stored value, the
    stored value is overwritten and the change is logged.
    """

    help = """Update the `Stream.subscriber_count` field based on current subscribers.
There may be race conditions with keeping the cached subscriber count
accurate; this command is run as a daily cron job to ensure the number is accurate.
"""

    @override
    def add_arguments(self, parser: argparse.ArgumentParser) -> None:
        """Register the `--since` option and the shared realm arguments."""
        parser.add_argument(
            "--since",
            type=int,
            help="Only examine channels with changed subscribers in this many hours",
        )
        self.add_realm_args(parser, help="The optional name of the realm to limit to")

    @override
    def handle(self, *args: Any, **options: Any) -> None:
        """Recompute subscriber counts for the selected streams.

        Stream selection:
          * with a truthy `--since`: only streams whose subscriptions changed,
            or whose subscribers were created/(de)activated, within that many
            hours (optionally limited to one realm);
          * otherwise, with a realm: every stream in that realm;
          * otherwise: every stream.

        Each stream is processed in its own durable transaction with the
        stream row locked, so a count and its write-back are consistent.
        """
        realm = self.get_realm(options)
        streams = Stream.objects.all()

        # Note: a falsy value (option omitted, or `--since 0`) means "examine
        # everything" rather than "examine nothing".
        if options["since"]:
            since_time = timezone_now() - timedelta(hours=options["since"])

            # There are two ways the count can change -- via a subscription
            # being changed, or via a user being (de)activated.
            changed_subs = RealmAuditLog.objects.filter(
                event_type__in=(
                    AuditLogEventType.SUBSCRIPTION_CREATED,
                    AuditLogEventType.SUBSCRIPTION_ACTIVATED,
                    AuditLogEventType.SUBSCRIPTION_DEACTIVATED,
                ),
                event_time__gte=since_time,
            )
            if realm:
                changed_subs = changed_subs.filter(realm=realm)

            # Find all users changed in the time period, join those to
            # their subscriptions and distinct recipients, and thence
            # to streams.
            changed_users = RealmAuditLog.objects.filter(
                event_type__in=(
                    AuditLogEventType.USER_CREATED,
                    AuditLogEventType.USER_DEACTIVATED,
                    AuditLogEventType.USER_ACTIVATED,
                    AuditLogEventType.USER_REACTIVATED,
                ),
                event_time__gte=since_time,
            )
            if realm:
                changed_users = changed_users.filter(realm=realm)
            changed_user_ids = (
                changed_users.values_list("modified_user_id", flat=True)
                .distinct()
                .order_by("modified_user_id")
            )
            changed_user_subs = (
                Subscription.objects.filter(user_profile_id__in=changed_user_ids)
                .values_list("recipient_id", flat=True)
                .distinct()
                .order_by("recipient_id")
            )
            streams_from_users = Stream.objects.filter(recipient_id__in=changed_user_subs)
            if realm:
                streams_from_users = streams_from_users.filter(realm=realm)

            # Both halves of the union expose the stream's id under the same
            # alias, `stream_id`.  The annotation value must be an expression
            # (F(...)); a bare string is rejected by QuerySet.annotate().
            stream_ids: QuerySet[Any, int] = (
                changed_subs.distinct("modified_stream_id")
                .order_by("modified_stream_id")
                .annotate(stream_id=F("modified_stream_id"))
                .union(streams_from_users.annotate(stream_id=F("id")))
                .values_list("stream_id", flat=True)
            )
        elif realm:
            # Reuse the realm resolved above instead of looking it up again.
            stream_ids = streams.filter(realm=realm).values_list("id", flat=True)
        else:
            stream_ids = streams.values_list("id", flat=True)

        for stream_id in stream_ids.iterator():
            # One durable transaction per stream; the row lock is held only
            # for the duration of this stream's count and update.
            with transaction.atomic(durable=True):
                stream = Stream.objects.select_for_update().get(id=stream_id)
                actual_subscriber_count = Subscription.objects.filter(
                    active=True,
                    # NOTE(review): 2 is presumably the "stream" recipient
                    # type -- confirm against the Recipient model.
                    recipient__type=2,
                    recipient__type_id=stream_id,
                    is_user_active=True,
                ).count()
                db_count = stream.subscriber_count
                if actual_subscriber_count == db_count:
                    continue

                stream.subscriber_count = actual_subscriber_count
                stream.save(update_fields=["subscriber_count"])
                # Use the module logger (configured via log_to_file above the
                # class) rather than the root logger.
                logger.info(
                    "Updated subscriber count of %s, #%s: from %d to %d",
                    stream.realm.string_id,
                    stream.name,
                    db_count,
                    actual_subscriber_count,
                )