mirror of
https://github.com/zulip/zulip.git
synced 2025-10-23 04:52:12 +00:00
update_subscriber_counts: Add tool to re-sync subscriber counts.
(cherry picked from commit f915498e48
)
This commit is contained in:
committed by
Alex Vandiver
parent
0d253b0989
commit
23de6ae1dd
118
zerver/management/commands/update_subscriber_counts.py
Normal file
118
zerver/management/commands/update_subscriber_counts.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import argparse
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import transaction
|
||||
from django.db.models import QuerySet
|
||||
from django.utils.timezone import now as timezone_now
|
||||
from typing_extensions import override
|
||||
|
||||
from zerver.lib.logging_util import log_to_file
|
||||
from zerver.lib.management import ZulipBaseCommand
|
||||
from zerver.models import RealmAuditLog, Stream, Subscription
|
||||
from zerver.models.realm_audit_logs import AuditLogEventType
|
||||
|
||||
## Logging setup ##
|
||||
logger = logging.getLogger(__name__)
|
||||
log_to_file(logger, settings.DIGEST_LOG_PATH)
|
||||
|
||||
|
||||
class Command(ZulipBaseCommand):
|
||||
help = """Update the `Stream.subscriber_count` field based on current subscribers.
|
||||
|
||||
There may be race conditions with keeping the cached subscriber count
|
||||
accurate; this command is run as a daily cron job to ensure the number is accurate.
|
||||
"""
|
||||
|
||||
@override
|
||||
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"--since",
|
||||
type=int,
|
||||
help="Only examine channels with changed subscribers in this many hours",
|
||||
)
|
||||
self.add_realm_args(parser, help="The optional name of the realm to limit to")
|
||||
|
||||
@override
|
||||
def handle(self, *args: Any, **options: Any) -> None:
|
||||
realm = self.get_realm(options)
|
||||
streams = Stream.objects.all()
|
||||
if realm := self.get_realm(options):
|
||||
stream_ids: QuerySet[Any, int] = streams.filter(realm=realm).values_list(
|
||||
"id", flat=True
|
||||
)
|
||||
if options["since"]:
|
||||
since_time = timezone_now() - timedelta(hours=options["since"])
|
||||
# Two ways the count can change -- via a subscription
|
||||
# being changed, or via a user being (de)activated.
|
||||
changed_subs = RealmAuditLog.objects.filter(
|
||||
event_type__in=(
|
||||
AuditLogEventType.SUBSCRIPTION_CREATED,
|
||||
AuditLogEventType.SUBSCRIPTION_ACTIVATED,
|
||||
AuditLogEventType.SUBSCRIPTION_DEACTIVATED,
|
||||
),
|
||||
event_time__gte=since_time,
|
||||
)
|
||||
if realm:
|
||||
changed_subs = changed_subs.filter(realm=realm)
|
||||
|
||||
# Find all users changed in the time period, join those to
|
||||
# their subscriptions and distinct recipients, and thence
|
||||
# to streams.
|
||||
changed_users = RealmAuditLog.objects.filter(
|
||||
event_type__in=(
|
||||
AuditLogEventType.USER_CREATED,
|
||||
AuditLogEventType.USER_DEACTIVATED,
|
||||
AuditLogEventType.USER_ACTIVATED,
|
||||
AuditLogEventType.USER_REACTIVATED,
|
||||
),
|
||||
event_time__gte=since_time,
|
||||
)
|
||||
if realm:
|
||||
changed_users = changed_users.filter(realm=realm)
|
||||
changed_user_ids = (
|
||||
changed_users.values_list("modified_user_id", flat=True)
|
||||
.distinct()
|
||||
.order_by("modified_user_id")
|
||||
)
|
||||
|
||||
changed_user_subs = (
|
||||
Subscription.objects.filter(user_profile_id__in=changed_user_ids)
|
||||
.values_list("recipient_id", flat=True)
|
||||
.distinct()
|
||||
.order_by("recipient_id")
|
||||
)
|
||||
streams_from_users = Stream.objects.filter(recipient_id__in=changed_user_subs)
|
||||
if realm:
|
||||
streams_from_users = streams_from_users.filter(realm=realm)
|
||||
|
||||
stream_ids = (
|
||||
changed_subs.values_list("modified_stream_id", flat=True)
|
||||
.distinct()
|
||||
.order_by("modified_stream_id")
|
||||
).union(streams_from_users.values_list("id", flat=True))
|
||||
|
||||
for stream_id in stream_ids.iterator():
|
||||
with transaction.atomic(durable=True):
|
||||
stream = Stream.objects.select_for_update().get(id=stream_id)
|
||||
actual_subscriber_count = Subscription.objects.filter(
|
||||
active=True,
|
||||
recipient__type=2,
|
||||
recipient__type_id=stream_id,
|
||||
is_user_active=True,
|
||||
).count()
|
||||
db_count = stream.subscriber_count
|
||||
if actual_subscriber_count == db_count:
|
||||
continue
|
||||
stream.subscriber_count = actual_subscriber_count
|
||||
stream.save(update_fields=["subscriber_count"])
|
||||
|
||||
logging.info(
|
||||
"Updated subscriber count of %s, #%s: from %d to %d",
|
||||
stream.realm.string_id,
|
||||
stream.name,
|
||||
db_count,
|
||||
actual_subscriber_count,
|
||||
)
|
Reference in New Issue
Block a user