mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	Because send_messages_for_new_subscribers also sends channel
notification messages about newly created channels, a clearer
name for is send_user_subscribed_and_new_channel_notifications.
(cherry picked from commit 8b1e536e6b)
		
	
		
			
				
	
	
		
			1488 lines
		
	
	
		
			56 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1488 lines
		
	
	
		
			56 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import time
 | 
						|
from collections import defaultdict
 | 
						|
from collections.abc import Callable
 | 
						|
from typing import Annotated, Any, Literal
 | 
						|
 | 
						|
import orjson
 | 
						|
from django.conf import settings
 | 
						|
from django.contrib.auth.models import AnonymousUser
 | 
						|
from django.db import transaction
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from django.utils.translation import override as override_language
 | 
						|
from pydantic import BaseModel, Field, Json, NonNegativeInt, StringConstraints, model_validator
 | 
						|
from pydantic.functional_validators import AfterValidator
 | 
						|
from pydantic_partials.sentinels import Missing, MissingType
 | 
						|
 | 
						|
from zerver.actions.default_streams import (
 | 
						|
    do_add_default_stream,
 | 
						|
    do_add_streams_to_default_stream_group,
 | 
						|
    do_change_default_stream_group_description,
 | 
						|
    do_change_default_stream_group_name,
 | 
						|
    do_create_default_stream_group,
 | 
						|
    do_remove_default_stream,
 | 
						|
    do_remove_default_stream_group,
 | 
						|
    do_remove_streams_from_default_stream_group,
 | 
						|
)
 | 
						|
from zerver.actions.message_delete import do_delete_messages
 | 
						|
from zerver.actions.message_send import (
 | 
						|
    do_send_messages,
 | 
						|
    internal_prep_private_message,
 | 
						|
    internal_prep_stream_message,
 | 
						|
)
 | 
						|
from zerver.actions.streams import (
 | 
						|
    bulk_add_subscriptions,
 | 
						|
    bulk_remove_subscriptions,
 | 
						|
    do_change_stream_description,
 | 
						|
    do_change_stream_folder,
 | 
						|
    do_change_stream_group_based_setting,
 | 
						|
    do_change_stream_message_retention_days,
 | 
						|
    do_change_stream_permission,
 | 
						|
    do_change_subscription_property,
 | 
						|
    do_deactivate_stream,
 | 
						|
    do_rename_stream,
 | 
						|
    do_set_stream_property,
 | 
						|
    do_unarchive_stream,
 | 
						|
    get_subscriber_ids,
 | 
						|
)
 | 
						|
from zerver.actions.user_topics import bulk_do_set_user_topic_visibility_policy
 | 
						|
from zerver.context_processors import get_valid_realm_from_request
 | 
						|
from zerver.decorator import (
 | 
						|
    check_if_user_can_manage_default_streams,
 | 
						|
    require_non_guest_user,
 | 
						|
    require_realm_admin,
 | 
						|
)
 | 
						|
from zerver.lib.channel_folders import get_channel_folder_by_id
 | 
						|
from zerver.lib.default_streams import get_default_stream_ids_for_realm
 | 
						|
from zerver.lib.email_mirror_helpers import encode_email_address, get_channel_email_token
 | 
						|
from zerver.lib.exceptions import (
 | 
						|
    CannotManageDefaultChannelError,
 | 
						|
    JsonableError,
 | 
						|
    OrganizationOwnerRequiredError,
 | 
						|
)
 | 
						|
from zerver.lib.mention import MentionBackend, silent_mention_syntax_for_user
 | 
						|
from zerver.lib.message import bulk_access_stream_messages_query
 | 
						|
from zerver.lib.response import json_success
 | 
						|
from zerver.lib.retention import STREAM_MESSAGE_BATCH_SIZE as RETENTION_STREAM_MESSAGE_BATCH_SIZE
 | 
						|
from zerver.lib.retention import parse_message_retention_days
 | 
						|
from zerver.lib.stream_traffic import get_streams_traffic
 | 
						|
from zerver.lib.streams import (
 | 
						|
    StreamDict,
 | 
						|
    access_default_stream_group_by_id,
 | 
						|
    access_stream_by_id,
 | 
						|
    access_stream_by_name,
 | 
						|
    access_stream_for_delete_or_update_requiring_metadata_access,
 | 
						|
    access_web_public_stream,
 | 
						|
    channel_events_topic_name,
 | 
						|
    check_channel_creation_permissions,
 | 
						|
    check_stream_name_available,
 | 
						|
    check_zephyr_realm_invite_conditions,
 | 
						|
    create_stream_if_needed,
 | 
						|
    do_get_streams,
 | 
						|
    filter_stream_authorization_for_adding_subscribers,
 | 
						|
    get_anonymous_group_membership_dict_for_streams,
 | 
						|
    get_stream_permission_default_group,
 | 
						|
    get_stream_permission_policy_key,
 | 
						|
    list_to_streams,
 | 
						|
    stream_to_dict,
 | 
						|
    user_has_content_access,
 | 
						|
    validate_topics_policy,
 | 
						|
)
 | 
						|
from zerver.lib.subscription_info import gather_subscriptions
 | 
						|
from zerver.lib.topic import (
 | 
						|
    get_topic_history_for_public_stream,
 | 
						|
    get_topic_history_for_stream,
 | 
						|
    maybe_rename_general_chat_to_empty_topic,
 | 
						|
    messages_for_topic,
 | 
						|
)
 | 
						|
from zerver.lib.topic_link_util import get_stream_link_syntax
 | 
						|
from zerver.lib.typed_endpoint import ApiParamConfig, PathOnly, typed_endpoint
 | 
						|
from zerver.lib.typed_endpoint_validators import check_color, parse_enum_from_string_value
 | 
						|
from zerver.lib.types import UserGroupMembersData
 | 
						|
from zerver.lib.user_groups import (
 | 
						|
    GroupSettingChangeRequest,
 | 
						|
    UserGroupMembershipDetails,
 | 
						|
    access_user_group_api_value_for_setting,
 | 
						|
    access_user_group_for_setting,
 | 
						|
    get_group_setting_value_for_api,
 | 
						|
    get_role_based_system_groups_dict,
 | 
						|
    get_system_user_group_by_name,
 | 
						|
    parse_group_setting_value,
 | 
						|
    validate_group_setting_value_change,
 | 
						|
)
 | 
						|
from zerver.lib.user_topics import get_users_with_user_topic_visibility_policy
 | 
						|
from zerver.lib.users import access_bot_by_id, bulk_access_users_by_email, bulk_access_users_by_id
 | 
						|
from zerver.lib.utils import assert_is_not_none
 | 
						|
from zerver.models import (
 | 
						|
    ChannelFolder,
 | 
						|
    Realm,
 | 
						|
    Stream,
 | 
						|
    UserGroup,
 | 
						|
    UserMessage,
 | 
						|
    UserProfile,
 | 
						|
    UserTopic,
 | 
						|
)
 | 
						|
from zerver.models.groups import SystemGroups
 | 
						|
from zerver.models.streams import StreamTopicsPolicyEnum
 | 
						|
from zerver.models.users import get_system_bot
 | 
						|
 | 
						|
 | 
						|
