digest: Enqueue emails as we generate the contexts.

There is now no longer any reason to have the scheduled_email
enqueuing wait until all of the users' contexts have been generated.
Switch to returning the contexts as an iterator, and send them as we
compute them.
This commit is contained in:
Alex Vandiver
2023-09-08 19:36:13 +00:00
committed by Tim Abbott
parent b9f72bdd68
commit 39358f77dd

View File

@@ -311,7 +311,7 @@ def get_slim_stream_id_map(realm: Realm) -> Dict[int, Stream]:
def bulk_get_digest_context(
users: Collection[UserProfile], cutoff: float
) -> Dict[int, Dict[str, Any]]:
) -> Iterator[Tuple[UserProfile, Dict[str, Any]]]:
# We expect a non-empty list of users all from the same realm.
assert users
realm = next(iter(users)).realm
@@ -327,7 +327,6 @@ def bulk_get_digest_context(
user_ids = [user.id for user in users]
user_stream_map = get_user_stream_map(user_ids, cutoff_date)
result: Dict[int, Dict[str, Any]] = {}
for user in users:
stream_ids = user_stream_map[user.id]
@@ -357,13 +356,13 @@ def bulk_get_digest_context(
context["new_streams"] = new_streams
context["new_streams_count"] = new_streams_count
result[user.id] = context
return result
yield user, context
def get_digest_context(user: UserProfile, cutoff: float) -> Dict[str, Any]:
return bulk_get_digest_context([user], cutoff)[user.id]
for _, context in bulk_get_digest_context([user], cutoff):
return context
raise AssertionError("Unreachable")
@transaction.atomic
@@ -375,13 +374,9 @@ def bulk_handle_digest_email(user_ids: List[int], cutoff: float) -> None:
.order_by("id")
.select_related("realm")
)
context_map = bulk_get_digest_context(users, cutoff)
digest_users = []
for user in users:
context = context_map[user.id]
for user, context in bulk_get_digest_context(users, cutoff):
# We don't want to send emails containing almost no information.
if not enough_traffic(context["hot_conversations"], context["new_streams_count"]):
continue