mirror of
https://github.com/zulip/zulip.git
synced 2025-11-21 15:09:34 +00:00
email digests: Introduce bulk methods for digest.
Note that we are not changing anything semantically or algorithmically yet. The only overhead here for the single-user case is boxing and unboxing data into single-item dicts and lists. The interfaces for callers in the view and the queue processor remain the same for now.
This commit is contained in:
@@ -218,58 +218,71 @@ def gather_new_streams(user_profile: UserProfile,
|
|||||||
def enough_traffic(hot_conversations: str, new_streams: int) -> bool:
|
def enough_traffic(hot_conversations: str, new_streams: int) -> bool:
|
||||||
return bool(hot_conversations or new_streams)
|
return bool(hot_conversations or new_streams)
|
||||||
|
|
||||||
def get_digest_context(user: UserProfile, cutoff: float) -> Dict[str, Any]:
|
def bulk_get_digest_context(users: List[UserProfile], cutoff: float) -> Dict[int, Dict[str, Any]]:
|
||||||
# Convert from epoch seconds to a datetime object.
|
# Convert from epoch seconds to a datetime object.
|
||||||
cutoff_date = datetime.datetime.fromtimestamp(int(cutoff), tz=datetime.timezone.utc)
|
cutoff_date = datetime.datetime.fromtimestamp(int(cutoff), tz=datetime.timezone.utc)
|
||||||
|
|
||||||
context = common_context(user)
|
result: Dict[int, Dict[str, Any]] = {}
|
||||||
|
|
||||||
# Start building email template data.
|
for user in users:
|
||||||
unsubscribe_link = one_click_unsubscribe_link(user, "digest")
|
context = common_context(user)
|
||||||
context.update(unsubscribe_link=unsubscribe_link)
|
|
||||||
|
|
||||||
home_view_streams = Subscription.objects.filter(
|
# Start building email template data.
|
||||||
user_profile=user,
|
unsubscribe_link = one_click_unsubscribe_link(user, "digest")
|
||||||
recipient__type=Recipient.STREAM,
|
context.update(unsubscribe_link=unsubscribe_link)
|
||||||
active=True,
|
|
||||||
is_muted=False,
|
|
||||||
).values_list('recipient__type_id', flat=True)
|
|
||||||
|
|
||||||
if not user.long_term_idle:
|
home_view_streams = Subscription.objects.filter(
|
||||||
stream_ids = home_view_streams
|
user_profile=user,
|
||||||
else:
|
recipient__type=Recipient.STREAM,
|
||||||
stream_ids = exclude_subscription_modified_streams(user, home_view_streams, cutoff_date)
|
active=True,
|
||||||
|
is_muted=False,
|
||||||
|
).values_list('recipient__type_id', flat=True)
|
||||||
|
|
||||||
topic_activity = get_recent_topic_activity(stream_ids, cutoff_date)
|
if not user.long_term_idle:
|
||||||
hot_topics = get_hot_topics(topic_activity)
|
stream_ids = home_view_streams
|
||||||
|
else:
|
||||||
|
stream_ids = exclude_subscription_modified_streams(user, home_view_streams, cutoff_date)
|
||||||
|
|
||||||
# Gather hot conversations.
|
topic_activity = get_recent_topic_activity(stream_ids, cutoff_date)
|
||||||
context["hot_conversations"] = gather_hot_topics(user, hot_topics, topic_activity)
|
hot_topics = get_hot_topics(topic_activity)
|
||||||
|
|
||||||
# Gather new streams.
|
# Gather hot conversations.
|
||||||
new_streams_count, new_streams = gather_new_streams(user, cutoff_date)
|
context["hot_conversations"] = gather_hot_topics(user, hot_topics, topic_activity)
|
||||||
context["new_streams"] = new_streams
|
|
||||||
context["new_streams_count"] = new_streams_count
|
|
||||||
|
|
||||||
return context
|
# Gather new streams.
|
||||||
|
new_streams_count, new_streams = gather_new_streams(user, cutoff_date)
|
||||||
|
context["new_streams"] = new_streams
|
||||||
|
context["new_streams_count"] = new_streams_count
|
||||||
|
|
||||||
|
result[user.id] = context
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_digest_context(user: UserProfile, cutoff: float) -> Dict[str, Any]:
|
||||||
|
return bulk_get_digest_context([user], cutoff)[user.id]
|
||||||
|
|
||||||
|
def bulk_handle_digest_email(user_ids: List[int], cutoff: float) -> None:
|
||||||
|
users = [get_user_profile_by_id(user_id) for user_id in user_ids]
|
||||||
|
context_map = bulk_get_digest_context(users, cutoff)
|
||||||
|
|
||||||
|
for user in users:
|
||||||
|
context = context_map[user.id]
|
||||||
|
|
||||||
|
# We don't want to send emails containing almost no information.
|
||||||
|
if enough_traffic(context["hot_conversations"], context["new_streams_count"]):
|
||||||
|
logger.info("Sending digest email for user %s", user.id)
|
||||||
|
# Send now, as a ScheduledEmail
|
||||||
|
send_future_email(
|
||||||
|
'zerver/emails/digest',
|
||||||
|
user.realm,
|
||||||
|
to_user_ids=[user.id],
|
||||||
|
from_name="Zulip Digest",
|
||||||
|
from_address=FromAddress.no_reply_placeholder,
|
||||||
|
context=context,
|
||||||
|
)
|
||||||
|
|
||||||
def handle_digest_email(user_id: int, cutoff: float) -> None:
|
def handle_digest_email(user_id: int, cutoff: float) -> None:
|
||||||
user = get_user_profile_by_id(user_id)
|
bulk_handle_digest_email([user_id], cutoff)
|
||||||
context = get_digest_context(user, cutoff)
|
|
||||||
|
|
||||||
# We don't want to send emails containing almost no information.
|
|
||||||
if enough_traffic(context["hot_conversations"], context["new_streams_count"]):
|
|
||||||
logger.info("Sending digest email for user %s", user.id)
|
|
||||||
# Send now, as a ScheduledEmail
|
|
||||||
send_future_email(
|
|
||||||
'zerver/emails/digest',
|
|
||||||
user.realm,
|
|
||||||
to_user_ids=[user.id],
|
|
||||||
from_name="Zulip Digest",
|
|
||||||
from_address=FromAddress.no_reply_placeholder,
|
|
||||||
context=context,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def exclude_subscription_modified_streams(user_profile: UserProfile,
|
def exclude_subscription_modified_streams(user_profile: UserProfile,
|
||||||
stream_ids: List[int],
|
stream_ids: List[int],
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from django.utils.timezone import now as timezone_now
|
|||||||
from confirmation.models import one_click_unsubscribe_link
|
from confirmation.models import one_click_unsubscribe_link
|
||||||
from zerver.lib.actions import do_create_user
|
from zerver.lib.actions import do_create_user
|
||||||
from zerver.lib.digest import (
|
from zerver.lib.digest import (
|
||||||
|
bulk_handle_digest_email,
|
||||||
enqueue_emails,
|
enqueue_emails,
|
||||||
exclude_subscription_modified_streams,
|
exclude_subscription_modified_streams,
|
||||||
gather_new_streams,
|
gather_new_streams,
|
||||||
@@ -168,17 +169,14 @@ class TestDigestEmailMessages(ZulipTestCase):
|
|||||||
one_click_unsubscribe_link(digest_users[0], 'digest')
|
one_click_unsubscribe_link(digest_users[0], 'digest')
|
||||||
|
|
||||||
with mock.patch('zerver.lib.digest.send_future_email') as mock_send_future_email:
|
with mock.patch('zerver.lib.digest.send_future_email') as mock_send_future_email:
|
||||||
keep_cache_warm = False
|
digest_user_ids = [user.id for user in digest_users]
|
||||||
|
|
||||||
for digest_user in digest_users:
|
with queries_captured() as queries:
|
||||||
with queries_captured(keep_cache_warm=keep_cache_warm) as queries:
|
with cache_tries_captured() as cache_tries:
|
||||||
with cache_tries_captured() as cache_tries:
|
bulk_handle_digest_email(digest_user_ids, cutoff)
|
||||||
handle_digest_email(digest_user.id, cutoff)
|
|
||||||
|
|
||||||
keep_cache_warm = True
|
self.assert_length(queries, 40)
|
||||||
|
self.assert_length(cache_tries, 4)
|
||||||
self.assert_length(queries, 10)
|
|
||||||
self.assert_length(cache_tries, 1)
|
|
||||||
|
|
||||||
self.assertEqual(mock_send_future_email.call_count, len(digest_users))
|
self.assertEqual(mock_send_future_email.call_count, len(digest_users))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user