mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	Removes the `send_new_subscription_messages` parameter from the
`POST /channels/create` endpoint, as Notification Bot DMs
are never sent when users are subscribed as part of
creating a new channel.
Not considered a documentable API change, since the previous parameter
never had any effect, so this is effectively just a documentation and
error-handling bug fix, not an API change relevant to implementors.
(cherry picked from commit 699c0c6200)
		
	
		
			
				
	
	
		
			1152 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1152 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import orjson
 | 
						|
 | 
						|
from zerver.actions.channel_folders import check_add_channel_folder
 | 
						|
from zerver.actions.realm_settings import (
 | 
						|
    do_change_realm_permission_group_setting,
 | 
						|
    do_change_realm_plan_type,
 | 
						|
)
 | 
						|
from zerver.actions.user_groups import check_add_user_group
 | 
						|
from zerver.actions.users import do_change_user_role
 | 
						|
from zerver.lib.default_streams import get_default_stream_ids_for_realm
 | 
						|
from zerver.lib.exceptions import JsonableError
 | 
						|
from zerver.lib.message import UnreadStreamInfo, aggregate_unread_data, get_raw_unread_data
 | 
						|
from zerver.lib.streams import (
 | 
						|
    StreamDict,
 | 
						|
    create_stream_if_needed,
 | 
						|
    create_streams_if_needed,
 | 
						|
    ensure_stream,
 | 
						|
    list_to_streams,
 | 
						|
)
 | 
						|
from zerver.lib.test_classes import ZulipTestCase, get_topic_messages
 | 
						|
from zerver.lib.test_helpers import reset_email_visibility_to_everyone_in_zulip_realm
 | 
						|
from zerver.lib.types import UserGroupMembersDict
 | 
						|
from zerver.models import (
 | 
						|
    Message,
 | 
						|
    NamedUserGroup,
 | 
						|
    Realm,
 | 
						|
    Recipient,
 | 
						|
    Stream,
 | 
						|
    Subscription,
 | 
						|
    UserMessage,
 | 
						|
    UserProfile,
 | 
						|
)
 | 
						|
from zerver.models.groups import SystemGroups
 | 
						|
from zerver.models.realms import get_realm
 | 
						|
from zerver.models.streams import StreamTopicsPolicyEnum, get_stream
 | 
						|
from zerver.models.users import active_non_guest_user_ids
 | 
						|
 | 
						|
 | 
						|
