The query plan for fetching recent messages from the arbitrary set of streams formed by the combined subscriptions of a batch of 30 random users can be quite bad, descending into a sequential scan on `zerver_recipient`. Worse, the work of pulling out recent messages is redone if the stream appears again in the next batch of 30 users. Instead, pull the recent messages for each stream one at a time, and cache the results in an in-memory cache. Since digests are enqueued in 30-user batches but still one realm at a time, this saves work twice over: the per-stream query plans are faster, and their results can be reused across batches. It does require pulling the stream-id to stream-name mapping for _all_ streams in the realm at once, but that query is well-indexed and unlikely to cause performance issues -- in fact, it may be faster than pulling a random subset of the realm's streams.
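As a minimal sketch of the caching pattern (hypothetical names; the real implementation is the lru_cache-decorated get_recent_topics below), keying an in-process cache on both the stream and the cutoff lets one digest run share per-stream results across its 30-user batches without ever serving stale data to a later run:

    import datetime
    import functools

    def expensive_per_stream_query(stream_id: int, cutoff: datetime.datetime) -> list:
        # Stand-in for the real single-stream, well-indexed Message query.
        return []

    @functools.lru_cache(maxsize=500)
    def recent_topics_for_stream(stream_id: int, cutoff: datetime.datetime) -> tuple:
        # Keyed on (stream_id, cutoff): a stream seen again in a later
        # 30-user batch of the same run hits the cache, while a later
        # run (which has a different cutoff) never sees old entries.
        return tuple(expensive_per_stream_query(stream_id, cutoff))

The full file follows.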
import datetime
import functools
import heapq
import logging
from collections import defaultdict
from typing import Any, Collection, Dict, List, Set, Tuple

from django.conf import settings
from django.db import transaction
from django.db.models import Exists, OuterRef
from django.utils.timezone import now as timezone_now
from typing_extensions import TypeAlias

from confirmation.models import one_click_unsubscribe_link
from zerver.context_processors import common_context
from zerver.lib.email_notifications import build_message_list
from zerver.lib.logging_util import log_to_file
from zerver.lib.message import get_last_message_id
from zerver.lib.queue import queue_json_publish
from zerver.lib.send_email import FromAddress, send_future_email
from zerver.lib.url_encoding import encode_stream
from zerver.models import (
    Message,
    Realm,
    RealmAuditLog,
    Recipient,
    Stream,
    Subscription,
    UserActivityInterval,
    UserProfile,
    get_active_streams,
)

logger = logging.getLogger(__name__)
log_to_file(logger, settings.DIGEST_LOG_PATH)

DIGEST_CUTOFF = 5
MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST = 4

TopicKey: TypeAlias = Tuple[int, str]


class DigestTopic:
    def __init__(self, topic_key: TopicKey) -> None:
        self.topic_key = topic_key
        self.human_senders: Set[str] = set()
        self.sample_messages: List[Message] = []
        self.num_human_messages = 0

    def stream_id(self) -> int:
        # topic_key is (stream_id, topic_name)
        return self.topic_key[0]

    def add_message(self, message: Message) -> None:
        if len(self.sample_messages) < 2:
            self.sample_messages.append(message)

        if message.sent_by_human():
            self.human_senders.add(message.sender.full_name)
            self.num_human_messages += 1

    def length(self) -> int:
        return self.num_human_messages

    def diversity(self) -> int:
        return len(self.human_senders)

    def teaser_data(self, user: UserProfile, stream_id_map: Dict[int, Stream]) -> Dict[str, Any]:
        teaser_count = self.num_human_messages - len(self.sample_messages)
        first_few_messages = build_message_list(
            user=user,
            messages=self.sample_messages,
            stream_id_map=stream_id_map,
        )
        return {
            "participants": sorted(self.human_senders),
            "count": teaser_count,
            "first_few_messages": first_few_messages,
        }


# Digests accumulate 2 types of interesting traffic for a user:
# 1. New streams
# 2. Interesting stream traffic, as determined by the longest and most
#    diversely commented-upon topics.


# Changes to this should also be reflected in
# zerver/worker/queue_processors.py:DigestWorker.consume()
def queue_digest_user_ids(user_ids: List[int], cutoff: datetime.datetime) -> None:
    # Convert cutoff to epoch seconds for transit.
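    # (Note: "%s" is not a documented Python strftime directive; it is
    # passed through to the platform C library, which on glibc yields
    # epoch seconds.)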
|     event = {"user_ids": user_ids, "cutoff": cutoff.strftime("%s")}
 | |
|     queue_json_publish("digest_emails", event)
 | |
| 
 | |
| 
 | |
| def enqueue_emails(cutoff: datetime.datetime) -> None:
 | |
|     if not settings.SEND_DIGEST_EMAILS:
 | |
|         return
 | |
| 
 | |
|     weekday = timezone_now().weekday()
 | |
|     for realm in Realm.objects.filter(
 | |
|         deactivated=False, digest_emails_enabled=True, digest_weekday=weekday
 | |
|     ).exclude(string_id__in=settings.SYSTEM_ONLY_REALMS):
 | |
|         _enqueue_emails_for_realm(realm, cutoff)
 | |
| 
 | |
| 
 | |
| def _enqueue_emails_for_realm(realm: Realm, cutoff: datetime.datetime) -> None:
 | |
|     # This should only be called directly by tests.  Use enqueue_emails
 | |
|     # to process all realms that are set up for processing on any given day.
 | |
|     twelve_hours_ago = timezone_now() - datetime.timedelta(hours=12)
 | |
| 
 | |
|     target_users = (
 | |
|         UserProfile.objects.filter(
 | |
|             realm=realm,
 | |
|             is_active=True,
 | |
|             is_bot=False,
 | |
|             enable_digest_emails=True,
 | |
|         )
 | |
|         .alias(
 | |
|             recent_activity=Exists(
 | |
|                 UserActivityInterval.objects.filter(user_profile_id=OuterRef("id"), end__gt=cutoff)
 | |
|             )
 | |
|         )
 | |
|         .filter(recent_activity=False)
 | |
|         .alias(
 | |
|             sent_recent_digest=Exists(
 | |
|                 RealmAuditLog.objects.filter(
 | |
|                     realm_id=realm.id,
 | |
|                     event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
 | |
|                     event_time__gt=twelve_hours_ago,
 | |
|                     modified_user_id=OuterRef("id"),
 | |
|                 )
 | |
|             )
 | |
|         )
 | |
|         .filter(sent_recent_digest=False)
 | |
|     )
 | |
| 
 | |
|     user_ids = target_users.order_by("id").values_list("id", flat=True)
 | |
| 
 | |
|     # We process batches of 30.  We want a big enough batch
 | |
|     # to amortize work, but not so big that a single item
 | |
|     # from the queue takes too long to process.
 | |
|     chunk_size = 30
 | |
|     for i in range(0, len(user_ids), chunk_size):
 | |
|         chunk_user_ids = list(user_ids[i : i + chunk_size])
 | |
|         queue_digest_user_ids(chunk_user_ids, cutoff)
 | |
|         logger.info(
 | |
|             "Queuing user_ids for potential digest: %s",
 | |
|             chunk_user_ids,
 | |
|         )
 | |
| 
 | |
| 
 | |
| # We cache both by stream-id and cutoff, which ensures the per-stream
 | |
| # cache also does not contain data from old digests
 | |
| @functools.lru_cache(maxsize=500)
 | |
| def get_recent_topics(
 | |
|     realm_id: int,
 | |
|     stream_id: int,
 | |
|     cutoff_date: datetime.datetime,
 | |
| ) -> List[DigestTopic]:
 | |
|     # Gather information about topic conversations, then
 | |
|     # classify by:
 | |
|     #   * topic length
 | |
|     #   * number of senders
 | |
| 
 | |
|     messages = (
 | |
|         # Uses index: zerver_message_realm_recipient_date_sent
 | |
|         Message.objects.filter(
 | |
|             realm_id=realm_id,
 | |
|             recipient__type=Recipient.STREAM,
 | |
|             recipient__type_id=stream_id,
 | |
|             date_sent__gt=cutoff_date,
 | |
|         )
 | |
|         .order_by(
 | |
|             "id",  # we will sample the first few messages
 | |
|         )
 | |
|         .select_related(
 | |
|             "recipient",  # build_message_list looks up recipient.type
 | |
|             "sender",  # we need the sender's full name
 | |
|             "sending_client",  # for Message.sent_by_human
 | |
|         )
 | |
|         .defer(
 | |
|             # This construction, to only fetch the sender's full_name,
 | |
|             # is because `.only()` doesn't work with select_related tables.
 | |
|             *{f"sender__{f.name}" for f in UserProfile._meta.fields if f.name not in {"full_name"}}
 | |
|         )
 | |
|     )
 | |
| 
 | |
|     digest_topic_map: Dict[TopicKey, DigestTopic] = {}
 | |
|     for message in messages:
 | |
|         topic_key = (stream_id, message.topic_name())
 | |
| 
 | |
|         if topic_key not in digest_topic_map:
 | |
|             digest_topic_map[topic_key] = DigestTopic(topic_key)
 | |
| 
 | |
|         digest_topic_map[topic_key].add_message(message)
 | |
| 
 | |
|     topics = list(digest_topic_map.values())
 | |
| 
 | |
|     return topics
 | |
| 
 | |
| 
 | |
| def get_hot_topics(
 | |
|     all_topics: List[DigestTopic],
 | |
|     stream_ids: Set[int],
 | |
| ) -> List[DigestTopic]:
 | |
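    # Take the 2 topics with the most distinct human senders first, then
    # fill the remaining slots, up to
    # MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST, with the longest topics by
    # human-message count, skipping any already chosen.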
    topics = [topic for topic in all_topics if topic.stream_id() in stream_ids]

    hot_topics = heapq.nlargest(2, topics, key=DigestTopic.diversity)

    for topic in heapq.nlargest(
        MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST, topics, key=DigestTopic.length
    ):
        if topic not in hot_topics:
            hot_topics.append(topic)
        if len(hot_topics) == MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST:
            break

    return hot_topics


def get_recently_created_streams(realm: Realm, threshold: datetime.datetime) -> List[Stream]:
    fields = ["id", "name", "is_web_public", "invite_only"]
    return list(get_active_streams(realm).filter(date_created__gt=threshold).only(*fields))


def gather_new_streams(
    realm: Realm,
    recently_created_streams: List[Stream],  # streams only need id and name
    can_access_public: bool,
) -> Tuple[int, Dict[str, List[str]]]:
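    # Users who cannot access all public streams (e.g., guests) are only
    # shown newly created web-public streams.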
    if can_access_public:
        new_streams = [stream for stream in recently_created_streams if not stream.invite_only]
    else:
        new_streams = [stream for stream in recently_created_streams if stream.is_web_public]

    base_url = f"{realm.uri}/#narrow/stream/"

    streams_html = []
    streams_plain = []

    for stream in new_streams:
        narrow_url = base_url + encode_stream(stream.id, stream.name)
        stream_link = f"<a href='{narrow_url}'>{stream.name}</a>"
        streams_html.append(stream_link)
        streams_plain.append(stream.name)

    return len(new_streams), {"html": streams_html, "plain": streams_plain}


def enough_traffic(hot_conversations: List[Dict[str, Any]], new_streams: int) -> bool:
    return bool(hot_conversations or new_streams)


def get_user_stream_map(user_ids: List[int], cutoff_date: datetime.datetime) -> Dict[int, Set[int]]:
    """Skipping streams where the user's subscription status has changed
    when constructing digests is critical to ensure correctness for
    streams without shared history, guest users, and long-term idle
    users, because it means that every user has the same view of the
    history of a given stream whose message history is being included
    (and thus we can share a lot of work).

    The downside is that newly created streams are never included in
    the first digest email after their creation.  Should we wish to
    change that, we will need to be very careful to avoid creating
    bugs for any of those classes of users.
    """
    events = [
        RealmAuditLog.SUBSCRIPTION_CREATED,
        RealmAuditLog.SUBSCRIPTION_ACTIVATED,
        RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
    ]
    # This uses the zerver_realmauditlog_user_subscriptions_idx
    # partial index on RealmAuditLog which is specifically for those
    # three event types.
    rows = (
        Subscription.objects.filter(
            user_profile_id__in=user_ids,
            recipient__type=Recipient.STREAM,
            active=True,
            is_muted=False,
        )
        .alias(
            was_modified=Exists(
                RealmAuditLog.objects.filter(
                    modified_stream_id=OuterRef("recipient__type_id"),
                    modified_user_id=OuterRef("user_profile_id"),
                    event_time__gt=cutoff_date,
                    event_type__in=events,
                )
            )
        )
        .filter(was_modified=False)
        .values("user_profile_id", "recipient__type_id")
    )

    # maps user_id -> {stream_id, stream_id, ...}
    dct: Dict[int, Set[int]] = defaultdict(set)
    for row in rows:
        dct[row["user_profile_id"]].add(row["recipient__type_id"])

    return dct


def get_slim_stream_id_map(realm: Realm) -> Dict[int, Stream]:
    # "slim" because it only fetches the names of the stream objects,
    # suitable for passing into build_message_list.
    streams = get_active_streams(realm).only("id", "name")
    return {stream.id: stream for stream in streams}


def bulk_get_digest_context(
    users: Collection[UserProfile], cutoff: float
) -> Dict[int, Dict[str, Any]]:
    # We expect a non-empty list of users all from the same realm.
    assert users
    realm = next(iter(users)).realm
    for user in users:
        assert user.realm_id == realm.id

    # Convert from epoch seconds to a datetime object.
    cutoff_date = datetime.datetime.fromtimestamp(int(cutoff), tz=datetime.timezone.utc)

    stream_id_map = get_slim_stream_id_map(realm)
    recently_created_streams = get_recently_created_streams(realm, cutoff_date)

    user_ids = [user.id for user in users]
    user_stream_map = get_user_stream_map(user_ids, cutoff_date)

    result: Dict[int, Dict[str, Any]] = {}
    for user in users:
        stream_ids = user_stream_map[user.id]

        recent_topics = []
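        # get_recent_topics is cached on (realm_id, stream_id,
        # cutoff_date), so a stream shared by multiple users in this
        # batch -- or seen again in a later batch in the same worker
        # process -- is only queried once.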
        for stream_id in stream_ids:
            recent_topics += get_recent_topics(realm.id, stream_id, cutoff_date)

        hot_topics = get_hot_topics(recent_topics, stream_ids)

        context = common_context(user)

        # Start building email template data.
        unsubscribe_link = one_click_unsubscribe_link(user, "digest")
        context.update(unsubscribe_link=unsubscribe_link)

        # Get context data for hot conversations.
        context["hot_conversations"] = [
            hot_topic.teaser_data(user, stream_id_map) for hot_topic in hot_topics
        ]

        # Gather new streams.
        new_streams_count, new_streams = gather_new_streams(
            realm=realm,
            recently_created_streams=recently_created_streams,
            can_access_public=user.can_access_public_streams(),
        )
        context["new_streams"] = new_streams
        context["new_streams_count"] = new_streams_count

        result[user.id] = context

    return result


def get_digest_context(user: UserProfile, cutoff: float) -> Dict[str, Any]:
    return bulk_get_digest_context([user], cutoff)[user.id]


@transaction.atomic
def bulk_handle_digest_email(user_ids: List[int], cutoff: float) -> None:
    # We go directly to the database to get user objects,
    # since inactive users are likely to not be in the cache.
    users = (
        UserProfile.objects.filter(id__in=user_ids, is_active=True, realm__deactivated=False)
        .order_by("id")
        .select_related("realm")
    )
    context_map = bulk_get_digest_context(users, cutoff)

    digest_users = []

    for user in users:
        context = context_map[user.id]

        # We don't want to send emails containing almost no information.
        if enough_traffic(context["hot_conversations"], context["new_streams_count"]):
            digest_users.append(user)
            logger.info("Sending digest email for user %s", user.id)
            # Send now, as a ScheduledEmail
            send_future_email(
                "zerver/emails/digest",
                user.realm,
                to_user_ids=[user.id],
                from_name="Zulip Digest",
                from_address=FromAddress.no_reply_placeholder,
                context=context,
            )

    bulk_write_realm_audit_logs(digest_users)


def bulk_write_realm_audit_logs(users: List[UserProfile]) -> None:
    if not users:
        return

    # We write RealmAuditLog rows for auditing, and we will also
    # use these rows during the next run to possibly exclude the
    # users (if insufficient time has passed).
    last_message_id = get_last_message_id()
    now = timezone_now()

    log_rows = [
        RealmAuditLog(
            realm_id=user.realm_id,
            modified_user_id=user.id,
            event_last_message_id=last_message_id,
            event_time=now,
            event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
        )
        for user in users
    ]

    RealmAuditLog.objects.bulk_create(log_rows)
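For orientation, a rough sketch of the consumer side: per the comment above queue_digest_user_ids, the queue event is handled by DigestWorker.consume() in zerver/worker/queue_processors.py. The body below is a simplified assumption of its shape, not the actual worker code:

    # Simplified, hypothetical sketch of the digest_emails consumer.
    def consume(event: Dict[str, Any]) -> None:
        # The event carries the 30-user batch and the cutoff in epoch
        # seconds, exactly as built by queue_digest_user_ids above.
        bulk_handle_digest_email(event["user_ids"], int(event["cutoff"]))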