actions: Move part into zerver.lib.subscription_info.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
(cherry picked from commit 9dd7e34ab3)
This commit is contained in:
Anders Kaseorg
2022-04-14 14:45:12 -07:00
committed by Tim Abbott
parent 508c676f61
commit b54240d6cf
6 changed files with 332 additions and 333 deletions

View File

@@ -1,10 +1,8 @@
import datetime
import hashlib
import itertools
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from operator import itemgetter
from typing import (
AbstractSet,
Any,
@@ -24,15 +22,13 @@ from typing import (
import orjson
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import IntegrityError, connection, transaction
from django.db import IntegrityError, transaction
from django.db.models import F
from django.db.models.query import QuerySet
from django.utils.html import escape
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy
from django.utils.translation import override as override_language
from psycopg2.sql import SQL
from typing_extensions import TypedDict
from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat
@@ -122,7 +118,6 @@ from zerver.lib.stream_subscription import (
bulk_get_subscriber_peer_info,
get_active_subscriptions_for_stream_id,
get_bulk_stream_subscriber_info,
get_stream_subscriptions_for_user,
get_subscriptions_for_send_message,
get_used_colors_for_user_ids,
num_subscribers_for_stream_id,
@@ -142,10 +137,9 @@ from zerver.lib.streams import (
get_stream_permission_policy_name,
render_stream_description,
send_stream_creation_event,
subscribed_to_stream,
)
from zerver.lib.string_validation import check_stream_name, check_stream_topic
from zerver.lib.subscription_info import build_stream_dict_for_never_sub, build_stream_dict_for_sub
from zerver.lib.subscription_info import get_subscribers_query
from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime
from zerver.lib.timezone import canonicalize_timezone
from zerver.lib.topic import (
@@ -160,16 +154,7 @@ from zerver.lib.topic import (
update_edit_history,
update_messages_for_topic_edit,
)
from zerver.lib.types import (
EditHistoryEvent,
NeverSubscribedStreamDict,
ProfileDataElementValue,
ProfileFieldData,
RawStreamDict,
RawSubscriptionDict,
SubscriptionInfo,
SubscriptionStreamDict,
)
from zerver.lib.types import EditHistoryEvent, ProfileDataElementValue, ProfileFieldData
from zerver.lib.upload import delete_avatar_image
from zerver.lib.user_counts import realm_user_count, realm_user_count_by_role
from zerver.lib.user_groups import (
@@ -224,7 +209,6 @@ from zerver.models import (
active_user_ids,
bot_owner_user_ids,
custom_profile_fields_for_realm,
get_active_streams,
get_bot_dicts_in_realm,
get_bot_services,
get_client,
@@ -3324,168 +3308,6 @@ def internal_send_huddle_message(
return message_ids[0]
def validate_user_access_to_subscribers(
    user_profile: Optional[UserProfile], stream: Stream
) -> None:
    """Check that `user_profile` is allowed to view the subscribers of `stream`.

    This is a thin wrapper: it extracts the realm_id, is_web_public and
    invite_only fields from the Stream object and delegates the actual
    policy decision to validate_user_access_to_subscribers_helper.  Any
    exception raised by that helper (ValidationError for a missing user or
    a realm mismatch, JsonableError when access is denied) propagates to
    the caller unchanged.
    """

    def is_subscribed(subscriber: UserProfile) -> bool:
        # Passed as a callable rather than a precomputed value so that
        # the subscription lookup only happens if the helper actually
        # needs to know the answer.
        return subscribed_to_stream(subscriber, stream.id)

    stream_fields = {
        "realm_id": stream.realm_id,
        "is_web_public": stream.is_web_public,
        "invite_only": stream.invite_only,
    }
    validate_user_access_to_subscribers_helper(user_profile, stream_fields, is_subscribed)
def validate_user_access_to_subscribers_helper(
    user_profile: Optional[UserProfile],
    stream_dict: Mapping[str, Any],
    check_user_subscribed: Callable[[UserProfile], bool],
) -> None:
    """Helper for validate_user_access_to_subscribers that doesn't require
    a full stream object.  This function is a bit hard to read,
    because it is carefully optimized for performance in the two code
    paths we call it from:

    * In `bulk_get_subscriber_user_ids`, we already know whether the
      user is subscribed (via `subscribed_stream_ids`), and so we want
      to avoid a database query at all (especially since it calls this
      in a loop);
    * In `validate_user_access_to_subscribers`, we want to only check
      if the user is subscribed when we absolutely have to, since it
      costs a database query.

    Arguments:
    * `user_profile`: the user requesting access; must not be None.
    * `stream_dict`: a mapping providing the stream's "realm_id",
      "is_web_public" and "invite_only" values.
    * `check_user_subscribed`: a function that reports whether the
      user is subscribed to the stream.  It is only called on the
      code paths that need the answer (guests, and non-admins looking
      at an invite-only stream).

    Returns None when access is allowed.

    Note also that we raise a ValidationError in cases where the
    caller is doing the wrong thing (maybe these should be
    AssertionErrors), and JsonableError for 400 type errors.
    """
    # Caller errors: no user at all, or a user from a different realm.
    if user_profile is None:
        raise ValidationError("Missing user to validate access for")

    if user_profile.realm_id != stream_dict["realm_id"]:
        raise ValidationError("Requesting user not in given realm")

    # Even guest users can access subscribers to web-public streams,
    # since they can freely become subscribers to these streams.
    if stream_dict["is_web_public"]:
        return

    # With the exception of web-public streams, a guest must
    # be subscribed to a stream (even a public one) in order
    # to see subscribers.
    if user_profile.is_guest:
        if check_user_subscribed(user_profile):
            return
        # We could explicitly handle the case where guests aren't
        # subscribed here in an `else` statement or we can fall
        # through to the subsequent logic.  Tim prefers the latter.
        # Adding an `else` would ensure better code coverage.

    # Users who cannot access public streams are denied subscriber
    # data for any stream that is not invite-only.
    if not user_profile.can_access_public_streams() and not stream_dict["invite_only"]:
        raise JsonableError(_("Subscriber data is not available for this stream"))

    # Organization administrators can view subscribers for all streams.
    if user_profile.is_realm_admin:
        return

    # Everyone else needs to be subscribed to see the subscribers of
    # an invite-only stream.
    if stream_dict["invite_only"] and not check_user_subscribed(user_profile):
        raise JsonableError(_("Unable to retrieve subscribers for private stream"))
def bulk_get_subscriber_user_ids(
    stream_dicts: Collection[Mapping[str, Any]],
    user_profile: UserProfile,
    subscribed_stream_ids: Set[int],
) -> Dict[int, List[int]]:
    """Fetch the subscriber user ids for many streams with a single query.

    Arguments:
    * `stream_dicts`: one mapping per stream; each must provide "id" and
      "recipient_id", plus the keys read by
      validate_user_access_to_subscribers_helper ("realm_id",
      "is_web_public", "invite_only").
    * `user_profile`: the user on whose behalf subscribers are fetched.
    * `subscribed_stream_ids`: ids of the streams `user_profile` is
      subscribed to; used for access validation without extra queries.

    Returns a dict with an entry for every stream in `stream_dicts`,
    mapping stream id => list of subscriber user ids, ordered by user id.
    Streams whose subscribers the user may not see (the helper raised
    JsonableError) and streams without matching subscription rows map to
    an empty list.  Any other exception from the helper (e.g.
    ValidationError) propagates.
    """
    target_stream_dicts = []
    for stream_dict in stream_dicts:
        stream_id = stream_dict["id"]
        is_subscribed = stream_id in subscribed_stream_ids

        try:
            validate_user_access_to_subscribers_helper(
                user_profile,
                stream_dict,
                # The helper invokes this callback before returning, so
                # capturing the loop variable here is safe.
                lambda user_profile: is_subscribed,
            )
        except JsonableError:
            continue
        target_stream_dicts.append(stream_dict)

    recip_to_stream_id = {stream["recipient_id"]: stream["id"] for stream in target_stream_dicts}
    recipient_ids = sorted(stream["recipient_id"] for stream in target_stream_dicts)

    result: Dict[int, List[int]] = {stream["id"]: [] for stream in stream_dicts}
    if not recipient_ids:
        return result

    # The raw SQL below leads to more than a 2x speedup when tested with
    # 20k+ total subscribers.  (For large realms with lots of default
    # streams, this function deals with LOTS of data, so it is important
    # to optimize.)
    query = SQL(
        """
        SELECT
            zerver_subscription.recipient_id,
            zerver_subscription.user_profile_id
        FROM
            zerver_subscription
        WHERE
            zerver_subscription.recipient_id in %(recipient_ids)s AND
            zerver_subscription.active AND
            zerver_subscription.is_user_active
        ORDER BY
            zerver_subscription.recipient_id,
            zerver_subscription.user_profile_id
        """
    )

    # Use the cursor as a context manager so that it is closed even if
    # execute() or fetchall() raises.
    with connection.cursor() as cursor:
        cursor.execute(query, {"recipient_ids": tuple(recipient_ids)})
        rows = cursor.fetchall()

    # Using groupby/itemgetter here is important for performance, at scale.
    # It makes it so that all interpreter overhead is just O(N) in nature.
    # groupby relies on the rows being ordered by recipient_id, which the
    # ORDER BY clause above guarantees.
    for recip_id, recip_rows in itertools.groupby(rows, itemgetter(0)):
        stream_id = recip_to_stream_id[recip_id]
        result[stream_id] = [r[1] for r in recip_rows]

    return result
def get_subscribers_query(stream: Stream, requesting_user: Optional[UserProfile]) -> QuerySet:
    """Build a query for the active subscriptions to `stream`.

    Access is validated first via validate_user_access_to_subscribers;
    whatever that raises for `requesting_user` propagates to the caller.
    The query is built with include_deactivated_users=False.

    The caller can refine this query with select_related(), values(), etc.
    depending on whether it wants objects or just certain fields.
    """
    # TODO: Make a generic stub for QuerySet
    validate_user_access_to_subscribers(requesting_user, stream)
    active_subscriptions = get_active_subscriptions_for_stream_id(
        stream.id, include_deactivated_users=False
    )
    return active_subscriptions
def get_subscriber_ids(stream: Stream, requesting_user: Optional[UserProfile] = None) -> List[str]:
    """Return the `user_profile_id` of every subscription to `stream`.

    Delegates access validation and query construction to
    get_subscribers_query, then narrows the result to the flat
    `user_profile_id` column.
    """
    # NOTE(review): the declared return type is List[str], but what is
    # returned is the values_list() query itself, and the ids are
    # presumably integers -- confirm before tightening the annotation.
    return get_subscribers_query(stream, requesting_user).values_list(
        "user_profile_id", flat=True
    )
@@ -6455,146 +6277,6 @@ def do_delete_messages_by_sender(user: UserProfile) -> None:
move_messages_to_archive(message_ids, chunk_size=retention.STREAM_MESSAGE_BATCH_SIZE)
# In general, it's better to avoid using .values() because it makes
# the code pretty ugly, but in this case, it has significant
# performance impact for loading / for users with large numbers of
# subscriptions, so it's worth optimizing.
def gather_subscriptions_helper(
    user_profile: UserProfile,
    include_subscribers: bool = True,
) -> SubscriptionInfo:
    """Collect the stream data for `user_profile`, split into three lists.

    The returned SubscriptionInfo holds:
    * `subscriptions`: streams with an active subscription for the user;
    * `unsubscribed`: streams with an inactive subscription for the user;
    * `never_subscribed`: other active streams the user may learn about.
      Candidates are every remaining active stream if the user can access
      public streams, otherwise only web-public ones; invite-only
      candidates are kept only for realm administrators.

    Each list is sorted by stream name.  When `include_subscribers` is
    true, every entry additionally gets a "subscribers" key filled in
    from bulk_get_subscriber_user_ids.
    """
    all_streams: QuerySet[RawStreamDict] = get_active_streams(user_profile.realm).values(
        *Stream.API_FIELDS,
        # The realm_id and recipient_id are generally not needed in the API.
        "realm_id",
        "recipient_id",
        # email_token isn't public to some users with access to
        # the stream, so doesn't belong in API_FIELDS.
        "email_token",
    )

    # Index the active streams both by recipient id and by stream id.
    stream_id_by_recipient: Dict[int, int] = {}
    raw_stream_by_id: Dict[int, RawStreamDict] = {}
    for raw_stream in all_streams:
        stream_id_by_recipient[raw_stream["recipient_id"]] = raw_stream["id"]
        raw_stream_by_id[raw_stream["id"]] = raw_stream

    user_subscriptions: Iterable[RawSubscriptionDict] = (
        get_stream_subscriptions_for_user(user_profile)
        .values(
            *Subscription.API_FIELDS,
            "recipient_id",
            "active",
        )
        .order_by("recipient_id")
    )

    # We only care about subscriptions for active streams; pair each one
    # with its stream id up front so later steps need no further lookups.
    subs_with_stream_id: List[Tuple[int, RawSubscriptionDict]] = [
        (stream_id_by_recipient[sub_dict["recipient_id"]], sub_dict)
        for sub_dict in user_subscriptions
        if stream_id_by_recipient.get(sub_dict["recipient_id"])
    ]

    recent_traffic = get_streams_traffic(
        stream_ids={stream_id for stream_id, sub_dict in subs_with_stream_id}
    )

    # Okay, now we finally get to populating our main results, which
    # will be these three lists.
    subscribed: List[SubscriptionStreamDict] = []
    unsubscribed: List[SubscriptionStreamDict] = []
    never_subscribed: List[NeverSubscribedStreamDict] = []

    sub_unsub_stream_ids = set()
    for stream_id, sub_dict in subs_with_stream_id:
        sub_unsub_stream_ids.add(stream_id)
        stream_dict = build_stream_dict_for_sub(
            user=user_profile,
            sub_dict=sub_dict,
            raw_stream_dict=raw_stream_by_id[stream_id],
            recent_traffic=recent_traffic,
        )

        # The "active" flag is represented in the result by which list
        # the stream ends up in.
        if sub_dict["active"]:
            subscribed.append(stream_dict)
        else:
            unsubscribed.append(stream_dict)

    if user_profile.can_access_public_streams():
        never_subscribed_stream_ids = set(raw_stream_by_id) - sub_unsub_stream_ids
    else:
        web_public_stream_ids = {
            raw_stream["id"] for raw_stream in all_streams if raw_stream["is_web_public"]
        }
        never_subscribed_stream_ids = web_public_stream_ids - sub_unsub_stream_ids

    for stream_id in never_subscribed_stream_ids:
        raw_stream_dict = raw_stream_by_id[stream_id]
        # Invite-only streams are only listed for realm administrators.
        if raw_stream_dict["invite_only"] and not user_profile.is_realm_admin:
            continue
        never_subscribed.append(
            build_stream_dict_for_never_sub(
                raw_stream_dict=raw_stream_dict, recent_traffic=recent_traffic
            )
        )

    if include_subscribers:
        # The highly optimized bulk_get_subscriber_user_ids wants to know which
        # streams we are subscribed to, for validation purposes, and it uses that
        # info to know if it's allowed to find OTHER subscribers.
        subscribed_stream_ids = {
            stream_id for stream_id, sub_dict in subs_with_stream_id if sub_dict["active"]
        }
        subscriber_map = bulk_get_subscriber_user_ids(
            all_streams,
            user_profile,
            subscribed_stream_ids,
        )

        for stream_list in (subscribed, unsubscribed):
            for stream_dict in stream_list:
                assert isinstance(stream_dict["stream_id"], int)
                stream_dict["subscribers"] = subscriber_map[stream_dict["stream_id"]]

        for slim_stream_dict in never_subscribed:
            assert isinstance(slim_stream_dict["stream_id"], int)
            slim_stream_dict["subscribers"] = subscriber_map[slim_stream_dict["stream_id"]]

    subscribed.sort(key=lambda entry: entry["name"])
    unsubscribed.sort(key=lambda entry: entry["name"])
    never_subscribed.sort(key=lambda entry: entry["name"])

    return SubscriptionInfo(
        subscriptions=subscribed,
        unsubscribed=unsubscribed,
        never_subscribed=never_subscribed,
    )
def gather_subscriptions(
    user_profile: UserProfile,
    include_subscribers: bool = False,
) -> Tuple[List[SubscriptionStreamDict], List[SubscriptionStreamDict]]:
    """Return the user's (subscribed, unsubscribed) stream lists.

    Convenience wrapper around gather_subscriptions_helper that drops the
    never-subscribed list.  Note that `include_subscribers` defaults to
    False here, unlike in the helper.
    """
    info = gather_subscriptions_helper(
        user_profile,
        include_subscribers=include_subscribers,
    )
    return (info.subscriptions, info.unsubscribed)
class ActivePresenceIdleUserData(TypedDict):
alerted: bool
notifications_data: UserMessageNotificationsData