class TestCreateStreams(ZulipTestCase):
 | 
						|
    def test_creating_streams(self) -> None:
        # Exercises ensure_stream, create_stream_if_needed and
        # create_streams_if_needed directly: checks the "stream"/"create"
        # events sent for a new public and a new private stream, then
        # verifies that a second bulk call with the same names reports
        # the streams as existing rather than newly created.
        stream_names = ["new1", "new2", "new3"]
        stream_descriptions = ["des1", "des2", "des3"]
        realm = get_realm("zulip")
        iago = self.example_user("iago")

        # Test stream creation events.
        with self.capture_send_event_calls(expected_num_events=1) as events:
            ensure_stream(realm, "Public stream", invite_only=False, acting_user=None)

        self.assertEqual(events[0]["event"]["type"], "stream")
        self.assertEqual(events[0]["event"]["op"], "create")
        # Send public stream creation event to all active users.
        self.assertEqual(events[0]["users"], active_non_guest_user_ids(realm.id))
        self.assertEqual(events[0]["event"]["streams"][0]["name"], "Public stream")
        self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)

        aaron_group = check_add_user_group(
            realm, "aaron_group", [self.example_user("aaron")], acting_user=iago
        )
        prospero_group = check_add_user_group(
            realm, "prospero_group", [self.example_user("prospero")], acting_user=iago
        )
        cordelia_group = check_add_user_group(
            realm, "cordelia_group", [self.example_user("cordelia")], acting_user=iago
        )
        with self.capture_send_event_calls(expected_num_events=1) as events:
            create_stream_if_needed(
                realm,
                "Private stream",
                invite_only=True,
                can_administer_channel_group=aaron_group,
                can_add_subscribers_group=prospero_group,
                can_subscribe_group=cordelia_group,
            )

        self.assertEqual(events[0]["event"]["type"], "stream")
        self.assertEqual(events[0]["event"]["op"], "create")
        # The private stream creation event goes to iago and desdemona,
        # plus the members of the groups passed as the channel's
        # permission settings above (aaron, prospero and cordelia).
        self.assert_length(events[0]["users"], 5)
        self.assertCountEqual(
            [
                iago.id,
                self.example_user("desdemona").id,
                self.example_user("aaron").id,
                self.example_user("prospero").id,
                self.example_user("cordelia").id,
            ],
            events[0]["users"],
        )
        self.assertEqual(events[0]["event"]["streams"][0]["name"], "Private stream")
        self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)

        moderators_system_group = NamedUserGroup.objects.get(
            name="role:moderators", realm=realm, is_system_group=True
        )
        new_streams, existing_streams = create_streams_if_needed(
            realm,
            [
                {
                    "name": stream_name,
                    "description": stream_description,
                    "invite_only": True,
                    "message_retention_days": -1,
                    "can_remove_subscribers_group": moderators_system_group,
                }
                for (stream_name, stream_description) in zip(
                    stream_names, stream_descriptions, strict=False
                )
            ],
        )

        self.assert_length(new_streams, 3)
        self.assert_length(existing_streams, 0)

        actual_stream_names = {stream.name for stream in new_streams}
        self.assertEqual(actual_stream_names, set(stream_names))
        actual_stream_descriptions = {stream.description for stream in new_streams}
        self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
        for stream in new_streams:
            self.assertTrue(stream.invite_only)
            # assertEqual (rather than assertTrue on a comparison) so a
            # failure reports the actual retention value.
            self.assertEqual(stream.message_retention_days, -1)
            self.assertEqual(stream.can_remove_subscribers_group.id, moderators_system_group.id)
            # Streams created where acting_user is None have no creator
            self.assertIsNone(stream.creator_id)

        # Requesting the same names again must not create anything new.
        new_streams, existing_streams = create_streams_if_needed(
            realm,
            [
                {"name": stream_name, "description": stream_description, "invite_only": True}
                for (stream_name, stream_description) in zip(
                    stream_names, stream_descriptions, strict=False
                )
            ],
        )

        self.assert_length(new_streams, 0)
        self.assert_length(existing_streams, 3)

        actual_stream_names = {stream.name for stream in existing_streams}
        self.assertEqual(actual_stream_names, set(stream_names))
        actual_stream_descriptions = {stream.description for stream in existing_streams}
        self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
        for stream in existing_streams:
            self.assertTrue(stream.invite_only)
 | 
						|
 | 
						|
    def test_create_api_multiline_description(self) -> None:
        # A description submitted with embedded newlines is expected to
        # be stored with the newlines replaced by single spaces.
        hamlet = self.example_user("hamlet")
        realm = hamlet.realm
        self.login_user(hamlet)

        response = self.subscribe_via_post(
            hamlet,
            [{"name": "new_stream", "description": "multi\nline\ndescription"}],
            subdomain="zulip",
        )
        self.assert_json_success(response)

        created_stream = get_stream("new_stream", realm)
        self.assertEqual(created_stream.description, "multi line description")
 | 
						|
 | 
						|
    def test_create_api_topic_permalink_description(self) -> None:
        # A "#**channel>topic**" mention in a new channel's description is
        # rendered as a topic permalink only if the creating user can
        # access the mentioned channel.
        iago = self.example_user("iago")
        realm = iago.realm
        self.login_user(iago)

        hamlet = self.example_user("hamlet")
        core_stream = self.make_stream("core", realm, True, history_public_to_subscribers=True)
        self.subscribe(hamlet, "core")
        msg_id = self.send_stream_message(hamlet, "core", topic_name="testing")

        def create_and_get_rendered_description(channel_name: str) -> str:
            # Create a channel as iago whose description mentions the
            # "testing" topic of "core", and return what got rendered.
            response = self.subscribe_via_post(
                iago,
                [{"name": channel_name, "description": "#**core>testing**"}],
                subdomain="zulip",
            )
            self.assert_json_success(response)
            return get_stream(channel_name, realm).rendered_description

        # Test permalink not generated for description since user has no access to
        # the channel.
        self.assertEqual(
            create_and_get_rendered_description("stream1"),
            "<p>#<strong>core>testing</strong></p>",
        )

        self.subscribe(iago, "core")

        # Test permalink generated for the description since user now has access
        # to the channel.
        self.assertEqual(
            create_and_get_rendered_description("stream2"),
            f'<p><a class="stream-topic" data-stream-id="{core_stream.id}" href="/#narrow/channel/{core_stream.id}-core/topic/testing/with/{msg_id}">#{core_stream.name} > testing</a></p>',
        )
 | 
						|
 | 
						|
    def test_history_public_to_subscribers_on_stream_creation(self) -> None:
        # Checks the history_public_to_subscribers value that
        # create_streams_if_needed assigns for each combination of
        # privacy settings, including a request for a public stream
        # without public history, which is expected to be overridden.
        realm = get_realm("zulip")
        stream_dicts: list[StreamDict] = [
            {
                "name": "publicstream",
                "description": "Public stream with public history",
            },
            {"name": "webpublicstream", "description": "Web-public stream", "is_web_public": True},
            {
                "name": "privatestream",
                "description": "Private stream with non-public history",
                "invite_only": True,
            },
            {
                "name": "privatewithhistory",
                "description": "Private stream with public history",
                "invite_only": True,
                "history_public_to_subscribers": True,
            },
            {
                "name": "publictrywithouthistory",
                "description": "Public stream without public history (disallowed)",
                "invite_only": False,
                "history_public_to_subscribers": False,
            },
        ]
        expected_history_public = {
            "publicstream": True,
            "webpublicstream": True,
            "privatestream": False,
            "privatewithhistory": True,
            "publictrywithouthistory": True,
        }

        created, existing = create_streams_if_needed(realm, stream_dicts)

        self.assert_length(created, 5)
        self.assert_length(existing, 0)
        # Compare the full name -> flag mapping, so that a stream coming
        # back under an unexpected name fails the test instead of being
        # silently skipped by a chain of per-name `if` checks.
        self.assertEqual(
            {stream.name: stream.history_public_to_subscribers for stream in created},
            expected_history_public,
        )
 | 
						|
 | 
						|
    def test_add_stream_as_default_on_stream_creation(self) -> None:
        # Passing is_default_stream while creating a channel through the
        # subscribe endpoint: rejected for hamlet before the role change,
        # accepted once hamlet is a realm administrator, and rejected
        # when the new channel is private.
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)
        realm = user_profile.realm

        subscriptions = [
            {"name": "default_stream", "description": "This stream is default for new users"}
        ]
        result = self.subscribe_via_post(
            user_profile,
            subscriptions,
            {"is_default_stream": "true"},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_error(result, "Insufficient permission")

        do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR, acting_user=None)
        result = self.subscribe_via_post(
            user_profile, subscriptions, {"is_default_stream": "true"}, subdomain="zulip"
        )
        self.assert_json_success(result)
        default_stream = get_stream("default_stream", realm)
        # assertIn reports both the id and the collection on failure,
        # unlike assertTrue on an `in` expression.
        self.assertIn(default_stream.id, get_default_stream_ids_for_realm(realm.id))

        subscriptions = [
            {
                "name": "private_default_stream",
                "description": "This stream is private and default for new users",
            }
        ]
        result = self.subscribe_via_post(
            user_profile,
            subscriptions,
            {"is_default_stream": "true"},
            invite_only=True,
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_error(result, "A default channel cannot be private.")
 | 
						|
 | 
						|
    def test_create_stream_using_add_channel(self) -> None:
        # Walks through the channel creation endpoint: a plain creation,
        # a creation with description / administrator group / folder and
        # the resulting notification message, the error for an existing
        # name, and a creation with an empty subscriber list.
        user_profile = self.example_user("iago")

        self.assert_json_success(self.create_channel_via_post(user_profile, name="basketball"))
        basketball = get_stream("basketball", user_profile.realm)
        self.assertEqual(basketball.name, "basketball")

        cordelia = self.example_user("cordelia")
        nobody_group = NamedUserGroup.objects.get(
            name=SystemGroups.NOBODY, realm=cordelia.realm, is_system_group=True
        )

        channel_folder = check_add_channel_folder(
            user_profile.realm, "sports", "", acting_user=user_profile
        )
        administrators_setting = orjson.dumps(
            {
                "direct_members": [cordelia.id],
                "direct_subgroups": [nobody_group.id],
            }
        ).decode()
        extra_post_data = {
            "description": "test channel",
            "can_administer_channel_group": administrators_setting,
            "folder_id": orjson.dumps(channel_folder.id).decode(),
        }
        response = self.create_channel_via_post(
            user_profile, name="testchannel", extra_post_data=extra_post_data
        )
        self.assert_json_success(response)
        test_channel = get_stream("testchannel", user_profile.realm)
        self.assertEqual(test_channel.name, "testchannel")
        self.assertEqual(test_channel.description, "test channel")

        # Confirm channel created notification message in channel events topic.
        notification = self.get_last_message()
        self.assertEqual(notification.recipient.type, Recipient.STREAM)
        self.assertEqual(notification.recipient.type_id, test_channel.id)
        self.assertEqual(notification.topic_name(), Realm.STREAM_EVENTS_NOTIFICATION_TOPIC_NAME)
        self.assertEqual(notification.sender_id, self.notification_bot(user_profile.realm).id)
        expected_message_content = (
            f"**Public** channel created by @_**{user_profile.full_name}|{user_profile.id}**. **Description:**\n"
            "```` quote\ntest channel\n````"
        )
        self.assertEqual(notification.content, expected_message_content)

        # Creating an existing channel should return an error.
        response = self.create_channel_via_post(user_profile, name="basketball")
        self.assert_json_error(response, "Channel 'basketball' already exists", status_code=409)

        # Test creating channel with no subscribers
        response = self.api_post(
            user_profile,
            "/api/v1/channels/create",
            {
                "name": "no-sub-channel",
                "subscribers": orjson.dumps([]).decode(),
            },
            intentionally_undocumented=True,
        )
        self.assert_json_success(response)
        empty_channel = get_stream("no-sub-channel", user_profile.realm)
        self.assertEqual(empty_channel.name, "no-sub-channel")
        self.assertEqual(empty_channel.subscriber_count, 0)
 | 
						|
 | 
						|
    def test_channel_creation_miscellaneous(self) -> None:
        # Assorted validation and permission checks on channel creation:
        # message retention, default channels, web-public channels, guest
        # users, the topics policy setting, and Zephyr mirror realms.
        iago = self.example_user("iago")
        desdemona = self.example_user("desdemona")
        cordelia = self.example_user("cordelia")

        # Setting message_retention_days requires an organization owner.
        result = self.create_channel_via_post(
            iago, extra_post_data={"message_retention_days": orjson.dumps(10).decode()}
        )
        self.assert_json_error(result, "Must be an organization owner")

        result = self.create_channel_via_post(
            desdemona,
            [iago.id],
            name="new_channel",
            extra_post_data={"message_retention_days": orjson.dumps(10).decode()},
        )
        self.assert_json_success(result)
        stream = get_stream("new_channel", desdemona.realm)
        self.assertEqual(stream.name, "new_channel")
        self.assertEqual(stream.message_retention_days, 10)

        # A private channel cannot be made a default channel; the same
        # request succeeds once the channel is public.
        result = self.create_channel_via_post(
            iago,
            name="testing_channel1",
            extra_post_data={"is_default_stream": orjson.dumps(True).decode()},
            invite_only=True,
        )
        self.assert_json_error(result, "A default channel cannot be private.")

        result = self.create_channel_via_post(
            iago,
            name="testing_channel1",
            extra_post_data={"is_default_stream": orjson.dumps(True).decode()},
            invite_only=False,
        )
        self.assert_json_success(result)
        stream = get_stream("testing_channel1", iago.realm)
        self.assertEqual(stream.name, "testing_channel1")
        # assertIn reports the id and the collection on failure.
        self.assertIn(stream.id, get_default_stream_ids_for_realm(iago.realm.id))

        # Only org owners can create web public streams by default, if they are enabled.
        with self.settings(WEB_PUBLIC_STREAMS_ENABLED=False):
            self.assertFalse(desdemona.realm.has_web_public_streams())
            result = self.create_channel_via_post(
                desdemona,
                name="testing_web_public_channel",
                is_web_public=True,
            )
            self.assert_json_error(result, "Web-public channels are not enabled.")

        with self.settings(WEB_PUBLIC_STREAMS_ENABLED=True):
            self.assertTrue(desdemona.realm.has_web_public_streams())
            result = self.create_channel_via_post(
                desdemona,
                name="testing_web_public_channel",
                is_web_public=True,
            )
            self.assert_json_success(result)
            stream = get_stream("testing_web_public_channel", desdemona.realm)
            self.assertEqual(stream.name, "testing_web_public_channel")

        # Guest users cannot create channels.
        polonius = self.example_user("polonius")
        result = self.create_channel_via_post(
            polonius,
            name="testing_channel4",
            invite_only=True,
        )
        self.assert_json_error(result, "Not allowed for guest users")

        # topics policy
        owners = NamedUserGroup.objects.get(
            name=SystemGroups.OWNERS, realm=cordelia.realm, is_system_group=True
        )
        do_change_realm_permission_group_setting(
            cordelia.realm, "can_set_topics_policy_group", owners, acting_user=None
        )
        self.assertTrue(desdemona.can_set_topics_policy())
        self.assertFalse(cordelia.can_set_topics_policy())
        result = self.create_channel_via_post(
            cordelia,
            name="testing_channel4",
            extra_post_data={
                "topics_policy": orjson.dumps(
                    StreamTopicsPolicyEnum.disable_empty_topic.name
                ).decode()
            },
        )
        self.assert_json_error(result, "Insufficient permission")

        result = self.create_channel_via_post(
            desdemona,
            name="testing_channel4",
            extra_post_data={
                "topics_policy": orjson.dumps(
                    StreamTopicsPolicyEnum.disable_empty_topic.name
                ).decode()
            },
        )
        self.assert_json_success(result)
        stream = get_stream("testing_channel4", desdemona.realm)
        self.assertEqual(stream.name, "testing_channel4")
        self.assertEqual(stream.topics_policy, StreamTopicsPolicyEnum.disable_empty_topic.value)

        # In the zephyr realm, subscribing another user while creating a
        # channel is only accepted when the channel is private.
        realm = get_realm("zephyr")
        starnine = self.mit_user("starnine")
        espuser = self.mit_user("espuser")
        result = self.create_channel_via_post(
            starnine, [espuser.id], name="zephyr_channel", subdomain="zephyr"
        )
        self.assert_json_error(
            result, "You can only invite other Zephyr mirroring users to private channels."
        )

        result = self.create_channel_via_post(
            starnine, [espuser.id], name="zephyr_channel", invite_only=True, subdomain="zephyr"
        )
        self.assert_json_success(result)
        stream = get_stream("zephyr_channel", realm)
        self.assertEqual(stream.name, "zephyr_channel")
 | 
						|
 | 
						|
    def _test_group_based_settings_for_creating_channels(
        self,
        stream_policy: str,
        *,
        invite_only: bool,
        is_web_public: bool,
    ) -> None:
        # Shared driver for the channel-creation permission tests.
        # `stream_policy` names the realm permission group setting under
        # test; `invite_only` and `is_web_public` select the kind of
        # channel each attempt tries to create. The setting is pointed at
        # system groups, a user-defined group and an anonymous group in
        # turn, checking who may create a channel each time.
        def check_permission_to_create_channel(
            user: UserProfile, stream_name: str, *, expect_fail: bool = False
        ) -> None:
            response = self.create_channel_via_post(
                user,
                name=stream_name,
                invite_only=invite_only,
                is_web_public=is_web_public,
            )
            if not expect_fail:
                self.assert_json_success(response)
                channel_exists = Stream.objects.filter(
                    name=stream_name, realm_id=user.realm.id
                ).exists()
                self.assertTrue(channel_exists)
                return

            self.assert_json_error(response, "Insufficient permission")

        cordelia = self.example_user("cordelia")
        iago = self.example_user("iago")
        desdemona = self.example_user("desdemona")

        def system_group(group_name: str) -> NamedUserGroup:
            # Look up one of the realm's system groups by name.
            return NamedUserGroup.objects.get(
                name=group_name, realm=cordelia.realm, is_system_group=True
            )

        # System groups case
        nobody_group = system_group(SystemGroups.NOBODY)
        do_change_realm_permission_group_setting(
            cordelia.realm, stream_policy, nobody_group, acting_user=None
        )
        check_permission_to_create_channel(
            cordelia, "testing_channel_group_permission1", expect_fail=True
        )
        check_permission_to_create_channel(
            iago, "testing_channel_group_permission1", expect_fail=True
        )

        member_group = system_group(SystemGroups.MEMBERS)
        do_change_realm_permission_group_setting(
            cordelia.realm, stream_policy, member_group, acting_user=None
        )
        check_permission_to_create_channel(cordelia, "testing_channel_group_permission1")
        check_permission_to_create_channel(iago, "testing_channel_group_permission2")

        admin_group = system_group(SystemGroups.ADMINISTRATORS)
        do_change_realm_permission_group_setting(
            cordelia.realm, stream_policy, admin_group, acting_user=None
        )
        check_permission_to_create_channel(
            cordelia, "testing_channel_group_permission3", expect_fail=True
        )
        check_permission_to_create_channel(iago, "testing_channel_group_permission3")

        # User defined group case
        leadership_group = check_add_user_group(
            cordelia.realm, "Leadership", [desdemona], acting_user=desdemona
        )
        do_change_realm_permission_group_setting(
            cordelia.realm, stream_policy, leadership_group, acting_user=None
        )
        check_permission_to_create_channel(
            cordelia, "testing_channel_group_permission4", expect_fail=True
        )
        check_permission_to_create_channel(desdemona, "testing_channel_group_permission4")

        # Anonymous group case
        staff_group = check_add_user_group(cordelia.realm, "Staff", [iago], acting_user=iago)
        setting_group = self.create_or_update_anonymous_group_for_setting([cordelia], [staff_group])
        do_change_realm_permission_group_setting(
            cordelia.realm, stream_policy, setting_group, acting_user=None
        )
        check_permission_to_create_channel(
            desdemona, "testing_channel_group_permission5", expect_fail=True
        )
        check_permission_to_create_channel(iago, "testing_channel_group_permission5")
        check_permission_to_create_channel(cordelia, "testing_channel_group_permission6")
 | 
						|
 | 
						|
    def test_group_based_permisions_for_creating_private_streams(self) -> None:
        # Run the shared permission checks against the setting that
        # governs creating private (invite-only) channels.
        self._test_group_based_settings_for_creating_channels(
            stream_policy="can_create_private_channel_group",
            is_web_public=False,
            invite_only=True,
        )
 | 
						|
 | 
						|
    def test_group_based_permisions_for_creating_public_streams(self) -> None:
        # Run the shared permission checks against the setting that
        # governs creating public channels.
        self._test_group_based_settings_for_creating_channels(
            stream_policy="can_create_public_channel_group",
            is_web_public=False,
            invite_only=False,
        )
 | 
						|
 | 
						|
    def test_group_based_permisions_for_creating_web_public_streams(self) -> None:
        # Run the shared permission checks against the setting that
        # governs creating web-public channels.
        self._test_group_based_settings_for_creating_channels(
            stream_policy="can_create_web_public_channel_group",
            is_web_public=True,
            invite_only=False,
        )
 | 
						|
 | 
						|
    def test_history_public_to_subscribers_zephyr_realm(self) -> None:
        # In the zephyr realm, newly created streams are expected to have
        # history_public_to_subscribers disabled whether they are
        # private or public.
        zephyr_realm = get_realm("zephyr")

        for stream_name, is_private in (("private_stream", True), ("public_stream", False)):
            new_stream, was_created = create_stream_if_needed(
                zephyr_realm, stream_name, invite_only=is_private
            )
            self.assertTrue(was_created)
            if is_private:
                self.assertTrue(new_stream.invite_only)
            else:
                self.assertFalse(new_stream.invite_only)
            self.assertFalse(new_stream.history_public_to_subscribers)
 | 
						|
 | 
						|
    def test_auto_mark_stream_created_message_as_read_for_stream_creator(self) -> None:
        """
        Creating a stream with "announce" enabled sends one message to the
        announcements stream and one to the new stream itself.  Those
        messages should be unread for other users, but must already be
        marked as read for the user who created the stream.
        """
        # This test relies on email == delivery_email for
        # convenience.
        reset_email_visibility_to_everyone_in_zulip_realm()

        realm = Realm.objects.get(name="Zulip Dev")
        iago = self.example_user("iago")
        hamlet = self.example_user("hamlet")
        cordelia = self.example_user("cordelia")
        aaron = self.example_user("aaron")
        # Everyone who gets subscribed to the new stream, creator included.
        new_subscribers = [iago, aaron, cordelia, hamlet]

        # Establish a stream for notifications.
        announce_stream = ensure_stream(
            realm, "announce", False, "announcements here.", acting_user=None
        )
        realm.new_stream_announcements_stream_id = announce_stream.id
        realm.save(update_fields=["new_stream_announcements_stream_id"])

        self.subscribe(iago, announce_stream.name)
        self.subscribe(hamlet, announce_stream.name)

        self.login_user(iago)

        initial_message_count = Message.objects.count()
        initial_usermessage_count = UserMessage.objects.count()

        data = {
            "subscriptions": '[{"name":"brand new stream","description":""}]',
            "history_public_to_subscribers": "true",
            "invite_only": "false",
            "announce": "true",
            "principals": orjson.dumps([user.id for user in new_subscribers]).decode(),
        }

        response = self.client_post("/json/users/me/subscriptions", data)

        final_message_count = Message.objects.count()
        final_usermessage_count = UserMessage.objects.count()

        expected_response = {
            "result": "success",
            "msg": "",
            # The response is keyed by stringified user ID.  Derive the keys
            # from the users themselves rather than hard-coding the IDs that
            # the test fixtures happen to assign.
            "subscribed": {str(user.id): ["brand new stream"] for user in new_subscribers},
            "already_subscribed": {},
            "new_subscription_messages_sent": True,
        }
        self.assertEqual(response.status_code, 200)
        self.assertEqual(orjson.loads(response.content), expected_response)

        # 2 messages should be created, one in announce and one in the new stream itself.
        self.assertEqual(final_message_count - initial_message_count, 2)
        # One UserMessage for each subscriber of the new stream, plus one
        # for each subscriber of the announcements stream.
        announce_stream_subs = Subscription.objects.filter(recipient=announce_stream.recipient)
        self.assertEqual(
            final_usermessage_count - initial_usermessage_count,
            len(new_subscribers) + announce_stream_subs.count(),
        )

        def get_unread_stream_data(user: UserProfile) -> list[UnreadStreamInfo]:
            raw_unread_data = get_raw_unread_data(user)
            aggregated_data = aggregate_unread_data(raw_unread_data, allow_empty_topic_name=True)
            return aggregated_data["streams"]

        stream_id = Stream.objects.get(name="brand new stream").id
        iago_unread_messages = get_unread_stream_data(iago)
        hamlet_unread_messages = get_unread_stream_data(hamlet)

        # The stream creation messages should be unread for Hamlet
        self.assert_length(hamlet_unread_messages, 2)

        # According to the code in zerver/views/streams/add_subscriptions_backend
        # the notification stream message is sent first, then the new stream's message.
        self.assertEqual(hamlet_unread_messages[1]["stream_id"], stream_id)

        # But it should be marked as read for Iago, the stream creator.
        self.assert_length(iago_unread_messages, 0)
 | 
						|
 | 
						|
    def test_can_administer_channel_group_default_on_stream_creation(self) -> None:
        """
        Check the default can_administer_channel_group of a stream created
        via create_stream_if_needed, both without and with an acting user.
        """
        hamlet = self.example_user("hamlet")
        realm = hamlet.realm
        self.login_user(hamlet)
        nobody_group = NamedUserGroup.objects.get(
            name="role:nobody", realm=realm, is_system_group=True
        )

        # No acting user: the setting is the "role:nobody" system group.
        stream_without_creator, _ = create_stream_if_needed(
            realm, "new stream without acting user", invite_only=True
        )
        self.assertEqual(stream_without_creator.can_administer_channel_group.id, nobody_group.id)

        # With an acting user: that user is the only direct member.
        stream_with_creator, _ = create_stream_if_needed(
            realm, "new stream with acting user", acting_user=hamlet
        )
        administrators = stream_with_creator.can_administer_channel_group
        self.assertCountEqual(administrators.direct_members.all(), [hamlet])
 | 
						|
 | 
						|
    def do_test_permission_setting_on_stream_creation(self, setting_name: str) -> None:
        """
        Create "new_stream" repeatedly as Hamlet, passing a different value
        for the given permission setting each time, and verify either the
        value stored on the resulting stream or the error returned.
        """
        hamlet = self.example_user("hamlet")
        realm = hamlet.realm
        self.login_user(hamlet)
        moderators_group = NamedUserGroup.objects.get(
            name="role:moderators", realm=realm, is_system_group=True
        )
        setting_config = Stream.stream_permission_group_settings[setting_name]
        new_stream_request = [{"name": "new_stream", "description": "New stream"}]

        def check_setting_group_and_delete(expected_group: NamedUserGroup) -> None:
            # Verify the stored group, then delete the created stream so we
            # can create a new one for testing another setting value.
            created_stream = get_stream("new_stream", realm)
            self.assertEqual(getattr(created_stream, setting_name).id, expected_group.id)
            created_stream.delete()

        # Explicitly requesting a system group by ID.
        response = self.subscribe_via_post(
            hamlet,
            new_stream_request,
            {setting_name: orjson.dumps(moderators_group.id).decode()},
            subdomain="zulip",
        )
        self.assert_json_success(response)
        check_setting_group_and_delete(moderators_group)

        # Omitting the setting: the configured default is used.
        response = self.subscribe_via_post(hamlet, new_stream_request, subdomain="zulip")
        self.assert_json_success(response)
        if setting_config.default_group_name != "stream_creator_or_nobody":
            check_setting_group_and_delete(
                NamedUserGroup.objects.get(
                    name=setting_config.default_group_name, realm=realm, is_system_group=True
                )
            )
        else:
            created_stream = get_stream("new_stream", realm)
            default_value = getattr(created_stream, setting_name)
            self.assertEqual(list(default_value.direct_members.all()), [hamlet])
            self.assertEqual(list(default_value.direct_subgroups.all()), [])
            # Delete the created stream, so we can create a new one for
            # testing another setting value.
            created_stream.delete()

        # Requesting a named, non-system user group by ID.
        hamletcharacters_group = NamedUserGroup.objects.get(name="hamletcharacters", realm=realm)
        response = self.subscribe_via_post(
            hamlet,
            new_stream_request,
            {setting_name: orjson.dumps(hamletcharacters_group.id).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_success(response)
        check_setting_group_and_delete(hamletcharacters_group)

        # Requesting an anonymous group with one member and one subgroup.
        anonymous_group_value = {
            "direct_members": [hamlet.id],
            "direct_subgroups": [moderators_group.id],
        }
        response = self.subscribe_via_post(
            hamlet,
            new_stream_request,
            {setting_name: orjson.dumps(anonymous_group_value).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_success(response)
        created_stream = get_stream("new_stream", realm)
        stored_value = getattr(created_stream, setting_name)
        self.assertEqual(list(stored_value.direct_members.all()), [hamlet])
        self.assertEqual(list(stored_value.direct_subgroups.all()), [moderators_group])
        # Delete the created stream, so we can create a new one for
        # testing another setting value.
        created_stream.delete()

        # An empty anonymous group is stored as the "role:nobody" group.
        nobody_group = NamedUserGroup.objects.get(
            name="role:nobody", is_system_group=True, realm=realm
        )
        empty_group_value = {"direct_members": [], "direct_subgroups": []}
        response = self.subscribe_via_post(
            hamlet,
            new_stream_request,
            {setting_name: orjson.dumps(empty_group_value).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_success(response)
        check_setting_group_and_delete(nobody_group)

        # "role:owners" and then "role:nobody", each requested by ID.
        owners_group = NamedUserGroup.objects.get(
            name="role:owners", is_system_group=True, realm=realm
        )
        for system_group in (owners_group, nobody_group):
            response = self.subscribe_via_post(
                hamlet,
                new_stream_request,
                {setting_name: orjson.dumps(system_group.id).decode()},
                allow_fail=True,
                subdomain="zulip",
            )
            self.assert_json_success(response)
            check_setting_group_and_delete(system_group)

        # "role:everyone" is accepted only when the setting's config allows it.
        everyone_group = NamedUserGroup.objects.get(
            name="role:everyone", is_system_group=True, realm=realm
        )
        response = self.subscribe_via_post(
            hamlet,
            new_stream_request,
            {setting_name: orjson.dumps(everyone_group.id).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        if not setting_config.allow_everyone_group:
            self.assert_json_error(
                response,
                f"'{setting_name}' setting cannot be set to 'role:everyone' group.",
            )
        else:
            self.assert_json_success(response)
            check_setting_group_and_delete(everyone_group)

        # "role:internet" is always rejected here.
        internet_group = NamedUserGroup.objects.get(
            name="role:internet", is_system_group=True, realm=realm
        )
        response = self.subscribe_via_post(
            hamlet,
            new_stream_request,
            {setting_name: orjson.dumps(internet_group.id).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_error(
            response,
            f"'{setting_name}' setting cannot be set to 'role:internet' group.",
        )
 | 
						|
 | 
						|
    def test_permission_settings_on_stream_creation(self) -> None:
        """
        Run do_test_permission_setting_on_stream_creation once for every
        setting listed in Stream.stream_permission_group_settings.
        """
        zulip_realm = get_realm("zulip")
        members_group = NamedUserGroup.objects.get(
            name=SystemGroups.MEMBERS, realm=zulip_realm, is_system_group=True
        )
        # NOTE(review): presumably needed so that the member used by the
        # helper may set the delete-message related stream setting -- confirm.
        do_change_realm_permission_group_setting(
            zulip_realm, "can_set_delete_message_policy_group", members_group, acting_user=None
        )

        for permission_setting in Stream.stream_permission_group_settings:
            self.do_test_permission_setting_on_stream_creation(permission_setting)
 | 
						|
 | 
						|
    def test_default_permission_settings_on_stream_creation(self) -> None:
        """
        Create a stream without specifying any permission settings and check
        the defaults, both on the saved stream and in the first event sent.
        """
        hamlet = self.example_user("hamlet")
        realm = hamlet.realm

        self.login("hamlet")
        with self.capture_send_event_calls(expected_num_events=4) as events:
            result = self.subscribe_via_post(
                hamlet, [{"name": "new_stream", "description": "New stream"}]
            )
        self.assert_json_success(result)

        def system_group(group_name: str) -> NamedUserGroup:
            return NamedUserGroup.objects.get(name=group_name, realm=realm, is_system_group=True)

        nobody_group = system_group(SystemGroups.NOBODY)
        admins_group = system_group(SystemGroups.ADMINISTRATORS)
        everyone_group = system_group(SystemGroups.EVERYONE)

        stream = get_stream("new_stream", realm)

        # The creator is the sole direct member of can_administer_channel_group.
        administrators = stream.can_administer_channel_group
        member_ids = list(administrators.direct_members.all().values_list("id", flat=True))
        self.assertEqual(member_ids, [hamlet.id])
        subgroup_ids = list(administrators.direct_subgroups.all().values_list("id", flat=True))
        self.assertEqual(subgroup_ids, [])

        expected_group_ids = {
            "can_add_subscribers_group": nobody_group.id,
            "can_remove_subscribers_group": admins_group.id,
            "can_send_message_group": everyone_group.id,
            "can_subscribe_group": nobody_group.id,
        }
        for group_setting, group_id in expected_group_ids.items():
            self.assertEqual(getattr(stream, f"{group_setting}_id"), group_id)

        # Check setting values sent in stream creation events.
        event_stream = events[0]["event"]["streams"][0]
        self.assertEqual(
            event_stream["can_administer_channel_group"],
            UserGroupMembersDict(direct_members=[hamlet.id], direct_subgroups=[]),
        )
        for group_setting, group_id in expected_group_ids.items():
            self.assertEqual(event_stream[group_setting], group_id)
 | 
						|
 | 
						|
    def test_acting_user_is_creator(self) -> None:
        """
        When create_streams_if_needed is called with an acting_user,
        that user is recorded as the creator of the new stream.
        """
        hamlet = self.example_user("hamlet")
        stream_dict = StreamDict(
            name="hamlet's test stream",
            description="No description",
            invite_only=True,
            is_web_public=True,
        )
        created_streams, _ = create_streams_if_needed(
            hamlet.realm, [stream_dict], acting_user=hamlet
        )
        self.assertEqual(created_streams[0].creator_id, hamlet.id)
 | 
						|
 | 
						|
    def test_channel_create_message_exists_for_all_policy_types(self) -> None:
        """
        Create one channel per entry in Stream.PERMISSION_POLICIES and check
        the "channel events" topic for the expected "new channel" message.
        """
        # Text that must appear in the "new channel" message for each policy.
        expected_policy_text: dict[str, str] = {
            "web_public": "**Web-public**",
            "public": "**Public**",
            "private_shared_history": "**Private, shared history**",
            "private_protected_history": "**Private, protected history**",
        }
        zephyr_only_policy = "public_protected_history"

        for policy_key, policy_dict in Stream.PERMISSION_POLICIES.items():
            creator = self.example_user("desdemona")
            subdomain = "zulip"
            if policy_key == zephyr_only_policy:
                # This is a special channel policy only available in Zephyr realms.
                creator = self.mit_user("starnine")
                subdomain = "zephyr"

            self.login_user(creator)
            channel_name = f"New {policy_key} channel"
            post_data = {
                "subscriptions": orjson.dumps([{"name": channel_name}]).decode(),
                "is_web_public": orjson.dumps(policy_dict["is_web_public"]).decode(),
                "invite_only": orjson.dumps(policy_dict["invite_only"]).decode(),
                "history_public_to_subscribers": orjson.dumps(
                    policy_dict["history_public_to_subscribers"]
                ).decode(),
            }
            result = self.api_post(
                creator, "/json/users/me/subscriptions", post_data, subdomain=subdomain
            )
            self.assert_json_success(result)

            channel = get_stream(channel_name, creator.realm)
            events_messages = get_topic_messages(creator, channel, "channel events")
            if policy_key != zephyr_only_policy:
                self.assert_length(events_messages, 1)
                self.assertIn(expected_policy_text[policy_key], events_messages[0].content)
            else:
                # These do not get channel creation notification.
                self.assert_length(events_messages, 0)
 | 
						|
 | 
						|
    def test_adding_channels_to_folder_during_creation(self) -> None:
        """
        Check the folder_id parameter when creating channels: an unknown ID
        is an error, a valid ID applies to every created channel, and
        omitting it leaves the channels without a folder.
        """
        realm = get_realm("zulip")
        iago = self.example_user("iago")
        hamlet = self.example_user("hamlet")
        backend_folder = check_add_channel_folder(realm, "Backend", "", acting_user=iago)

        first_batch = [
            {"name": "new_stream", "description": "New stream"},
            {"name": "new_stream_2", "description": "New stream 2"},
        ]

        # An unknown folder ID is rejected.
        result = self.subscribe_via_post(
            hamlet,
            first_batch,
            {"folder_id": orjson.dumps(99).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_error(result, "Invalid channel folder ID")

        # A valid folder ID is applied to both channels in the request.
        self.subscribe_via_post(
            hamlet,
            first_batch,
            {"folder_id": orjson.dumps(backend_folder.id).decode()},
            subdomain="zulip",
        )
        for channel_name in ("new_stream", "new_stream_2"):
            self.assertEqual(get_stream(channel_name, realm).folder, backend_folder)

        # Without folder_id, the new channels have no folder.
        second_batch = [
            {"name": "new_stream_3", "description": "New stream 3"},
            {"name": "new_stream_4", "description": "New stream 4"},
        ]
        self.subscribe_via_post(hamlet, second_batch, {}, subdomain="zulip")
        for channel_name in ("new_stream_3", "new_stream_4"):
            self.assertIsNone(get_stream(channel_name, realm).folder)
 | 
						|
 | 
						|
    def test_stream_message_retention_days_on_stream_creation(self) -> None:
        """
        Creating a stream with a message_retention_days value other than
        None requires an organization owner, and is refused on the
        limited plan.
        """
        admin = self.example_user("iago")

        # A non-owner administrator may not request an explicit value.
        for retention_days in (10, -1):
            streams_raw: list[StreamDict] = [
                StreamDict(
                    name="new_stream",
                    message_retention_days=retention_days,
                    is_web_public=False,
                )
            ]
            with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
                list_to_streams(streams_raw, admin, autocreate=True)

        # Passing None explicitly is allowed for the administrator.
        streams_raw = [
            StreamDict(name="new_stream", message_retention_days=None, is_web_public=False)
        ]
        result = list_to_streams(streams_raw, admin, autocreate=True)
        self.assert_length(result[0], 0)
        self.assert_length(result[1], 1)
        self.assertEqual(result[1][0].name, "new_stream")
        self.assertEqual(result[1][0].message_retention_days, None)

        owner = self.example_user("desdemona")
        realm = owner.realm
        streams_raw = [
            StreamDict(name="new_stream1", message_retention_days=10, is_web_public=False),
            StreamDict(name="new_stream2", message_retention_days=-1, is_web_public=False),
            StreamDict(name="new_stream3", is_web_public=False),
        ]

        # On the limited plan even an owner is refused.
        do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=admin)
        with self.assertRaisesRegex(
            JsonableError, "Available on Zulip Cloud Standard. Upgrade to access."
        ):
            list_to_streams(streams_raw, owner, autocreate=True)

        # On a self-hosted plan the owner can create all three streams.
        do_change_realm_plan_type(realm, Realm.PLAN_TYPE_SELF_HOSTED, acting_user=admin)
        result = list_to_streams(streams_raw, owner, autocreate=True)
        self.assert_length(result[0], 0)
        self.assert_length(result[1], 3)
        expected_streams = [("new_stream1", 10), ("new_stream2", -1), ("new_stream3", None)]
        for index, (expected_name, expected_days) in enumerate(expected_streams):
            self.assertEqual(result[1][index].name, expected_name)
            self.assertEqual(result[1][index].message_retention_days, expected_days)
 |