def bulk_principals_to_user_profiles(
 | 
						|
    principals: list[str] | list[int],
 | 
						|
    acting_user: UserProfile,
 | 
						|
) -> set[UserProfile]:
 | 
						|
    # Since principals is guaranteed to be non-empty and to have the same type of elements,
 | 
						|
    # the following if/else is safe and enough.
 | 
						|
 | 
						|
    # principals are user emails.
 | 
						|
    if isinstance(principals[0], str):
 | 
						|
        return bulk_access_users_by_email(
 | 
						|
            principals,  # type: ignore[arg-type] # principals guaranteed to be list[str] only.
 | 
						|
            acting_user=acting_user,
 | 
						|
            allow_deactivated=False,
 | 
						|
            allow_bots=True,
 | 
						|
            for_admin=False,
 | 
						|
        )
 | 
						|
 | 
						|
    # principals are user ids.
 | 
						|
    else:
 | 
						|
        return bulk_access_users_by_id(
 | 
						|
            principals,  # type: ignore[arg-type] # principals guaranteed to be list[int] only.
 | 
						|
            acting_user=acting_user,
 | 
						|
            allow_deactivated=False,
 | 
						|
            allow_bots=True,
 | 
						|
            for_admin=False,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def user_directly_controls_user(user_profile: UserProfile, target: UserProfile) -> bool:
 | 
						|
    """Returns whether the target user is either the current user or a bot
 | 
						|
    owned by the current user"""
 | 
						|
    if user_profile == target:
 | 
						|
        return True
 | 
						|
    if target.is_bot and target.bot_owner_id == user_profile.id:
 | 
						|
        return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def deactivate_stream_backend(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, stream_id: int
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_for_delete_or_update_requiring_metadata_access(
 | 
						|
        user_profile, stream_id
 | 
						|
    )
 | 
						|
    do_deactivate_stream(stream, acting_user=user_profile)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@check_if_user_can_manage_default_streams
 | 
						|
@typed_endpoint
 | 
						|
def add_default_stream(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, *, stream_id: Json[int]
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_by_id(user_profile, stream_id)
 | 
						|
    if stream.invite_only:
 | 
						|
        raise JsonableError(_("Private channels cannot be made default."))
 | 
						|
    do_add_default_stream(stream)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@check_if_user_can_manage_default_streams
 | 
						|
@typed_endpoint
 | 
						|
def create_default_stream_group(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    description: str,
 | 
						|
    group_name: str,
 | 
						|
    stream_names: Json[list[str]],
 | 
						|
) -> HttpResponse:
 | 
						|
    streams = []
 | 
						|
    for stream_name in stream_names:
 | 
						|
        (stream, sub) = access_stream_by_name(user_profile, stream_name)
 | 
						|
        streams.append(stream)
 | 
						|
    do_create_default_stream_group(user_profile.realm, group_name, description, streams)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@check_if_user_can_manage_default_streams
 | 
						|
@typed_endpoint
 | 
						|
def update_default_stream_group_info(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    group_id: PathOnly[int],
 | 
						|
    new_description: str | None = None,
 | 
						|
    new_group_name: str | None = None,
 | 
						|
) -> HttpResponse:
 | 
						|
    if not new_group_name and not new_description:
 | 
						|
        raise JsonableError(_('You must pass "new_description" or "new_group_name".'))
 | 
						|
 | 
						|
    group = access_default_stream_group_by_id(user_profile.realm, group_id)
 | 
						|
    if new_group_name is not None:
 | 
						|
        do_change_default_stream_group_name(user_profile.realm, group, new_group_name)
 | 
						|
    if new_description is not None:
 | 
						|
        do_change_default_stream_group_description(user_profile.realm, group, new_description)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@check_if_user_can_manage_default_streams
 | 
						|
@typed_endpoint
 | 
						|
def update_default_stream_group_streams(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    group_id: PathOnly[int],
 | 
						|
    op: str,
 | 
						|
    stream_names: Json[list[str]],
 | 
						|
) -> HttpResponse:
 | 
						|
    group = access_default_stream_group_by_id(user_profile.realm, group_id)
 | 
						|
    streams = []
 | 
						|
    for stream_name in stream_names:
 | 
						|
        (stream, sub) = access_stream_by_name(user_profile, stream_name)
 | 
						|
        streams.append(stream)
 | 
						|
 | 
						|
    if op == "add":
 | 
						|
        do_add_streams_to_default_stream_group(user_profile.realm, group, streams)
 | 
						|
    elif op == "remove":
 | 
						|
        do_remove_streams_from_default_stream_group(user_profile.realm, group, streams)
 | 
						|
    else:
 | 
						|
        raise JsonableError(_('Invalid value for "op". Specify one of "add" or "remove".'))
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@check_if_user_can_manage_default_streams
 | 
						|
@typed_endpoint
 | 
						|
def remove_default_stream_group(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, *, group_id: PathOnly[int]
 | 
						|
) -> HttpResponse:
 | 
						|
    group = access_default_stream_group_by_id(user_profile.realm, group_id)
 | 
						|
    do_remove_default_stream_group(user_profile.realm, group)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@check_if_user_can_manage_default_streams
 | 
						|
@typed_endpoint
 | 
						|
def remove_default_stream(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, *, stream_id: Json[int]
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_by_id(
 | 
						|
        user_profile,
 | 
						|
        stream_id,
 | 
						|
        require_content_access=False,
 | 
						|
    )
 | 
						|
    do_remove_default_stream(stream)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
ChannelDescription = Annotated[
 | 
						|
    str | None,
 | 
						|
    StringConstraints(max_length=Stream.MAX_DESCRIPTION_LENGTH),
 | 
						|
    # We don't allow newline characters in stream descriptions.
 | 
						|
    AfterValidator(lambda val: val.replace("\n", " ") if val is not None else None),
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
TopicsPolicy = Annotated[
 | 
						|
    str | None,
 | 
						|
    AfterValidator(
 | 
						|
        lambda val: parse_enum_from_string_value(
 | 
						|
            val,
 | 
						|
            "topics_policy",
 | 
						|
            StreamTopicsPolicyEnum,
 | 
						|
        )
 | 
						|
    ),
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def update_stream_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    can_add_subscribers_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_administer_channel_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_delete_any_message_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_delete_own_message_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_move_messages_out_of_channel_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_move_messages_within_channel_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_remove_subscribers_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_resolve_topics_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_send_message_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    can_subscribe_group: Json[GroupSettingChangeRequest] | None = None,
 | 
						|
    description: ChannelDescription = None,
 | 
						|
    folder_id: Json[int | None] | MissingType = Missing,
 | 
						|
    history_public_to_subscribers: Json[bool] | None = None,
 | 
						|
    is_archived: Json[bool] | None = None,
 | 
						|
    is_default_stream: Json[bool] | None = None,
 | 
						|
    is_private: Json[bool] | None = None,
 | 
						|
    is_web_public: Json[bool] | None = None,
 | 
						|
    message_retention_days: Json[str] | Json[int] | None = None,
 | 
						|
    new_name: str | None = None,
 | 
						|
    stream_id: PathOnly[int],
 | 
						|
    topics_policy: TopicsPolicy = None,
 | 
						|
) -> HttpResponse:
 | 
						|
    # Most settings updates only require metadata access, not content
 | 
						|
    # access. We will check for content access further when and where
 | 
						|
    # required.
 | 
						|
    (stream, sub) = access_stream_for_delete_or_update_requiring_metadata_access(
 | 
						|
        user_profile, stream_id
 | 
						|
    )
 | 
						|
    user_group_membership_details = UserGroupMembershipDetails(user_recursive_group_ids=None)
 | 
						|
 | 
						|
    # Validate that the proposed state for permissions settings is permitted.
 | 
						|
    if is_private is not None:
 | 
						|
        proposed_is_private = is_private
 | 
						|
    else:
 | 
						|
        proposed_is_private = stream.invite_only
 | 
						|
 | 
						|
    if is_web_public is not None:
 | 
						|
        proposed_is_web_public = is_web_public
 | 
						|
    else:
 | 
						|
        proposed_is_web_public = stream.is_web_public
 | 
						|
 | 
						|
    if is_default_stream is not None:
 | 
						|
        proposed_is_default_stream = is_default_stream
 | 
						|
    else:
 | 
						|
        default_stream_ids = get_default_stream_ids_for_realm(stream.realm_id)
 | 
						|
        proposed_is_default_stream = stream.id in default_stream_ids
 | 
						|
 | 
						|
    if stream.realm.is_zephyr_mirror_realm:
 | 
						|
        # In the Zephyr mirroring model, history is unconditionally
 | 
						|
        # not public to subscribers, even for public streams.
 | 
						|
        proposed_history_public_to_subscribers = False
 | 
						|
    elif history_public_to_subscribers is not None:
 | 
						|
        proposed_history_public_to_subscribers = history_public_to_subscribers
 | 
						|
    elif is_private is not None:
 | 
						|
        # By default, private streams have protected history while for
 | 
						|
        # public streams history is public by default.
 | 
						|
        proposed_history_public_to_subscribers = not is_private
 | 
						|
    else:
 | 
						|
        proposed_history_public_to_subscribers = stream.history_public_to_subscribers
 | 
						|
 | 
						|
    # Web-public streams must have subscriber-public history.
 | 
						|
    if proposed_is_web_public and not proposed_history_public_to_subscribers:
 | 
						|
        raise JsonableError(_("Invalid parameters"))
 | 
						|
 | 
						|
    # Web-public streams must not be private.
 | 
						|
    if proposed_is_web_public and proposed_is_private:
 | 
						|
        raise JsonableError(_("Invalid parameters"))
 | 
						|
 | 
						|
    # Public streams must be public to subscribers.
 | 
						|
    if not proposed_is_private and not proposed_history_public_to_subscribers:
 | 
						|
        if stream.realm.is_zephyr_mirror_realm:
 | 
						|
            # All Zephyr realm streams violate this rule.
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            raise JsonableError(_("Invalid parameters"))
 | 
						|
 | 
						|
    # Ensure that a stream cannot be both a default stream for new users and private
 | 
						|
    if proposed_is_private and proposed_is_default_stream:
 | 
						|
        raise JsonableError(_("A default channel cannot be private."))
 | 
						|
 | 
						|
    # Ensure that a moderation request channel isn't set to public.
 | 
						|
    if not proposed_is_private and user_profile.realm.moderation_request_channel == stream:
 | 
						|
        raise JsonableError(_("Moderation request channel must be private."))
 | 
						|
 | 
						|
    if is_private is not None and not user_has_content_access(
 | 
						|
        user_profile,
 | 
						|
        stream,
 | 
						|
        user_group_membership_details,
 | 
						|
        is_subscribed=sub is not None,
 | 
						|
    ):
 | 
						|
        raise JsonableError(_("Channel content access is required."))
 | 
						|
        # In addition to channel administration permissions, changing
 | 
						|
        # public/private status for channels requires content access
 | 
						|
        # to the channel.
 | 
						|
 | 
						|
    if is_private is not None:
 | 
						|
        if is_private and not user_profile.can_create_private_streams():
 | 
						|
            raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
        if not is_private and not user_profile.can_create_public_streams():
 | 
						|
            raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
    # Enforce restrictions on creating web-public streams. Since these
 | 
						|
    # checks are only required when changing a stream to be
 | 
						|
    # web-public, we don't use an "is not None" check.
 | 
						|
    if is_web_public:
 | 
						|
        if not user_profile.realm.web_public_streams_enabled():
 | 
						|
            raise JsonableError(_("Web-public channels are not enabled."))
 | 
						|
        if not user_profile.can_create_web_public_streams():
 | 
						|
            raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
    validated_topics_policy = validate_topics_policy(topics_policy, user_profile, stream)
 | 
						|
    if validated_topics_policy is not None:
 | 
						|
        do_set_stream_property(stream, "topics_policy", validated_topics_policy.value, user_profile)
 | 
						|
 | 
						|
    if (
 | 
						|
        is_private is not None
 | 
						|
        or is_web_public is not None
 | 
						|
        or history_public_to_subscribers is not None
 | 
						|
    ):
 | 
						|
        do_change_stream_permission(
 | 
						|
            stream,
 | 
						|
            invite_only=proposed_is_private,
 | 
						|
            history_public_to_subscribers=proposed_history_public_to_subscribers,
 | 
						|
            is_web_public=proposed_is_web_public,
 | 
						|
            acting_user=user_profile,
 | 
						|
        )
 | 
						|
 | 
						|
    if is_default_stream is not None:
 | 
						|
        if not user_profile.can_manage_default_streams():
 | 
						|
            raise CannotManageDefaultChannelError
 | 
						|
        if is_default_stream:
 | 
						|
            do_add_default_stream(stream)
 | 
						|
        else:
 | 
						|
            do_remove_default_stream(stream)
 | 
						|
 | 
						|
    if message_retention_days is not None:
 | 
						|
        if not user_profile.is_realm_owner:
 | 
						|
            raise OrganizationOwnerRequiredError
 | 
						|
        user_profile.realm.ensure_not_on_limited_plan()
 | 
						|
        new_message_retention_days_value = parse_message_retention_days(
 | 
						|
            message_retention_days, Stream.MESSAGE_RETENTION_SPECIAL_VALUES_MAP
 | 
						|
        )
 | 
						|
        do_change_stream_message_retention_days(
 | 
						|
            stream, user_profile, new_message_retention_days_value
 | 
						|
        )
 | 
						|
 | 
						|
    if is_archived is not None and not is_archived:
 | 
						|
        do_unarchive_stream(stream, stream.name, acting_user=None)
 | 
						|
 | 
						|
    if (
 | 
						|
        can_delete_any_message_group is not None or can_delete_own_message_group is not None
 | 
						|
    ) and not user_profile.can_set_delete_message_policy():
 | 
						|
        raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
    if description is not None:
 | 
						|
        do_change_stream_description(stream, description, acting_user=user_profile)
 | 
						|
    if new_name is not None:
 | 
						|
        new_name = new_name.strip()
 | 
						|
        if stream.name == new_name:
 | 
						|
            raise JsonableError(_("Channel already has that name."))
 | 
						|
        if stream.name.lower() != new_name.lower():
 | 
						|
            # Check that the stream name is available (unless we are
 | 
						|
            # are only changing the casing of the stream name).
 | 
						|
            check_stream_name_available(user_profile.realm, new_name)
 | 
						|
        do_rename_stream(stream, new_name, user_profile)
 | 
						|
 | 
						|
    if not isinstance(folder_id, MissingType):
 | 
						|
        folder: ChannelFolder | None = None
 | 
						|
        if folder_id is not None:
 | 
						|
            folder = get_channel_folder_by_id(folder_id, user_profile.realm)
 | 
						|
        do_change_stream_folder(stream, folder, acting_user=user_profile)
 | 
						|
 | 
						|
    nobody_group = get_system_user_group_by_name(SystemGroups.NOBODY, user_profile.realm_id)
 | 
						|
    request_settings_dict = locals()
 | 
						|
    for setting_name, permission_configuration in Stream.stream_permission_group_settings.items():
 | 
						|
        assert setting_name in request_settings_dict
 | 
						|
        if request_settings_dict[setting_name] is None:
 | 
						|
            continue
 | 
						|
 | 
						|
        setting_value = request_settings_dict[setting_name]
 | 
						|
        new_setting_value = parse_group_setting_value(setting_value.new, nobody_group)
 | 
						|
 | 
						|
        expected_current_setting_value = None
 | 
						|
        if setting_value.old is not None:
 | 
						|
            expected_current_setting_value = parse_group_setting_value(
 | 
						|
                setting_value.old, nobody_group
 | 
						|
            )
 | 
						|
 | 
						|
        current_value = getattr(stream, setting_name)
 | 
						|
        current_setting_api_value = get_group_setting_value_for_api(current_value)
 | 
						|
 | 
						|
        if validate_group_setting_value_change(
 | 
						|
            current_setting_api_value, new_setting_value, expected_current_setting_value
 | 
						|
        ):
 | 
						|
            if (
 | 
						|
                setting_name in Stream.stream_permission_group_settings_requiring_content_access
 | 
						|
                and not user_has_content_access(
 | 
						|
                    user_profile,
 | 
						|
                    stream,
 | 
						|
                    user_group_membership_details,
 | 
						|
                    is_subscribed=sub is not None,
 | 
						|
                )
 | 
						|
            ):
 | 
						|
                raise JsonableError(_("Channel content access is required."))
 | 
						|
 | 
						|
            with transaction.atomic(durable=True):
 | 
						|
                user_group_api_value_for_setting = access_user_group_api_value_for_setting(
 | 
						|
                    new_setting_value,
 | 
						|
                    user_profile.realm,
 | 
						|
                    setting_name=setting_name,
 | 
						|
                    permission_configuration=permission_configuration,
 | 
						|
                )
 | 
						|
                do_change_stream_group_based_setting(
 | 
						|
                    stream,
 | 
						|
                    setting_name,
 | 
						|
                    new_setting_value=user_group_api_value_for_setting,
 | 
						|
                    old_setting_api_value=current_setting_api_value,
 | 
						|
                    acting_user=user_profile,
 | 
						|
                )
 | 
						|
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
def parse_include_subscribers(
 | 
						|
    include_subscribers: Literal["true", "false", "partial"],
 | 
						|
) -> bool | Literal["partial"]:
 | 
						|
    if include_subscribers == "true":
 | 
						|
        return True
 | 
						|
    if include_subscribers == "false":
 | 
						|
        return False
 | 
						|
    return include_subscribers
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def list_subscriptions_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    include_subscribers: Literal["true", "false", "partial"] = "false",
 | 
						|
) -> HttpResponse:
 | 
						|
    subscribed, _ = gather_subscriptions(
 | 
						|
        user_profile,
 | 
						|
        include_subscribers=parse_include_subscribers(include_subscribers),
 | 
						|
    )
 | 
						|
    return json_success(request, data={"subscriptions": subscribed})
 | 
						|
 | 
						|
 | 
						|
class AddSubscriptionData(BaseModel):
 | 
						|
    name: str
 | 
						|
    color: str | None = None
 | 
						|
    description: ChannelDescription = None
 | 
						|
 | 
						|
    @model_validator(mode="after")
 | 
						|
    def validate_terms(self) -> "AddSubscriptionData":
 | 
						|
        if self.color is not None:
 | 
						|
            self.color = check_color("add.color", self.color)
 | 
						|
        return self
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def update_subscriptions_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    add: Json[list[AddSubscriptionData]] | None = None,
 | 
						|
    delete: Json[list[str]] | None = None,
 | 
						|
) -> HttpResponse:
 | 
						|
    if delete is None:
 | 
						|
        delete = []
 | 
						|
    if add is None:
 | 
						|
        add = []
 | 
						|
    if not add and not delete:
 | 
						|
        raise JsonableError(_('Nothing to do. Specify at least one of "add" or "delete".'))
 | 
						|
 | 
						|
    thunks = [
 | 
						|
        lambda: add_subscriptions_backend(request, user_profile, streams_raw=add),
 | 
						|
        lambda: remove_subscriptions_backend(request, user_profile, streams_raw=delete),
 | 
						|
    ]
 | 
						|
    data = compose_views(thunks)
 | 
						|
 | 
						|
    return json_success(request, data)
 | 
						|
 | 
						|
 | 
						|
def compose_views(thunks: list[Callable[[], HttpResponse]]) -> dict[str, Any]:
 | 
						|
    """
 | 
						|
    This takes a series of thunks and calls them in sequence, and it
 | 
						|
    smushes all the json results into a single response when
 | 
						|
    everything goes right.  (This helps clients avoid extra latency
 | 
						|
    hops.)  It rolls back the transaction when things go wrong in any
 | 
						|
    one of the composed methods.
 | 
						|
    """
 | 
						|
 | 
						|
    json_dict: dict[str, Any] = {}
 | 
						|
    with transaction.atomic(savepoint=False):
 | 
						|
        for thunk in thunks:
 | 
						|
            response = thunk()
 | 
						|
            json_dict.update(orjson.loads(response.content))
 | 
						|
    return json_dict
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def remove_subscriptions_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    principals: Json[list[str] | list[int]] | None = None,
 | 
						|
    streams_raw: Annotated[Json[list[str]], ApiParamConfig("subscriptions")],
 | 
						|
) -> HttpResponse:
 | 
						|
    realm = user_profile.realm
 | 
						|
 | 
						|
    streams_as_dict: list[StreamDict] = [
 | 
						|
        {"name": stream_name.strip()} for stream_name in streams_raw
 | 
						|
    ]
 | 
						|
 | 
						|
    unsubscribing_others = False
 | 
						|
    if principals:
 | 
						|
        people_to_unsub = bulk_principals_to_user_profiles(principals, user_profile)
 | 
						|
        unsubscribing_others = any(
 | 
						|
            not user_directly_controls_user(user_profile, target) for target in people_to_unsub
 | 
						|
        )
 | 
						|
 | 
						|
    else:
 | 
						|
        people_to_unsub = {user_profile}
 | 
						|
 | 
						|
    streams, __ = list_to_streams(
 | 
						|
        streams_as_dict,
 | 
						|
        user_profile,
 | 
						|
        unsubscribing_others=unsubscribing_others,
 | 
						|
    )
 | 
						|
 | 
						|
    result: dict[str, list[str]] = dict(removed=[], not_removed=[])
 | 
						|
    (removed, not_subscribed) = bulk_remove_subscriptions(
 | 
						|
        realm, people_to_unsub, streams, acting_user=user_profile
 | 
						|
    )
 | 
						|
 | 
						|
    for subscriber, removed_stream in removed:
 | 
						|
        result["removed"].append(removed_stream.name)
 | 
						|
    for subscriber, not_subscribed_stream in not_subscribed:
 | 
						|
        result["not_removed"].append(not_subscribed_stream.name)
 | 
						|
 | 
						|
    return json_success(request, data=result)
 | 
						|
 | 
						|
 | 
						|
def you_were_just_subscribed_message(
 | 
						|
    acting_user: UserProfile, recipient_user: UserProfile, stream_names: set[str]
 | 
						|
) -> str:
 | 
						|
    subscriptions = sorted(stream_names)
 | 
						|
    if len(subscriptions) == 1:
 | 
						|
        with override_language(recipient_user.default_language):
 | 
						|
            return _("{user_full_name} subscribed you to {channel_name}.").format(
 | 
						|
                user_full_name=silent_mention_syntax_for_user(acting_user),
 | 
						|
                channel_name=f"#**{subscriptions[0]}**",
 | 
						|
            )
 | 
						|
 | 
						|
    with override_language(recipient_user.default_language):
 | 
						|
        message = _("{user_full_name} subscribed you to the following channels:").format(
 | 
						|
            user_full_name=silent_mention_syntax_for_user(acting_user),
 | 
						|
        )
 | 
						|
    message += "\n\n"
 | 
						|
    for channel_name in subscriptions:
 | 
						|
        message += f"* #**{channel_name}**\n"
 | 
						|
    return message
 | 
						|
 | 
						|
 | 
						|
RETENTION_DEFAULT: str | int = "realm_default"
 | 
						|
 | 
						|
 | 
						|
def access_requested_group_permissions(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    realm: Realm,
 | 
						|
    request_settings_dict: dict[str, Any],
 | 
						|
) -> tuple[dict[str, UserGroup], dict[int, UserGroupMembersData]]:
 | 
						|
    anonymous_group_membership = {}
 | 
						|
    group_settings_map = {}
 | 
						|
    system_groups_name_dict = get_role_based_system_groups_dict(realm)
 | 
						|
    for setting_name, permission_configuration in Stream.stream_permission_group_settings.items():
 | 
						|
        assert setting_name in request_settings_dict
 | 
						|
        if request_settings_dict[setting_name] is not None:
 | 
						|
            setting_request_value = request_settings_dict[setting_name]
 | 
						|
            setting_value = parse_group_setting_value(
 | 
						|
                setting_request_value, system_groups_name_dict[SystemGroups.NOBODY]
 | 
						|
            )
 | 
						|
            group_settings_map[setting_name] = access_user_group_for_setting(
 | 
						|
                setting_value,
 | 
						|
                user_profile,
 | 
						|
                setting_name=setting_name,
 | 
						|
                permission_configuration=permission_configuration,
 | 
						|
            )
 | 
						|
            if (
 | 
						|
                setting_name in ["can_delete_any_message_group", "can_delete_own_message_group"]
 | 
						|
                and group_settings_map[setting_name].id
 | 
						|
                != system_groups_name_dict[SystemGroups.NOBODY].id
 | 
						|
                and not user_profile.can_set_delete_message_policy()
 | 
						|
            ):
 | 
						|
                raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
            if not isinstance(setting_value, int):
 | 
						|
                anonymous_group_membership[group_settings_map[setting_name].id] = setting_value
 | 
						|
        else:
 | 
						|
            group_settings_map[setting_name] = get_stream_permission_default_group(
 | 
						|
                setting_name, system_groups_name_dict, creator=user_profile
 | 
						|
            )
 | 
						|
            if permission_configuration.default_group_name == "stream_creator_or_nobody":
 | 
						|
                # Default for some settings like "can_administer_channel_group"
 | 
						|
                # is anonymous group with stream creator.
 | 
						|
                anonymous_group_membership[group_settings_map[setting_name].id] = (
 | 
						|
                    UserGroupMembersData(direct_subgroups=[], direct_members=[user_profile.id])
 | 
						|
                )
 | 
						|
    return group_settings_map, anonymous_group_membership
 | 
						|
 | 
						|
 | 
						|
@transaction.atomic(savepoint=False)
 | 
						|
@require_non_guest_user
 | 
						|
@typed_endpoint
 | 
						|
def create_channel(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    announce: Json[bool] = False,
 | 
						|
    can_add_subscribers_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_delete_any_message_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_delete_own_message_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_administer_channel_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_move_messages_out_of_channel_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_move_messages_within_channel_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_remove_subscribers_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_resolve_topics_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_send_message_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_subscribe_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    description: ChannelDescription = None,
 | 
						|
    folder_id: Json[int] | None = None,
 | 
						|
    history_public_to_subscribers: Json[bool] | None = None,
 | 
						|
    invite_only: Json[bool] = False,
 | 
						|
    is_web_public: Json[bool] = False,
 | 
						|
    is_default_stream: Json[bool] = False,
 | 
						|
    message_retention_days: Json[str] | Json[int] = RETENTION_DEFAULT,
 | 
						|
    name: Annotated[str, StringConstraints(strip_whitespace=True, min_length=1)],
 | 
						|
    send_new_subscription_messages: Json[bool] = True,
 | 
						|
    subscribers: Json[list[int]],
 | 
						|
    topics_policy: Json[TopicsPolicy] = None,
 | 
						|
) -> HttpResponse:
 | 
						|
    realm = user_profile.realm
 | 
						|
    request_settings_dict = locals()
 | 
						|
 | 
						|
    check_stream_name_available(realm, name)
 | 
						|
 | 
						|
    folder: ChannelFolder | None = None
 | 
						|
    if folder_id is not None:
 | 
						|
        folder = get_channel_folder_by_id(folder_id, realm)
 | 
						|
 | 
						|
    if description is None:
 | 
						|
        description = ""
 | 
						|
 | 
						|
    parsed_message_retention_days = parse_message_retention_days(
 | 
						|
        message_retention_days, Stream.MESSAGE_RETENTION_SPECIAL_VALUES_MAP
 | 
						|
    )
 | 
						|
 | 
						|
    topics_policy_value = None
 | 
						|
    validated_topics_policy = validate_topics_policy(topics_policy, user_profile)
 | 
						|
    if validated_topics_policy is not None:
 | 
						|
        topics_policy_value = validated_topics_policy.value
 | 
						|
 | 
						|
    is_subscribing_other_users = False
 | 
						|
    if len(subscribers) > 0 and not all(user_id == user_profile.id for user_id in subscribers):
 | 
						|
        is_subscribing_other_users = True
 | 
						|
 | 
						|
    check_zephyr_realm_invite_conditions(is_subscribing_other_users, realm, invite_only)
 | 
						|
 | 
						|
    check_channel_creation_permissions(
 | 
						|
        user_profile,
 | 
						|
        is_default_stream=is_default_stream,
 | 
						|
        invite_only=invite_only,
 | 
						|
        is_web_public=is_web_public,
 | 
						|
        message_retention_days=parsed_message_retention_days,
 | 
						|
    )
 | 
						|
 | 
						|
    group_settings_map, anonymous_group_membership = access_requested_group_permissions(
 | 
						|
        user_profile,
 | 
						|
        realm,
 | 
						|
        request_settings_dict,
 | 
						|
    )
 | 
						|
 | 
						|
    new_channel, created = create_stream_if_needed(
 | 
						|
        realm,
 | 
						|
        name,
 | 
						|
        stream_description=description,
 | 
						|
        invite_only=invite_only,
 | 
						|
        history_public_to_subscribers=history_public_to_subscribers,
 | 
						|
        is_web_public=is_web_public,
 | 
						|
        message_retention_days=parsed_message_retention_days,
 | 
						|
        anonymous_group_membership=anonymous_group_membership,
 | 
						|
        acting_user=user_profile,
 | 
						|
        can_add_subscribers_group=group_settings_map["can_add_subscribers_group"],
 | 
						|
        can_administer_channel_group=group_settings_map["can_administer_channel_group"],
 | 
						|
        can_move_messages_out_of_channel_group=group_settings_map[
 | 
						|
            "can_move_messages_out_of_channel_group"
 | 
						|
        ],
 | 
						|
        can_move_messages_within_channel_group=group_settings_map[
 | 
						|
            "can_move_messages_within_channel_group"
 | 
						|
        ],
 | 
						|
        can_send_message_group=group_settings_map["can_send_message_group"],
 | 
						|
        can_remove_subscribers_group=group_settings_map["can_remove_subscribers_group"],
 | 
						|
        can_subscribe_group=group_settings_map["can_subscribe_group"],
 | 
						|
        can_resolve_topics_group=group_settings_map["can_resolve_topics_group"],
 | 
						|
        folder=folder,
 | 
						|
        topics_policy=topics_policy_value,
 | 
						|
    )
 | 
						|
 | 
						|
    if is_default_stream:
 | 
						|
        do_add_default_stream(new_channel)
 | 
						|
 | 
						|
    if len(subscribers) == 0:
 | 
						|
        return json_success(
 | 
						|
            request,
 | 
						|
            data={"id": new_channel.id},
 | 
						|
        )
 | 
						|
 | 
						|
    new_subscribers = bulk_principals_to_user_profiles(subscribers, user_profile)
 | 
						|
    bulk_add_subscriptions(
 | 
						|
        realm,
 | 
						|
        [new_channel],
 | 
						|
        new_subscribers,
 | 
						|
        acting_user=user_profile,
 | 
						|
    )
 | 
						|
    if (
 | 
						|
        send_new_subscription_messages
 | 
						|
        and len(new_subscribers) <= settings.MAX_BULK_NEW_SUBSCRIPTION_MESSAGES
 | 
						|
    ):
 | 
						|
        send_user_subscribed_and_new_channel_notifications(
 | 
						|
            user_profile=user_profile,
 | 
						|
            subscribers=new_subscribers,
 | 
						|
            new_subscriptions={str(user.id): [name] for user in new_subscribers},
 | 
						|
            id_to_user_profile={str(user.id): user for user in new_subscribers},
 | 
						|
            created_streams=[new_channel],
 | 
						|
            announce=announce,
 | 
						|
        )
 | 
						|
 | 
						|
    return json_success(
 | 
						|
        request,
 | 
						|
        data={"id": new_channel.id},
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@transaction.atomic(savepoint=False)
 | 
						|
@require_non_guest_user
 | 
						|
@typed_endpoint
 | 
						|
def add_subscriptions_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    announce: Json[bool] = False,
 | 
						|
    authorization_errors_fatal: Json[bool] = True,
 | 
						|
    can_add_subscribers_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_delete_any_message_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_delete_own_message_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_administer_channel_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_move_messages_out_of_channel_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_move_messages_within_channel_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_remove_subscribers_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_resolve_topics_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_send_message_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    can_subscribe_group: Json[int | UserGroupMembersData] | None = None,
 | 
						|
    folder_id: Json[int] | None = None,
 | 
						|
    history_public_to_subscribers: Json[bool] | None = None,
 | 
						|
    invite_only: Json[bool] = False,
 | 
						|
    is_default_stream: Json[bool] = False,
 | 
						|
    is_web_public: Json[bool] = False,
 | 
						|
    message_retention_days: Json[str] | Json[int] = RETENTION_DEFAULT,
 | 
						|
    principals: Json[list[str] | list[int]] | None = None,
 | 
						|
    send_new_subscription_messages: Json[bool] = True,
 | 
						|
    streams_raw: Annotated[Json[list[AddSubscriptionData]], ApiParamConfig("subscriptions")],
 | 
						|
    topics_policy: Json[TopicsPolicy] = None,
 | 
						|
) -> HttpResponse:
 | 
						|
    realm = user_profile.realm
 | 
						|
    stream_dicts = []
 | 
						|
    color_map = {}
 | 
						|
    # UserProfile ids or emails.
 | 
						|
    if principals is None:
 | 
						|
        principals = []
 | 
						|
 | 
						|
    request_settings_dict = locals()
 | 
						|
    group_settings_map, anonymous_group_membership = access_requested_group_permissions(
 | 
						|
        user_profile,
 | 
						|
        realm,
 | 
						|
        request_settings_dict,
 | 
						|
    )
 | 
						|
 | 
						|
    folder: ChannelFolder | None = None
 | 
						|
    if folder_id is not None:
 | 
						|
        folder = get_channel_folder_by_id(folder_id, realm)
 | 
						|
 | 
						|
    for stream_obj in streams_raw:
 | 
						|
        # 'color' field is optional
 | 
						|
        # check for its presence in the streams_raw first
 | 
						|
        if stream_obj.color is not None:
 | 
						|
            color_map[stream_obj.name] = stream_obj.color
 | 
						|
 | 
						|
        stream_dict_copy: StreamDict = {}
 | 
						|
        stream_dict_copy["name"] = stream_obj.name.strip()
 | 
						|
 | 
						|
        if stream_obj.description is not None:
 | 
						|
            stream_dict_copy["description"] = stream_obj.description
 | 
						|
 | 
						|
        stream_dict_copy["invite_only"] = invite_only
 | 
						|
        stream_dict_copy["is_web_public"] = is_web_public
 | 
						|
        stream_dict_copy["history_public_to_subscribers"] = history_public_to_subscribers
 | 
						|
        stream_dict_copy["message_retention_days"] = parse_message_retention_days(
 | 
						|
            message_retention_days, Stream.MESSAGE_RETENTION_SPECIAL_VALUES_MAP
 | 
						|
        )
 | 
						|
        validated_topics_policy = validate_topics_policy(topics_policy, user_profile)
 | 
						|
        if validated_topics_policy is not None:
 | 
						|
            stream_dict_copy["topics_policy"] = validated_topics_policy.value
 | 
						|
        stream_dict_copy["can_add_subscribers_group"] = group_settings_map[
 | 
						|
            "can_add_subscribers_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_administer_channel_group"] = group_settings_map[
 | 
						|
            "can_administer_channel_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_delete_any_message_group"] = group_settings_map[
 | 
						|
            "can_delete_any_message_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_delete_own_message_group"] = group_settings_map[
 | 
						|
            "can_delete_own_message_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_move_messages_out_of_channel_group"] = group_settings_map[
 | 
						|
            "can_move_messages_out_of_channel_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_move_messages_within_channel_group"] = group_settings_map[
 | 
						|
            "can_move_messages_within_channel_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_send_message_group"] = group_settings_map["can_send_message_group"]
 | 
						|
        stream_dict_copy["can_remove_subscribers_group"] = group_settings_map[
 | 
						|
            "can_remove_subscribers_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_resolve_topics_group"] = group_settings_map[
 | 
						|
            "can_resolve_topics_group"
 | 
						|
        ]
 | 
						|
        stream_dict_copy["can_subscribe_group"] = group_settings_map["can_subscribe_group"]
 | 
						|
        stream_dict_copy["folder"] = folder
 | 
						|
 | 
						|
        stream_dicts.append(stream_dict_copy)
 | 
						|
 | 
						|
    is_subscribing_other_users = False
 | 
						|
    if len(principals) > 0 and not all(user_id == user_profile.id for user_id in principals):
 | 
						|
        is_subscribing_other_users = True
 | 
						|
 | 
						|
    # Validation of the streams arguments, including enforcement of
 | 
						|
    # can_create_streams policy and check_stream_name policy is inside
 | 
						|
    # list_to_streams.
 | 
						|
    existing_streams, created_streams = list_to_streams(
 | 
						|
        stream_dicts,
 | 
						|
        user_profile,
 | 
						|
        autocreate=True,
 | 
						|
        is_default_stream=is_default_stream,
 | 
						|
        anonymous_group_membership=anonymous_group_membership,
 | 
						|
    )
 | 
						|
 | 
						|
    streams_categorized_by_permissions = filter_stream_authorization_for_adding_subscribers(
 | 
						|
        user_profile, existing_streams, is_subscribing_other_users
 | 
						|
    )
 | 
						|
    authorized_streams = streams_categorized_by_permissions.authorized_streams
 | 
						|
    unauthorized_streams = streams_categorized_by_permissions.unauthorized_streams
 | 
						|
    streams_to_which_user_cannot_add_subscribers = (
 | 
						|
        streams_categorized_by_permissions.streams_to_which_user_cannot_add_subscribers
 | 
						|
    )
 | 
						|
 | 
						|
    if len(unauthorized_streams) > 0 and authorization_errors_fatal:
 | 
						|
        raise JsonableError(
 | 
						|
            _("Unable to access channel ({channel_name}).").format(
 | 
						|
                channel_name=unauthorized_streams[0].name,
 | 
						|
            )
 | 
						|
        )
 | 
						|
    if len(streams_to_which_user_cannot_add_subscribers) > 0:
 | 
						|
        raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
    # Newly created streams are also authorized for the creator
 | 
						|
    streams = authorized_streams + created_streams
 | 
						|
 | 
						|
    for stream in streams:
 | 
						|
        check_zephyr_realm_invite_conditions(is_subscribing_other_users, realm, stream.invite_only)
 | 
						|
 | 
						|
    if is_subscribing_other_users:
 | 
						|
        subscribers = bulk_principals_to_user_profiles(principals, user_profile)
 | 
						|
    else:
 | 
						|
        subscribers = {user_profile}
 | 
						|
 | 
						|
    if is_default_stream:
 | 
						|
        for stream in created_streams:
 | 
						|
            do_add_default_stream(stream)
 | 
						|
 | 
						|
    (subscribed, already_subscribed) = bulk_add_subscriptions(
 | 
						|
        realm, streams, subscribers, acting_user=user_profile, color_map=color_map
 | 
						|
    )
 | 
						|
 | 
						|
    id_to_user_profile: dict[str, UserProfile] = {}
 | 
						|
 | 
						|
    result: dict[str, Any] = dict(
 | 
						|
        subscribed=defaultdict(list), already_subscribed=defaultdict(list)
 | 
						|
    )
 | 
						|
    for sub_info in subscribed:
 | 
						|
        subscriber = sub_info.user
 | 
						|
        stream = sub_info.stream
 | 
						|
        user_id = str(subscriber.id)
 | 
						|
        result["subscribed"][user_id].append(stream.name)
 | 
						|
        id_to_user_profile[user_id] = subscriber
 | 
						|
    for sub_info in already_subscribed:
 | 
						|
        subscriber = sub_info.user
 | 
						|
        stream = sub_info.stream
 | 
						|
        user_id = str(subscriber.id)
 | 
						|
        result["already_subscribed"][user_id].append(stream.name)
 | 
						|
 | 
						|
    result["subscribed"] = dict(result["subscribed"])
 | 
						|
    result["already_subscribed"] = dict(result["already_subscribed"])
 | 
						|
 | 
						|
    if send_new_subscription_messages:
 | 
						|
        if len(result["subscribed"]) <= settings.MAX_BULK_NEW_SUBSCRIPTION_MESSAGES:
 | 
						|
            send_user_subscribed_and_new_channel_notifications(
 | 
						|
                user_profile=user_profile,
 | 
						|
                subscribers=subscribers,
 | 
						|
                new_subscriptions=result["subscribed"],
 | 
						|
                id_to_user_profile=id_to_user_profile,
 | 
						|
                created_streams=created_streams,
 | 
						|
                announce=announce,
 | 
						|
            )
 | 
						|
            result["new_subscription_messages_sent"] = True
 | 
						|
        else:
 | 
						|
            result["new_subscription_messages_sent"] = False
 | 
						|
 | 
						|
    result["subscribed"] = dict(result["subscribed"])
 | 
						|
    result["already_subscribed"] = dict(result["already_subscribed"])
 | 
						|
    if not authorization_errors_fatal:
 | 
						|
        result["unauthorized"] = [s.name for s in unauthorized_streams]
 | 
						|
    return json_success(request, data=result)
 | 
						|
 | 
						|
 | 
						|
def send_user_subscribed_and_new_channel_notifications(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    subscribers: set[UserProfile],
 | 
						|
    new_subscriptions: dict[str, list[str]],
 | 
						|
    id_to_user_profile: dict[str, UserProfile],
 | 
						|
    created_streams: list[Stream],
 | 
						|
    announce: bool,
 | 
						|
) -> None:
 | 
						|
    """
 | 
						|
    If you are subscribing lots of new users to new streams,
 | 
						|
    this function can be pretty expensive in terms of generating
 | 
						|
    lots of queries and sending lots of messages.  We isolate
 | 
						|
    the code partly to make it easier to test things like
 | 
						|
    excessive query counts by mocking this function so that it
 | 
						|
    doesn't drown out query counts from other code.
 | 
						|
    """
 | 
						|
    bots = {str(subscriber.id): subscriber.is_bot for subscriber in subscribers}
 | 
						|
 | 
						|
    newly_created_stream_names = {s.name for s in created_streams}
 | 
						|
 | 
						|
    realm = user_profile.realm
 | 
						|
    mention_backend = MentionBackend(realm.id)
 | 
						|
 | 
						|
    # Inform the user if someone else subscribed them to stuff,
 | 
						|
    # or if a new stream was created with the "announce" option.
 | 
						|
    notifications = []
 | 
						|
    if new_subscriptions:
 | 
						|
        for id, subscribed_stream_names in new_subscriptions.items():
 | 
						|
            if id == str(user_profile.id):
 | 
						|
                # Don't send a notification DM if you subscribed yourself.
 | 
						|
                continue
 | 
						|
            if bots[id]:
 | 
						|
                # Don't send notification DMs to bots.
 | 
						|
                continue
 | 
						|
 | 
						|
            # For each user, we notify them about newly subscribed streams, except for
 | 
						|
            # streams that were newly created.
 | 
						|
            notify_stream_names = set(subscribed_stream_names) - newly_created_stream_names
 | 
						|
 | 
						|
            if not notify_stream_names:
 | 
						|
                continue
 | 
						|
 | 
						|
            recipient_user = id_to_user_profile[id]
 | 
						|
            sender = get_system_bot(settings.NOTIFICATION_BOT, recipient_user.realm_id)
 | 
						|
 | 
						|
            msg = you_were_just_subscribed_message(
 | 
						|
                acting_user=user_profile,
 | 
						|
                recipient_user=recipient_user,
 | 
						|
                stream_names=notify_stream_names,
 | 
						|
            )
 | 
						|
 | 
						|
            notifications.append(
 | 
						|
                internal_prep_private_message(
 | 
						|
                    sender=sender,
 | 
						|
                    recipient_user=recipient_user,
 | 
						|
                    content=msg,
 | 
						|
                    mention_backend=mention_backend,
 | 
						|
                    acting_user=user_profile,
 | 
						|
                )
 | 
						|
            )
 | 
						|
 | 
						|
    if announce and len(created_streams) > 0:
 | 
						|
        new_stream_announcements_stream = user_profile.realm.new_stream_announcements_stream
 | 
						|
        if new_stream_announcements_stream is not None:
 | 
						|
            with override_language(new_stream_announcements_stream.realm.default_language):
 | 
						|
                if len(created_streams) > 1:
 | 
						|
                    content = _("{user_name} created the following channels: {new_channels}.")
 | 
						|
                else:
 | 
						|
                    content = _("{user_name} created a new channel {new_channels}.")
 | 
						|
                topic_name = _("new channels")
 | 
						|
                if (
 | 
						|
                    new_stream_announcements_stream.topics_policy
 | 
						|
                    == StreamTopicsPolicyEnum.empty_topic_only.value
 | 
						|
                ):
 | 
						|
                    topic_name = ""
 | 
						|
 | 
						|
            content = content.format(
 | 
						|
                user_name=silent_mention_syntax_for_user(user_profile),
 | 
						|
                new_channels=", ".join(
 | 
						|
                    f"{get_stream_link_syntax(s.id, s.name)}" for s in created_streams
 | 
						|
                ),
 | 
						|
            )
 | 
						|
 | 
						|
            sender = get_system_bot(
 | 
						|
                settings.NOTIFICATION_BOT, new_stream_announcements_stream.realm_id
 | 
						|
            )
 | 
						|
 | 
						|
            notifications.append(
 | 
						|
                internal_prep_stream_message(
 | 
						|
                    sender=sender,
 | 
						|
                    stream=new_stream_announcements_stream,
 | 
						|
                    topic_name=topic_name,
 | 
						|
                    content=content,
 | 
						|
                ),
 | 
						|
            )
 | 
						|
 | 
						|
    if not user_profile.realm.is_zephyr_mirror_realm and len(created_streams) > 0:
 | 
						|
        sender = get_system_bot(settings.NOTIFICATION_BOT, user_profile.realm_id)
 | 
						|
        for stream in created_streams:
 | 
						|
            with override_language(stream.realm.default_language):
 | 
						|
                if stream.description == "":
 | 
						|
                    stream_description = "*" + _("No description.") + "*"
 | 
						|
                else:
 | 
						|
                    stream_description = stream.description
 | 
						|
 | 
						|
                policy_key = get_stream_permission_policy_key(
 | 
						|
                    invite_only=stream.invite_only,
 | 
						|
                    history_public_to_subscribers=stream.history_public_to_subscribers,
 | 
						|
                    is_web_public=stream.is_web_public,
 | 
						|
                )
 | 
						|
                new_channel_message = None
 | 
						|
 | 
						|
                # Policy `public_protected_history` is missing here as those channels don't get
 | 
						|
                # channel creation notification.
 | 
						|
                if policy_key == "web_public":
 | 
						|
                    new_channel_message = _(
 | 
						|
                        "**Web-public** channel created by {user_name}. **Description:**"
 | 
						|
                    )
 | 
						|
                elif policy_key == "public":
 | 
						|
                    new_channel_message = _(
 | 
						|
                        "**Public** channel created by {user_name}. **Description:**"
 | 
						|
                    )
 | 
						|
                elif policy_key == "private_shared_history":
 | 
						|
                    new_channel_message = _(
 | 
						|
                        "**Private, shared history** channel created by {user_name}. **Description:**"
 | 
						|
                    )
 | 
						|
                elif policy_key == "private_protected_history":
 | 
						|
                    new_channel_message = _(
 | 
						|
                        "**Private, protected history** channel created by {user_name}. **Description:**"
 | 
						|
                    )
 | 
						|
 | 
						|
                assert new_channel_message is not None
 | 
						|
                notifications.append(
 | 
						|
                    internal_prep_stream_message(
 | 
						|
                        sender=sender,
 | 
						|
                        stream=stream,
 | 
						|
                        topic_name=channel_events_topic_name(stream),
 | 
						|
                        content=new_channel_message.format(
 | 
						|
                            user_name=silent_mention_syntax_for_user(user_profile),
 | 
						|
                        )
 | 
						|
                        + f"\n```` quote\n{stream_description}\n````",
 | 
						|
                    ),
 | 
						|
                )
 | 
						|
 | 
						|
    if len(notifications) > 0:
 | 
						|
        do_send_messages(notifications, mark_as_read=[user_profile.id])
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def get_subscribers_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    stream_id: Annotated[NonNegativeInt, ApiParamConfig("stream", path_only=True)],
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_by_id(
 | 
						|
        user_profile,
 | 
						|
        stream_id,
 | 
						|
        require_content_access=False,
 | 
						|
    )
 | 
						|
    subscribers = get_subscriber_ids(stream, user_profile)
 | 
						|
 | 
						|
    return json_success(request, data={"subscribers": list(subscribers)})
 | 
						|
 | 
						|
 | 
						|
# By default, lists all streams that the user has access to --
 | 
						|
# i.e. public streams plus invite-only streams that the user is on
 | 
						|
@typed_endpoint
 | 
						|
def get_streams_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    exclude_archived: Json[bool] = True,
 | 
						|
    include_all: Json[bool] = False,
 | 
						|
    include_all_active: Json[bool] = False,
 | 
						|
    include_can_access_content: Json[bool] = False,
 | 
						|
    include_default: Json[bool] = False,
 | 
						|
    include_owner_subscribed: Json[bool] = False,
 | 
						|
    include_public: Json[bool] = True,
 | 
						|
    include_subscribed: Json[bool] = True,
 | 
						|
    include_web_public: Json[bool] = False,
 | 
						|
) -> HttpResponse:
 | 
						|
    if include_all_active is True:
 | 
						|
        include_all = True
 | 
						|
    streams = do_get_streams(
 | 
						|
        user_profile,
 | 
						|
        include_public=include_public,
 | 
						|
        include_web_public=include_web_public,
 | 
						|
        include_subscribed=include_subscribed,
 | 
						|
        exclude_archived=exclude_archived,
 | 
						|
        include_all=include_all,
 | 
						|
        include_default=include_default,
 | 
						|
        include_owner_subscribed=include_owner_subscribed,
 | 
						|
        include_can_access_content=include_can_access_content,
 | 
						|
    )
 | 
						|
    return json_success(request, data={"streams": streams})
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def get_stream_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    stream_id: PathOnly[int],
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_by_id(user_profile, stream_id, require_content_access=False)
 | 
						|
 | 
						|
    recent_traffic = get_streams_traffic({stream.id}, user_profile.realm)
 | 
						|
    anonymous_group_membership = get_anonymous_group_membership_dict_for_streams([stream])
 | 
						|
 | 
						|
    return json_success(
 | 
						|
        request, data={"stream": stream_to_dict(stream, recent_traffic, anonymous_group_membership)}
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def get_topics_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    maybe_user_profile: UserProfile | AnonymousUser,
 | 
						|
    *,
 | 
						|
    allow_empty_topic_name: Json[bool] = False,
 | 
						|
    stream_id: PathOnly[NonNegativeInt],
 | 
						|
) -> HttpResponse:
 | 
						|
    if not maybe_user_profile.is_authenticated:
 | 
						|
        is_web_public_query = True
 | 
						|
        user_profile: UserProfile | None = None
 | 
						|
    else:
 | 
						|
        is_web_public_query = False
 | 
						|
        assert isinstance(maybe_user_profile, UserProfile)
 | 
						|
        user_profile = maybe_user_profile
 | 
						|
        assert user_profile is not None
 | 
						|
 | 
						|
    if is_web_public_query:
 | 
						|
        realm = get_valid_realm_from_request(request)
 | 
						|
        stream = access_web_public_stream(stream_id, realm)
 | 
						|
        result = get_topic_history_for_public_stream(
 | 
						|
            realm_id=realm.id,
 | 
						|
            recipient_id=assert_is_not_none(stream.recipient_id),
 | 
						|
            allow_empty_topic_name=allow_empty_topic_name,
 | 
						|
        )
 | 
						|
 | 
						|
    else:
 | 
						|
        assert user_profile is not None
 | 
						|
 | 
						|
        (stream, sub) = access_stream_by_id(user_profile, stream_id)
 | 
						|
 | 
						|
        assert stream.recipient_id is not None
 | 
						|
        result = get_topic_history_for_stream(
 | 
						|
            user_profile=user_profile,
 | 
						|
            recipient_id=stream.recipient_id,
 | 
						|
            public_history=stream.is_history_public_to_subscribers(),
 | 
						|
            allow_empty_topic_name=allow_empty_topic_name,
 | 
						|
        )
 | 
						|
 | 
						|
    return json_success(request, data=dict(topics=result))
 | 
						|
 | 
						|
 | 
						|
@require_realm_admin
 | 
						|
@typed_endpoint
 | 
						|
def delete_in_topic(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    stream_id: PathOnly[NonNegativeInt],
 | 
						|
    topic_name: str,
 | 
						|
) -> HttpResponse:
 | 
						|
    stream, ignored_sub = access_stream_by_id(user_profile, stream_id)
 | 
						|
    topic_name = maybe_rename_general_chat_to_empty_topic(topic_name)
 | 
						|
 | 
						|
    messages = messages_for_topic(
 | 
						|
        user_profile.realm_id, assert_is_not_none(stream.recipient_id), topic_name
 | 
						|
    )
 | 
						|
    # This handles applying access control, such that only messages
 | 
						|
    # the user can see are returned in the query.
 | 
						|
    messages = bulk_access_stream_messages_query(user_profile, messages, stream)
 | 
						|
 | 
						|
    # Topics can be large enough that this request will inevitably time out.
 | 
						|
    # In such a case, it's good for some progress to be accomplished, so that
 | 
						|
    # full deletion can be achieved by repeating the request. For that purpose,
 | 
						|
    # we delete messages in atomic batches, committing after each batch.
 | 
						|
    # TODO: Ideally this should be moved to the deferred_work queue.
 | 
						|
    start_time = time.monotonic()
 | 
						|
    batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE
 | 
						|
    while True:
 | 
						|
        if time.monotonic() >= start_time + 50:
 | 
						|
            return json_success(request, data={"complete": False})
 | 
						|
        with transaction.atomic(durable=True):
 | 
						|
            messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update(
 | 
						|
                of=("self",)
 | 
						|
            )
 | 
						|
            if not messages_to_delete:
 | 
						|
                break
 | 
						|
            do_delete_messages(user_profile.realm, messages_to_delete, acting_user=user_profile)
 | 
						|
 | 
						|
    # Since the topic no longer exists, remove the user topic rows.
 | 
						|
    users_with_stale_user_topic_rows = [
 | 
						|
        user_topic.user_profile
 | 
						|
        for user_topic in get_users_with_user_topic_visibility_policy(stream.id, topic_name)
 | 
						|
    ]
 | 
						|
 | 
						|
    if not stream.is_history_public_to_subscribers():
 | 
						|
        # In a private channel with protected history, delete the UserTopic
 | 
						|
        # records for exactly the users for whom after the topic deletion
 | 
						|
        # action, they no longer have access to any messages in the topic.
 | 
						|
        user_ids_with_access_to_protected_messages = set(
 | 
						|
            UserMessage.objects.filter(
 | 
						|
                user_profile__in=users_with_stale_user_topic_rows,
 | 
						|
                message__recipient_id=assert_is_not_none(stream.recipient_id),
 | 
						|
                message__subject__iexact=topic_name,
 | 
						|
                message__is_channel_message=True,
 | 
						|
            ).values_list("user_profile", flat=True)
 | 
						|
        )
 | 
						|
        users_with_stale_user_topic_rows = list(
 | 
						|
            filter(
 | 
						|
                lambda user_profile: user_profile.id
 | 
						|
                not in user_ids_with_access_to_protected_messages,
 | 
						|
                users_with_stale_user_topic_rows,
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
    bulk_do_set_user_topic_visibility_policy(
 | 
						|
        users_with_stale_user_topic_rows,
 | 
						|
        stream,
 | 
						|
        topic_name,
 | 
						|
        visibility_policy=UserTopic.VisibilityPolicy.INHERIT,
 | 
						|
    )
 | 
						|
 | 
						|
    return json_success(request, data={"complete": True})
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def json_get_stream_id(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    stream_name: Annotated[str, ApiParamConfig("stream")],
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_by_name(user_profile, stream_name)
 | 
						|
    return json_success(request, data={"stream_id": stream.id})
 | 
						|
 | 
						|
 | 
						|
class SubscriptionPropertyChangeRequest(BaseModel):
 | 
						|
    stream_id: int
 | 
						|
    property: str
 | 
						|
    value: bool | str
 | 
						|
 | 
						|
    @model_validator(mode="after")
 | 
						|
    def validate_terms(self) -> "SubscriptionPropertyChangeRequest":
 | 
						|
        boolean_properties = {
 | 
						|
            "in_home_view",
 | 
						|
            "is_muted",
 | 
						|
            "desktop_notifications",
 | 
						|
            "audible_notifications",
 | 
						|
            "push_notifications",
 | 
						|
            "email_notifications",
 | 
						|
            "pin_to_top",
 | 
						|
            "wildcard_mentions_notify",
 | 
						|
        }
 | 
						|
 | 
						|
        if self.property == "color":
 | 
						|
            self.value = check_color("color", self.value)
 | 
						|
        elif self.property in boolean_properties:
 | 
						|
            if not isinstance(self.value, bool):
 | 
						|
                raise JsonableError(_("{property} is not a boolean").format(property=self.property))
 | 
						|
        else:
 | 
						|
            raise JsonableError(
 | 
						|
                _("Unknown subscription property: {property}").format(property=self.property)
 | 
						|
            )
 | 
						|
        return self
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def update_subscriptions_property(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    property: str,
 | 
						|
    stream_id: PathOnly[Json[int]],
 | 
						|
    value: Annotated[Json[bool] | str, Field(union_mode="left_to_right")],
 | 
						|
) -> HttpResponse:
 | 
						|
    change_request = SubscriptionPropertyChangeRequest(
 | 
						|
        stream_id=stream_id, property=property, value=value
 | 
						|
    )
 | 
						|
    return update_subscription_properties_backend(
 | 
						|
        request, user_profile, subscription_data=[change_request]
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def update_subscription_properties_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    subscription_data: Json[list[SubscriptionPropertyChangeRequest]],
 | 
						|
) -> HttpResponse:
 | 
						|
    """
 | 
						|
    This is the entry point to changing subscription properties. This
 | 
						|
    is a bulk endpoint: requesters always provide a subscription_data
 | 
						|
    list containing dictionaries for each stream of interest.
 | 
						|
 | 
						|
    Requests are of the form:
 | 
						|
 | 
						|
    [{"stream_id": "1", "property": "is_muted", "value": False},
 | 
						|
     {"stream_id": "1", "property": "color", "value": "#c2c2c2"}]
 | 
						|
    """
 | 
						|
 | 
						|
    for change in subscription_data:
 | 
						|
        stream_id = change.stream_id
 | 
						|
        property = change.property
 | 
						|
        value = change.value
 | 
						|
 | 
						|
        (stream, sub) = access_stream_by_id(user_profile, stream_id)
 | 
						|
        if sub is None:
 | 
						|
            raise JsonableError(
 | 
						|
                _("Not subscribed to channel ID {channel_id}").format(channel_id=stream_id)
 | 
						|
            )
 | 
						|
 | 
						|
        do_change_subscription_property(
 | 
						|
            user_profile, sub, stream, property, value, acting_user=user_profile
 | 
						|
        )
 | 
						|
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def get_stream_email_address(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    sender_id: Json[NonNegativeInt] | None = None,
 | 
						|
    stream_id: Annotated[NonNegativeInt, ApiParamConfig("stream", path_only=True)],
 | 
						|
) -> HttpResponse:
 | 
						|
    (stream, sub) = access_stream_by_id(
 | 
						|
        user_profile,
 | 
						|
        stream_id,
 | 
						|
    )
 | 
						|
    email_gateway_bot = get_system_bot(settings.EMAIL_GATEWAY_BOT, stream.realm_id)
 | 
						|
 | 
						|
    if sender_id is None or sender_id == email_gateway_bot.id:
 | 
						|
        sender = email_gateway_bot
 | 
						|
    elif sender_id == user_profile.id:
 | 
						|
        sender = user_profile
 | 
						|
    else:
 | 
						|
        sender = access_bot_by_id(user_profile, sender_id)
 | 
						|
 | 
						|
    email_token = get_channel_email_token(stream, creator=user_profile, sender=sender)
 | 
						|
    stream_email = encode_email_address(stream.name, email_token, show_sender=True)
 | 
						|
 | 
						|
    return json_success(request, data={"email": stream_email})
 |