import datetime
import heapq
import logging
from collections import defaultdict
from typing import Any, Collection, Dict, List, Set, Tuple

from django.conf import settings
from django.db import transaction
from django.utils.timezone import now as timezone_now

from confirmation.models import one_click_unsubscribe_link
from zerver.context_processors import common_context
from zerver.lib.email_notifications import build_message_list
from zerver.lib.logging_util import log_to_file
from zerver.lib.message import get_last_message_id
from zerver.lib.queue import queue_json_publish
from zerver.lib.send_email import FromAddress, send_future_email
from zerver.lib.url_encoding import encode_stream
from zerver.models import (
    Message,
    Realm,
    RealmAuditLog,
    Recipient,
    Stream,
    Subscription,
    UserActivityInterval,
    UserProfile,
    get_active_streams,
)

logger = logging.getLogger(__name__)
log_to_file(logger, settings.DIGEST_LOG_PATH)

DIGEST_CUTOFF = 5
MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST = 4

TopicKey = Tuple[int, str]


class DigestTopic:
    def __init__(self, topic_key: TopicKey) -> None:
        self.topic_key = topic_key
        self.human_senders: Set[str] = set()
        self.sample_messages: List[Message] = []
        self.num_human_messages = 0

    def stream_id(self) -> int:
        # topic_key is (stream_id, topic_name)
        return self.topic_key[0]

    def add_message(self, message: Message) -> None:
        if len(self.sample_messages) < 2:
            self.sample_messages.append(message)

        if message.sent_by_human():
            self.human_senders.add(message.sender.full_name)
            self.num_human_messages += 1

    def length(self) -> int:
        return self.num_human_messages

    def diversity(self) -> int:
        return len(self.human_senders)

    def teaser_data(self, user: UserProfile, stream_map: Dict[int, Stream]) -> Dict[str, Any]:
        teaser_count = self.num_human_messages - len(self.sample_messages)
        first_few_messages = build_message_list(
            user=user,
            messages=self.sample_messages,
            stream_map=stream_map,
        )
        return {
            "participants": sorted(self.human_senders),
            "count": teaser_count,
            "first_few_messages": first_few_messages,
        }
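# Illustrative note (editorial, not in the original module): a topic that
# accumulated 10 human-sent messages from 4 distinct senders has
# length() == 10 and diversity() == 4; get_hot_topics() below ranks topics on
# exactly these two scores.  teaser_data() reports "count" as the human
# messages beyond the (up to two) sampled ones, e.g. 10 - 2 == 8 here.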
# Digests accumulate 2 types of interesting traffic for a user:
# 1. New streams
# 2. Interesting stream traffic, as determined by the longest and most
#    diversely commented-upon topics.


def should_process_digest(realm_str: str) -> bool:
    if realm_str in settings.SYSTEM_ONLY_REALMS:
        # Don't try to send emails to system-only realms
        return False
    return True


# Changes to this should also be reflected in
# zerver/worker/queue_processors.py:DigestWorker.consume()
def queue_digest_user_ids(user_ids: List[int], cutoff: datetime.datetime) -> None:
    # Convert cutoff to epoch seconds for transit.
    event = {"user_ids": user_ids, "cutoff": cutoff.strftime("%s")}
    queue_json_publish("digest_emails", event)
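# Illustrative event shape (hypothetical ids, not in the original module):
# queue_digest_user_ids publishes an event like
#     {"user_ids": [11, 12, 13], "cutoff": "1700000000"}
# to the "digest_emails" queue; bulk_get_digest_context later deserializes the
# epoch-seconds string with datetime.datetime.fromtimestamp(int(cutoff), ...).
# Note that strftime("%s") is a platform-specific directive (glibc/BSD), not
# part of the C standard.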
def enqueue_emails(cutoff: datetime.datetime) -> None:
    if not settings.SEND_DIGEST_EMAILS:
        return

    weekday = timezone_now().weekday()
    for realm in Realm.objects.filter(
        deactivated=False, digest_emails_enabled=True, digest_weekday=weekday
    ):
        if should_process_digest(realm.string_id):
            _enqueue_emails_for_realm(realm, cutoff)


def _enqueue_emails_for_realm(realm: Realm, cutoff: datetime.datetime) -> None:
    # This should only be called directly by tests.  Use enqueue_emails
    # to process all realms that are set up for processing on any given day.
    realm_user_ids = set(
        UserProfile.objects.filter(
            realm=realm,
            is_active=True,
            is_bot=False,
            enable_digest_emails=True,
        ).values_list("id", flat=True)
    )

    twelve_hours_ago = timezone_now() - datetime.timedelta(hours=12)

    recent_user_ids = set(
        RealmAuditLog.objects.filter(
            realm_id=realm.id,
            event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
            event_time__gt=twelve_hours_ago,
        )
        .values_list("modified_user_id", flat=True)
        .distinct()
    )

    realm_user_ids -= recent_user_ids

    active_user_ids = set(
        UserActivityInterval.objects.filter(
            user_profile_id__in=realm_user_ids,
            end__gt=cutoff,
        )
        .values_list("user_profile_id", flat=True)
        .distinct()
    )

    user_ids = list(realm_user_ids - active_user_ids)
    user_ids.sort()

    # We process batches of 30.  We want a big enough batch
    # to amortize work, but not so big that a single item
    # from the queue takes too long to process.
    chunk_size = 30
    for i in range(0, len(user_ids), chunk_size):
        chunk_user_ids = user_ids[i : i + chunk_size]
        queue_digest_user_ids(chunk_user_ids, cutoff)
        logger.info(
            "Queuing user_ids for potential digest: %s",
            chunk_user_ids,
        )
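# Illustrative arithmetic (editorial, not in the original module): with 95
# eligible, inactive user ids, the loop above enqueues four "digest_emails"
# events of sizes 30, 30, 30, and 5.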
def get_recent_topics(
    stream_ids: List[int],
    cutoff_date: datetime.datetime,
) -> List[DigestTopic]:
    # Gather information about topic conversations, then
    # classify by:
    #   * topic length
    #   * number of senders

    messages = (
        Message.objects.filter(
            recipient__type=Recipient.STREAM,
            recipient__type_id__in=stream_ids,
            date_sent__gt=cutoff_date,
        )
        .order_by(
            "id",  # we will sample the first few messages
        )
        .select_related(
            "recipient",  # we need stream_id
            "sender",  # we need the sender's full name
            "sending_client",  # for Message.sent_by_human
        )
    )

    digest_topic_map: Dict[TopicKey, DigestTopic] = {}
    for message in messages:
        topic_key = (message.recipient.type_id, message.topic_name())

        if topic_key not in digest_topic_map:
            digest_topic_map[topic_key] = DigestTopic(topic_key)

        digest_topic_map[topic_key].add_message(message)

    topics = list(digest_topic_map.values())

    return topics
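# Note (editorial): because the query above orders by id and
# DigestTopic.add_message() keeps only the first two messages it sees, each
# topic's sample_messages are the earliest messages in that topic within the
# digest window.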
def get_hot_topics(
    all_topics: List[DigestTopic],
    stream_ids: Set[int],
) -> List[DigestTopic]:
    topics = [topic for topic in all_topics if topic.stream_id() in stream_ids]

    hot_topics = heapq.nlargest(2, topics, key=DigestTopic.diversity)

    for topic in heapq.nlargest(
        MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST, topics, key=DigestTopic.length
    ):
        if topic not in hot_topics:
            hot_topics.append(topic)
        if len(hot_topics) == MAX_HOT_TOPICS_TO_BE_INCLUDED_IN_DIGEST:
            break

    return hot_topics
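# Worked example (hypothetical topics, not in the original module): given
# topics A (length 9, diversity 2), B (8, 5), C (7, 1), and D (3, 4), the
# diversity pass seeds hot_topics with [B, D]; the length pass then appends
# A and C (skipping B, which is already present), yielding [B, D, A, C].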
def get_recent_streams(realm: Realm, threshold: datetime.datetime) -> List[Stream]:
    fields = ["id", "name", "is_web_public", "invite_only"]
    return list(get_active_streams(realm).filter(date_created__gt=threshold).only(*fields))


def gather_new_streams(
    realm: Realm,
    recent_streams: List[Stream],  # streams only need id and name
    can_access_public: bool,
) -> Tuple[int, Dict[str, List[str]]]:
    if can_access_public:
        new_streams = [stream for stream in recent_streams if not stream.invite_only]
    else:
        new_streams = [stream for stream in recent_streams if stream.is_web_public]

    base_url = f"{realm.uri}/#narrow/stream/"

    streams_html = []
    streams_plain = []

    for stream in new_streams:
        narrow_url = base_url + encode_stream(stream.id, stream.name)
        stream_link = f"<a href='{narrow_url}'>{stream.name}</a>"
        streams_html.append(stream_link)
        streams_plain.append(stream.name)

    return len(new_streams), {"html": streams_html, "plain": streams_plain}
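# Illustrative return value (hypothetical stream names, not in the original
# module): for two new public streams "design" and "frontend", this returns
#     (2, {"html": ["<a href='...'>design</a>", "<a href='...'>frontend</a>"],
#          "plain": ["design", "frontend"]})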
def enough_traffic(hot_conversations: List[Dict[str, Any]], new_streams: int) -> bool:
    # Note: the original annotation was `hot_conversations: str`, but the only
    # call site passes the list of teaser dicts built in bulk_get_digest_context.
    return bool(hot_conversations or new_streams)


def get_user_stream_map(user_ids: List[int]) -> Dict[int, Set[int]]:
    rows = Subscription.objects.filter(
        user_profile_id__in=user_ids,
        recipient__type=Recipient.STREAM,
        active=True,
        is_muted=False,
    ).values("user_profile_id", "recipient__type_id")

    # maps user_id -> {stream_id, stream_id, ...}
    dct: Dict[int, Set[int]] = defaultdict(set)
    for row in rows:
        dct[row["user_profile_id"]].add(row["recipient__type_id"])

    return dct
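# Illustrative shape (hypothetical ids, not in the original module):
#     get_user_stream_map([17, 42]) -> {17: {3, 5}, 42: {5}}
# i.e., each user id maps to the set of stream ids of that user's active,
# unmuted stream subscriptions.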
def get_slim_stream_map(stream_ids: Set[int]) -> Dict[int, Stream]:
    # This can be passed to build_message_list.
    streams = Stream.objects.filter(
        id__in=stream_ids,
    ).only("id", "name")

    return {stream.id: stream for stream in streams}


def bulk_get_digest_context(
    users: Collection[UserProfile], cutoff: float
) -> Dict[int, Dict[str, Any]]:
    # We expect a non-empty list of users all from the same realm.
    assert users
    realm = next(iter(users)).realm
    for user in users:
        assert user.realm_id == realm.id

    # Convert from epoch seconds to a datetime object.
    cutoff_date = datetime.datetime.fromtimestamp(int(cutoff), tz=datetime.timezone.utc)

    result: Dict[int, Dict[str, Any]] = {}

    user_ids = [user.id for user in users]

    user_stream_map = get_user_stream_map(user_ids)

    recently_modified_streams = get_modified_streams(user_ids, cutoff_date)

    all_stream_ids = set()

    for user in users:
        stream_ids = user_stream_map[user.id]
        stream_ids -= recently_modified_streams.get(user.id, set())
        all_stream_ids |= stream_ids

    # Get all the recent topics for all the users.  This does the heavy
    # lifting of making an expensive query to the Message table.  Then
    # for each user, we filter to just the streams they care about.
    recent_topics = get_recent_topics(sorted(all_stream_ids), cutoff_date)

    stream_map = get_slim_stream_map(all_stream_ids)

    recent_streams = get_recent_streams(realm, cutoff_date)

    for user in users:
        stream_ids = user_stream_map[user.id]

        hot_topics = get_hot_topics(recent_topics, stream_ids)

        context = common_context(user)

        # Start building email template data.
        unsubscribe_link = one_click_unsubscribe_link(user, "digest")
        context.update(unsubscribe_link=unsubscribe_link)

        # Get context data for hot conversations.
        context["hot_conversations"] = [
            hot_topic.teaser_data(user, stream_map) for hot_topic in hot_topics
        ]

        # Gather new streams.
        new_streams_count, new_streams = gather_new_streams(
            realm=realm,
            recent_streams=recent_streams,
            can_access_public=user.can_access_public_streams(),
        )
        context["new_streams"] = new_streams
        context["new_streams_count"] = new_streams_count

        result[user.id] = context

    return result


def get_digest_context(user: UserProfile, cutoff: float) -> Dict[str, Any]:
    return bulk_get_digest_context([user], cutoff)[user.id]
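# Illustrative call (hypothetical user object, not in the original module):
#     cutoff = (timezone_now() - datetime.timedelta(days=DIGEST_CUTOFF)).timestamp()
#     context = get_digest_context(user, cutoff)
# The resulting context carries "hot_conversations", "new_streams", and
# "new_streams_count" entries for the digest email templates.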
@transaction.atomic
def bulk_handle_digest_email(user_ids: List[int], cutoff: float) -> None:
    # We go directly to the database to get user objects,
    # since inactive users are likely to not be in the cache.
    users = (
        UserProfile.objects.filter(id__in=user_ids, is_active=True, realm__deactivated=False)
        .order_by("id")
        .select_related("realm")
    )
    context_map = bulk_get_digest_context(users, cutoff)

    digest_users = []

    for user in users:
        context = context_map[user.id]

        # We don't want to send emails containing almost no information.
        if enough_traffic(context["hot_conversations"], context["new_streams_count"]):
            digest_users.append(user)
            logger.info("Sending digest email for user %s", user.id)
            # Send now, as a ScheduledEmail
            send_future_email(
                "zerver/emails/digest",
                user.realm,
                to_user_ids=[user.id],
                from_name="Zulip Digest",
                from_address=FromAddress.no_reply_placeholder,
                context=context,
            )

    bulk_write_realm_audit_logs(digest_users)
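# Note (editorial): @transaction.atomic wraps the whole batch, so the
# ScheduledEmail rows created via send_future_email and the RealmAuditLog
# rows written below commit (or roll back) together.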
def bulk_write_realm_audit_logs(users: List[UserProfile]) -> None:
    if not users:
        return

    # We write RealmAuditLog rows for auditing, and we will also
    # use these rows during the next run to possibly exclude the
    # users (if insufficient time has passed).
    last_message_id = get_last_message_id()
    now = timezone_now()

    log_rows = [
        RealmAuditLog(
            realm_id=user.realm_id,
            modified_user_id=user.id,
            event_last_message_id=last_message_id,
            event_time=now,
            event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
        )
        for user in users
    ]

    RealmAuditLog.objects.bulk_create(log_rows)
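# Note (editorial): these USER_DIGEST_EMAIL_CREATED rows are exactly what
# _enqueue_emails_for_realm queries with its twelve_hours_ago filter, which is
# how a user is excluded from being re-digested shortly after receiving one.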
def get_modified_streams(
    user_ids: List[int], cutoff_date: datetime.datetime
) -> Dict[int, Set[int]]:
    """Skipping streams where the user's subscription status has changed
    when constructing digests is critical to ensure correctness for
    streams without shared history, guest users, and long-term idle
    users, because it means that every user has the same view of the
    history of a given stream whose message history is being included
    (and thus we can share a lot of work).

    The downside is that newly created streams are never included in
    the first digest email after their creation.  Should we wish to
    change that, we will need to be very careful to avoid creating
    bugs for any of those classes of users.
    """
    events = [
        RealmAuditLog.SUBSCRIPTION_CREATED,
        RealmAuditLog.SUBSCRIPTION_ACTIVATED,
        RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
    ]

    # Get rows where the users' subscriptions have changed.
    rows = (
        RealmAuditLog.objects.filter(
            modified_user_id__in=user_ids,
            event_time__gt=cutoff_date,
            event_type__in=events,
        )
        .values("modified_user_id", "modified_stream_id")
        .distinct()
    )

    result: Dict[int, Set[int]] = defaultdict(set)

    for row in rows:
        user_id = row["modified_user_id"]
        stream_id = row["modified_stream_id"]
        result[user_id].add(stream_id)

    return result
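# Illustrative shape (hypothetical ids, not in the original module):
#     get_modified_streams([17, 42], cutoff_date) -> {17: {3}, 42: {3, 8}}
# i.e., each user id maps to the stream ids whose subscription state changed
# for that user since cutoff_date; bulk_get_digest_context subtracts these
# from each user's stream set before gathering topics.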