diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index df8170944a..4950a2bb35 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -17,7 +17,6 @@ from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat, \ from zerver.lib.bugdown import ( version as bugdown_version, url_embed_preview_enabled, - convert as bugdown_convert, ) from zerver.lib.addressee import Addressee from zerver.lib.bot_config import ( @@ -155,6 +154,9 @@ from zerver.lib.upload import claim_attachment, delete_message_image, \ from zerver.lib.video_calls import request_zoom_video_call_url from zerver.tornado.event_queue import send_event from zerver.lib.types import ProfileFieldData +from zerver.lib.streams import access_stream_for_send_message, subscribed_to_stream, check_stream_name, \ + create_stream_if_needed, get_default_value_for_history_public_to_subscribers, \ + render_stream_description, send_stream_creation_event from analytics.models import StreamCount @@ -1810,73 +1812,6 @@ def check_send_typing_notification(sender: UserProfile, notification_to: Union[S operator=operator, ) -def send_stream_creation_event(stream: Stream, user_ids: List[int]) -> None: - event = dict(type="stream", op="create", - streams=[stream.to_dict()]) - send_event(stream.realm, event, user_ids) - -def get_default_value_for_history_public_to_subscribers( - realm: Realm, - invite_only: bool, - history_public_to_subscribers: Optional[bool] -) -> bool: - if invite_only: - if history_public_to_subscribers is None: - # A private stream's history is non-public by default - history_public_to_subscribers = False - else: - # If we later decide to support public streams without - # history, we can remove this code path. - history_public_to_subscribers = True - - if realm.is_zephyr_mirror_realm: - # In the Zephyr mirroring model, history is unconditionally - # not public to subscribers, even for public streams. - history_public_to_subscribers = False - - return history_public_to_subscribers - -def render_stream_description(text: str) -> str: - return bugdown_convert(text, no_previews=True) - -def create_stream_if_needed(realm: Realm, - stream_name: str, - *, - invite_only: bool=False, - stream_post_policy: int=Stream.STREAM_POST_POLICY_EVERYONE, - history_public_to_subscribers: Optional[bool]=None, - stream_description: str="") -> Tuple[Stream, bool]: - - history_public_to_subscribers = get_default_value_for_history_public_to_subscribers( - realm, invite_only, history_public_to_subscribers) - - (stream, created) = Stream.objects.get_or_create( - realm=realm, - name__iexact=stream_name, - defaults = dict( - name=stream_name, - description=stream_description, - invite_only=invite_only, - stream_post_policy=stream_post_policy, - history_public_to_subscribers=history_public_to_subscribers, - is_in_zephyr_realm=realm.is_zephyr_mirror_realm - ) - ) - - if created: - recipient = Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM) - - stream.recipient = recipient - stream.rendered_description = render_stream_description(stream_description) - stream.save(update_fields=["recipient", "rendered_description"]) - - if stream.is_public(): - send_stream_creation_event(stream, active_non_guest_user_ids(stream.realm_id)) - else: - realm_admin_ids = [user.id for user in - stream.realm.get_admin_users_and_bots()] - send_stream_creation_event(stream, realm_admin_ids) - return stream, created def ensure_stream(realm: Realm, stream_name: str, @@ -1886,30 +1821,6 @@ def ensure_stream(realm: Realm, invite_only=invite_only, stream_description=stream_description)[0] -def create_streams_if_needed(realm: Realm, - stream_dicts: List[Mapping[str, Any]]) -> Tuple[List[Stream], List[Stream]]: - """Note that stream_dict["name"] is assumed to already be stripped of - whitespace""" - added_streams = [] # type: List[Stream] - existing_streams = [] # type: List[Stream] - for stream_dict in stream_dicts: - stream, created = create_stream_if_needed( - realm, - stream_dict["name"], - invite_only=stream_dict.get("invite_only", False), - stream_post_policy=stream_dict.get("stream_post_policy", Stream.STREAM_POST_POLICY_EVERYONE), - history_public_to_subscribers=stream_dict.get("history_public_to_subscribers"), - stream_description=stream_dict.get("description", "") - ) - - if created: - added_streams.append(stream) - else: - existing_streams.append(stream) - - return added_streams, existing_streams - - def get_recipient_from_user_profiles(recipient_profiles: Sequence[UserProfile], forwarded_mirror_message: bool, forwarder_user_profile: Optional[UserProfile], @@ -2163,14 +2074,6 @@ def check_schedule_message(sender: UserProfile, client: Client, return do_schedule_messages([message])[0] -def check_stream_name(stream_name: str) -> None: - if stream_name.strip() == "": - raise JsonableError(_("Invalid stream name '%s'") % (stream_name,)) - if len(stream_name) > Stream.MAX_NAME_LENGTH: - raise JsonableError(_("Stream name too long (limit: %s characters).") % (Stream.MAX_NAME_LENGTH,)) - for i in stream_name: - if ord(i) == 0: - raise JsonableError(_("Stream name '%s' contains NULL (0x00) characters.") % (stream_name,)) def check_default_stream_group_name(group_name: str) -> None: if group_name.strip() == "": @@ -2252,56 +2155,6 @@ def send_pm_if_empty_stream(stream: Optional[Stream], send_rate_limited_pm_notification_to_bot_owner(sender, realm, content) -def validate_sender_can_write_to_stream(sender: UserProfile, - stream: Stream, - forwarder_user_profile: Optional[UserProfile]) -> None: - # Our caller is responsible for making sure that `stream` actually - # matches the realm of the sender. - - # Organization admins can send to any stream, irrespective of the stream_post_policy value. - if sender.is_realm_admin or is_cross_realm_bot_email(sender.delivery_email): - pass - elif sender.is_bot and (sender.bot_owner is not None and - sender.bot_owner.is_realm_admin): - pass - elif stream.stream_post_policy == Stream.STREAM_POST_POLICY_ADMINS: - raise JsonableError(_("Only organization administrators can send to this stream.")) - elif stream.stream_post_policy == Stream.STREAM_POST_POLICY_RESTRICT_NEW_MEMBERS: - if sender.is_bot and (sender.bot_owner is not None and - sender.bot_owner.is_new_member): - raise JsonableError(_("New members cannot send to this stream.")) - elif sender.is_new_member: - raise JsonableError(_("New members cannot send to this stream.")) - - if not (stream.invite_only or sender.is_guest): - # This is a public stream and sender is not a guest user - return - - if subscribed_to_stream(sender, stream.id): - # It is private, but your are subscribed - return - - if sender.is_api_super_user: - return - - if (forwarder_user_profile is not None and forwarder_user_profile.is_api_super_user): - return - - if sender.is_bot and (sender.bot_owner is not None and - subscribed_to_stream(sender.bot_owner, stream.id)): - # Bots can send to any stream their owner can. - return - - if sender.delivery_email == settings.WELCOME_BOT: - # The welcome bot welcomes folks to the stream. - return - - if sender.delivery_email == settings.NOTIFICATION_BOT: - return - - # All other cases are an error. - raise JsonableError(_("Not authorized to send to stream '%s'") % (stream.name,)) - def validate_stream_name_with_pm_notification(stream_name: str, realm: Realm, sender: UserProfile) -> Stream: stream_name = stream_name.strip() @@ -2382,11 +2235,12 @@ def check_message(sender: UserProfile, client: Client, addressee: Addressee, recipient = stream.recipient # This will raise JsonableError if there are problems. - validate_sender_can_write_to_stream( - sender=sender, - stream=stream, - forwarder_user_profile=forwarder_user_profile - ) + + if sender.bot_type != sender.OUTGOING_WEBHOOK_BOT: + access_stream_for_send_message( + sender=sender, + stream=stream, + forwarder_user_profile=forwarder_user_profile) elif addressee.is_private(): user_profiles = addressee.user_profiles() @@ -4344,12 +4198,6 @@ def do_update_message_flags(user_profile: UserProfile, statsd.incr("flags.%s.%s" % (flag, operation), count) return count -def subscribed_to_stream(user_profile: UserProfile, stream_id: int) -> bool: - return Subscription.objects.filter( - user_profile=user_profile, - active=True, - recipient__type=Recipient.STREAM, - recipient__type_id=stream_id).exists() def truncate_content(content: str, max_length: int, truncation_message: str) -> str: if len(content) > max_length: diff --git a/zerver/lib/bulk_create.py b/zerver/lib/bulk_create.py index cb29292dcb..cac6c1a83a 100644 --- a/zerver/lib/bulk_create.py +++ b/zerver/lib/bulk_create.py @@ -7,6 +7,7 @@ from zerver.models import Realm, Stream, UserProfile, \ Subscription, Recipient, RealmAuditLog from zerver.lib.create_user import create_user_profile, \ get_display_email_address +from zerver.lib.streams import render_stream_description def bulk_create_users(realm: Realm, users_raw: Set[Tuple[str, str, str, bool]], @@ -111,7 +112,6 @@ def bulk_create_streams(realm: Realm, options['history_public_to_subscribers'] = ( not options.get("invite_only", False) and not realm.is_zephyr_mirror_realm) if name.lower() not in existing_streams: - from zerver.lib.actions import render_stream_description streams_to_create.append( Stream( realm=realm, diff --git a/zerver/lib/import_realm.py b/zerver/lib/import_realm.py index 34a9c57d9f..740af9c323 100644 --- a/zerver/lib/import_realm.py +++ b/zerver/lib/import_realm.py @@ -24,7 +24,7 @@ from zerver.lib.export import DATE_FIELDS, \ Record, TableData, TableName, Field, Path from zerver.lib.message import do_render_markdown from zerver.lib.bugdown import version as bugdown_version -from zerver.lib.actions import render_stream_description +from zerver.lib.streams import render_stream_description from zerver.lib.upload import random_name, sanitize_name, \ guess_type, BadImageError from zerver.lib.utils import generate_api_key, process_list_in_batches diff --git a/zerver/lib/streams.py b/zerver/lib/streams.py index 6bd7aed9c7..c41537e4e4 100644 --- a/zerver/lib/streams.py +++ b/zerver/lib/streams.py @@ -1,15 +1,176 @@ from typing import Any, Iterable, List, Mapping, Set, Tuple, Optional, Union from django.utils.translation import ugettext as _ +from django.conf import settings -from zerver.lib.actions import check_stream_name, create_streams_if_needed from zerver.lib.request import JsonableError -from zerver.models import UserProfile, Stream, Subscription, \ - Realm, Recipient, get_stream, \ - bulk_get_streams, get_realm_stream, DefaultStreamGroup, get_stream_by_id_in_realm +from zerver.models import ( + UserProfile, Stream, Subscription, Realm, Recipient, get_stream, + bulk_get_streams, get_realm_stream, DefaultStreamGroup, get_stream_by_id_in_realm, + is_cross_realm_bot_email, active_non_guest_user_ids, +) +from zerver.lib.bugdown import convert as bugdown_convert +from zerver.tornado.event_queue import send_event from django.db.models.query import QuerySet + +def get_default_value_for_history_public_to_subscribers( + realm: Realm, + invite_only: bool, + history_public_to_subscribers: Optional[bool] +) -> bool: + if invite_only: + if history_public_to_subscribers is None: + # A private stream's history is non-public by default + history_public_to_subscribers = False + else: + # If we later decide to support public streams without + # history, we can remove this code path. + history_public_to_subscribers = True + + if realm.is_zephyr_mirror_realm: + # In the Zephyr mirroring model, history is unconditionally + # not public to subscribers, even for public streams. + history_public_to_subscribers = False + + return history_public_to_subscribers + +def render_stream_description(text: str) -> str: + return bugdown_convert(text, no_previews=True) + +def send_stream_creation_event(stream: Stream, user_ids: List[int]) -> None: + event = dict(type="stream", op="create", + streams=[stream.to_dict()]) + send_event(stream.realm, event, user_ids) + +def create_stream_if_needed(realm: Realm, + stream_name: str, + *, + invite_only: bool=False, + stream_post_policy: int=Stream.STREAM_POST_POLICY_EVERYONE, + history_public_to_subscribers: Optional[bool]=None, + stream_description: str="") -> Tuple[Stream, bool]: + history_public_to_subscribers = get_default_value_for_history_public_to_subscribers( + realm, invite_only, history_public_to_subscribers) + + (stream, created) = Stream.objects.get_or_create( + realm=realm, + name__iexact=stream_name, + defaults = dict( + name=stream_name, + description=stream_description, + invite_only=invite_only, + stream_post_policy=stream_post_policy, + history_public_to_subscribers=history_public_to_subscribers, + is_in_zephyr_realm=realm.is_zephyr_mirror_realm + ) + ) + + if created: + recipient = Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM) + + stream.recipient = recipient + stream.rendered_description = render_stream_description(stream_description) + stream.save(update_fields=["recipient", "rendered_description"]) + + if stream.is_public(): + send_stream_creation_event(stream, active_non_guest_user_ids(stream.realm_id)) + else: + realm_admin_ids = [user.id for user in + stream.realm.get_admin_users_and_bots()] + send_stream_creation_event(stream, realm_admin_ids) + return stream, created + +def create_streams_if_needed(realm: Realm, + stream_dicts: List[Mapping[str, Any]]) -> Tuple[List[Stream], List[Stream]]: + """Note that stream_dict["name"] is assumed to already be stripped of + whitespace""" + added_streams = [] # type: List[Stream] + existing_streams = [] # type: List[Stream] + for stream_dict in stream_dicts: + stream, created = create_stream_if_needed( + realm, + stream_dict["name"], + invite_only=stream_dict.get("invite_only", False), + stream_post_policy=stream_dict.get("stream_post_policy", Stream.STREAM_POST_POLICY_EVERYONE), + history_public_to_subscribers=stream_dict.get("history_public_to_subscribers"), + stream_description=stream_dict.get("description", "") + ) + + if created: + added_streams.append(stream) + else: + existing_streams.append(stream) + + return added_streams, existing_streams + +def check_stream_name(stream_name: str) -> None: + if stream_name.strip() == "": + raise JsonableError(_("Invalid stream name '%s'") % (stream_name,)) + if len(stream_name) > Stream.MAX_NAME_LENGTH: + raise JsonableError(_("Stream name too long (limit: %s characters).") % (Stream.MAX_NAME_LENGTH,)) + for i in stream_name: + if ord(i) == 0: + raise JsonableError(_("Stream name '%s' contains NULL (0x00) characters.") % (stream_name,)) + +def subscribed_to_stream(user_profile: UserProfile, stream_id: int) -> bool: + return Subscription.objects.filter( + user_profile=user_profile, + active=True, + recipient__type=Recipient.STREAM, + recipient__type_id=stream_id).exists() + +def access_stream_for_send_message(sender: UserProfile, + stream: Stream, + forwarder_user_profile: Optional[UserProfile]) -> None: + # Our caller is responsible for making sure that `stream` actually + # matches the realm of the sender. + + # Organization admins can send to any stream, irrespective of the stream_post_policy value. + if sender.is_realm_admin or is_cross_realm_bot_email(sender.delivery_email): + pass + elif sender.is_bot and (sender.bot_owner is not None and + sender.bot_owner.is_realm_admin): + pass + elif stream.stream_post_policy == Stream.STREAM_POST_POLICY_ADMINS: + raise JsonableError(_("Only organization administrators can send to this stream.")) + elif stream.stream_post_policy == Stream.STREAM_POST_POLICY_RESTRICT_NEW_MEMBERS: + if sender.is_bot and (sender.bot_owner is not None and + sender.bot_owner.is_new_member): + raise JsonableError(_("New members cannot send to this stream.")) + elif sender.is_new_member: + raise JsonableError(_("New members cannot send to this stream.")) + + if not (stream.invite_only or sender.is_guest): + # This is a public stream and sender is not a guest user + return + + if subscribed_to_stream(sender, stream.id): + # It is private, but your are subscribed + return + + if sender.is_api_super_user: + return + + if (forwarder_user_profile is not None and forwarder_user_profile.is_api_super_user): + return + + if sender.is_bot and (sender.bot_owner is not None and + subscribed_to_stream(sender.bot_owner, stream.id)): + # Bots can send to any stream their owner can. + return + + if sender.delivery_email == settings.WELCOME_BOT: + # The welcome bot welcomes folks to the stream. + return + + if sender.delivery_email == settings.NOTIFICATION_BOT: + return + + # All other cases are an error. + raise JsonableError(_("Not authorized to send to stream '%s'") % (stream.name,)) + def check_for_exactly_one_stream_arg(stream_id: Optional[int], stream: Optional[str]) -> None: if stream_id is None and stream is None: raise JsonableError(_("Please supply 'stream'.")) @@ -18,7 +179,6 @@ def check_for_exactly_one_stream_arg(stream_id: Optional[int], stream: Optional[ raise JsonableError(_("Please choose one: 'stream' or 'stream_id'.")) def access_stream_for_delete_or_update(user_profile: UserProfile, stream_id: int) -> Stream: - # We should only ever use this for realm admins, who are allowed # to delete or update all streams on their realm, even private streams # to which they are not subscribed. We do an assert here, because diff --git a/zerver/lib/test_classes.py b/zerver/lib/test_classes.py index d86fe9a540..85c2959786 100644 --- a/zerver/lib/test_classes.py +++ b/zerver/lib/test_classes.py @@ -28,12 +28,14 @@ from zerver.lib.sessions import get_session_dict_user from zerver.lib.webhooks.common import get_fixture_http_headers, standardize_headers from zerver.lib.actions import ( - check_send_message, create_stream_if_needed, bulk_add_subscriptions, + check_send_message, bulk_add_subscriptions, bulk_remove_subscriptions, check_send_stream_message, gather_subscriptions, - get_default_value_for_history_public_to_subscribers, ) - +from zerver.lib.streams import ( + create_stream_if_needed, + get_default_value_for_history_public_to_subscribers +) from zerver.lib.stream_subscription import ( get_stream_subscriptions_for_user, ) diff --git a/zerver/management/commands/create_stream.py b/zerver/management/commands/create_stream.py index 5c6058c38f..7bec5f8934 100644 --- a/zerver/management/commands/create_stream.py +++ b/zerver/management/commands/create_stream.py @@ -1,8 +1,8 @@ from argparse import ArgumentParser from typing import Any -from zerver.lib.actions import create_stream_if_needed from zerver.lib.management import ZulipBaseCommand +from zerver.lib.streams import create_stream_if_needed class Command(ZulipBaseCommand): diff --git a/zerver/tests/test_digest.py b/zerver/tests/test_digest.py index a2b49425ab..a201b07f93 100644 --- a/zerver/tests/test_digest.py +++ b/zerver/tests/test_digest.py @@ -7,9 +7,10 @@ from django.test import override_settings from django.utils.timezone import now as timezone_now from confirmation.models import one_click_unsubscribe_link -from zerver.lib.actions import create_stream_if_needed, do_create_user +from zerver.lib.actions import do_create_user from zerver.lib.digest import gather_new_streams, handle_digest_email, enqueue_emails, \ exclude_subscription_modified_streams +from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import queries_captured from zerver.models import get_client, get_realm, flush_per_request_caches, \ diff --git a/zerver/tests/test_import_export.py b/zerver/tests/test_import_export.py index f333e84ca4..9c143ebc7f 100644 --- a/zerver/tests/test_import_export.py +++ b/zerver/tests/test_import_export.py @@ -52,13 +52,12 @@ from zerver.lib.bot_config import ( from zerver.lib.actions import ( do_create_user, do_add_reaction, - create_stream_if_needed, do_change_icon_source, do_change_logo_source, do_update_user_presence, do_change_plan_type, ) - +from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_runner import slow from zerver.models import ( diff --git a/zerver/tests/test_middleware.py b/zerver/tests/test_middleware.py index b22c9d36c9..228e44bfd2 100644 --- a/zerver/tests/test_middleware.py +++ b/zerver/tests/test_middleware.py @@ -5,8 +5,8 @@ from bs4 import BeautifulSoup from django.conf import settings from django.test import override_settings from unittest.mock import Mock, patch -from zerver.lib.actions import create_stream_if_needed from zerver.lib.realm_icon import get_realm_icon_url +from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_classes import ZulipTestCase from zerver.middleware import is_slow_query, write_log_line from zerver.models import get_realm, get_system_bot diff --git a/zerver/tests/test_narrow.py b/zerver/tests/test_narrow.py index 1f7d19193e..668238a9d4 100644 --- a/zerver/tests/test_narrow.py +++ b/zerver/tests/test_narrow.py @@ -13,7 +13,6 @@ from zerver.models import ( from zerver.lib.actions import ( do_set_realm_property, do_deactivate_user, - create_streams_if_needed ) from zerver.lib.message import ( MessageDict, @@ -47,6 +46,7 @@ from zerver.views.messages import ( find_first_unread_anchor, LARGER_THAN_MAX_MESSAGE_ID, ) +from zerver.lib.streams import create_streams_if_needed from typing import Dict, Mapping, List, Sequence, Tuple, Union, Any, Optional import mock diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index c1e2733ca9..007170d09f 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -9,13 +9,13 @@ from django.test import override_settings from mock import patch, MagicMock from typing import Any, Callable, Dict, List, Mapping, Tuple -from zerver.lib.actions import create_stream_if_needed from zerver.lib.email_mirror import RateLimitedRealmMirror from zerver.lib.email_mirror_helpers import encode_email_address from zerver.lib.queue import MAX_REQUEST_RETRIES from zerver.lib.rate_limiter import RateLimiterLockingException from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError from zerver.lib.send_email import FromAddress +from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_helpers import simulated_queue_client from zerver.lib.test_classes import ZulipTestCase from zerver.models import get_client, UserActivity, PreregistrationUser, \ diff --git a/zerver/tests/test_realm.py b/zerver/tests/test_realm.py index ae0d8db657..c904316315 100644 --- a/zerver/tests/test_realm.py +++ b/zerver/tests/test_realm.py @@ -14,7 +14,6 @@ from zerver.lib.actions import ( do_deactivate_stream, do_create_realm, do_scrub_realm, - create_stream_if_needed, do_change_plan_type, do_send_realm_reactivation_email ) @@ -22,6 +21,7 @@ from zerver.lib.actions import ( from confirmation.models import create_confirmation_link, Confirmation from zerver.lib.realm_description import get_realm_rendered_description, get_realm_text_description from zerver.lib.send_email import send_future_email +from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import ( reset_emails_in_zulip_realm, diff --git a/zerver/tests/test_signup.py b/zerver/tests/test_signup.py index 500ca53e65..2dcced49a1 100644 --- a/zerver/tests/test_signup.py +++ b/zerver/tests/test_signup.py @@ -35,7 +35,6 @@ from zerver.models import ( Stream, Subscription, flush_per_request_caches, get_system_bot, ) from zerver.lib.actions import ( - create_stream_if_needed, do_change_is_admin, get_stream, do_create_default_stream_group, @@ -59,6 +58,7 @@ from zerver.lib.email_notifications import enqueue_welcome_emails, \ from zerver.lib.rate_limiter import add_ratelimit_rule, remove_ratelimit_rule from zerver.lib.subdomains import is_root_domain_available from zerver.lib.stream_subscription import get_stream_subscriptions_for_user +from zerver.lib.streams import create_stream_if_needed from zerver.lib.test_helpers import find_key_by_email, queries_captured, \ HostRequestMock, load_subdomain_token from zerver.lib.test_classes import ( diff --git a/zerver/tests/test_subs.py b/zerver/tests/test_subs.py index e8165dd8fb..d8361ae380 100644 --- a/zerver/tests/test_subs.py +++ b/zerver/tests/test_subs.py @@ -27,7 +27,7 @@ from zerver.lib.response import ( from zerver.lib.streams import ( access_stream_by_id, access_stream_by_name, filter_stream_authorization, - list_to_streams, + list_to_streams, create_streams_if_needed ) from zerver.lib.stream_subscription import ( @@ -52,7 +52,7 @@ from zerver.lib.actions import ( gather_subscriptions_helper, bulk_add_subscriptions, bulk_remove_subscriptions, gather_subscriptions, get_default_streams_for_realm, get_stream, do_get_streams, do_change_is_guest, - create_stream_if_needed, create_streams_if_needed, + create_stream_if_needed, ensure_stream, do_deactivate_stream, do_deactivate_user,