actions: Move part into zerver.lib.streams.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
(cherry picked from commit a29f1b39da)
This commit is contained in:
Anders Kaseorg
2022-04-14 14:42:50 -07:00
committed by Tim Abbott
parent 5bcb52390c
commit 025219da16
12 changed files with 148 additions and 150 deletions

View File

@@ -1,18 +1,23 @@
from typing import Collection, List, Optional, Set, Tuple, Union
from typing import Any, Collection, Dict, List, Optional, Set, Tuple, Union
from django.db import transaction
from django.db.models import Exists, OuterRef, Q
from django.db.models.query import QuerySet
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from typing_extensions import TypedDict
from zerver.actions.default_streams import get_default_streams_for_realm
from zerver.lib.exceptions import (
JsonableError,
OrganizationOwnerRequired,
StreamAdministratorRequired,
)
from zerver.lib.markdown import markdown_convert
from zerver.lib.stream_subscription import get_active_subscriptions_for_stream_id
from zerver.lib.stream_subscription import (
get_active_subscriptions_for_stream_id,
get_subscribed_stream_ids_for_user,
)
from zerver.lib.string_validation import check_stream_name
from zerver.models import (
DefaultStreamGroup,
@@ -738,3 +743,126 @@ def get_stream_by_narrow_operand_access_unchecked(operand: Union[str, int], real
if isinstance(operand, str):
return get_stream(operand, realm)
return get_stream_by_id_in_realm(operand, realm)
def get_signups_stream(realm: Realm) -> Stream:
# This one-liner helps us work around a lint rule.
return get_stream("signups", realm)
def ensure_stream(
realm: Realm,
stream_name: str,
invite_only: bool = False,
stream_description: str = "",
*,
acting_user: Optional[UserProfile],
) -> Stream:
return create_stream_if_needed(
realm,
stream_name,
invite_only=invite_only,
stream_description=stream_description,
acting_user=acting_user,
)[0]
def get_occupied_streams(realm: Realm) -> QuerySet:
# TODO: Make a generic stub for QuerySet
"""Get streams with subscribers"""
exists_expression = Exists(
Subscription.objects.filter(
active=True,
is_user_active=True,
user_profile__realm=realm,
recipient_id=OuterRef("recipient_id"),
),
)
occupied_streams = (
Stream.objects.filter(realm=realm, deactivated=False)
.annotate(occupied=exists_expression)
.filter(occupied=True)
)
return occupied_streams
def get_web_public_streams(realm: Realm) -> List[Dict[str, Any]]: # nocoverage
query = get_web_public_streams_queryset(realm)
streams = Stream.get_client_data(query)
return streams
def do_get_streams(
user_profile: UserProfile,
include_public: bool = True,
include_web_public: bool = False,
include_subscribed: bool = True,
include_all_active: bool = False,
include_default: bool = False,
include_owner_subscribed: bool = False,
) -> List[Dict[str, Any]]:
# This function is only used by API clients now.
if include_all_active and not user_profile.is_realm_admin:
raise JsonableError(_("User not authorized for this query"))
include_public = include_public and user_profile.can_access_public_streams()
# Start out with all active streams in the realm.
query = Stream.objects.filter(realm=user_profile.realm, deactivated=False)
if include_all_active:
streams = Stream.get_client_data(query)
else:
# We construct a query as the or (|) of the various sources
# this user requested streams from.
query_filter: Optional[Q] = None
def add_filter_option(option: Q) -> None:
nonlocal query_filter
if query_filter is None:
query_filter = option
else:
query_filter |= option
if include_subscribed:
subscribed_stream_ids = get_subscribed_stream_ids_for_user(user_profile)
recipient_check = Q(id__in=set(subscribed_stream_ids))
add_filter_option(recipient_check)
if include_public:
invite_only_check = Q(invite_only=False)
add_filter_option(invite_only_check)
if include_web_public:
# This should match get_web_public_streams_queryset
web_public_check = Q(
is_web_public=True,
invite_only=False,
history_public_to_subscribers=True,
deactivated=False,
)
add_filter_option(web_public_check)
if include_owner_subscribed and user_profile.is_bot:
bot_owner = user_profile.bot_owner
assert bot_owner is not None
owner_stream_ids = get_subscribed_stream_ids_for_user(bot_owner)
owner_subscribed_check = Q(id__in=set(owner_stream_ids))
add_filter_option(owner_subscribed_check)
if query_filter is not None:
query = query.filter(query_filter)
streams = Stream.get_client_data(query)
else:
# Don't bother going to the database with no valid sources
streams = []
streams.sort(key=lambda elt: elt["name"])
if include_default:
is_default = {}
default_streams = get_default_streams_for_realm(user_profile.realm_id)
for default_stream in default_streams:
is_default[default_stream.id] = True
for stream in streams:
stream["is_default"] = is_default.get(stream["stream_id"], False)
return streams