Files
zulip/zerver/tests/test_channel_creation.py
Kislay Verma 5f80f0a970 channel: Add option to notify users newly added to a channel.
When a user is added to a channel, we send
that user a Notification Bot DM to let
them know about it.

In this commit, we add an option for whether or not
this message is sent.

If more than 100 users are added at once, we
do not send notification bot DMs since it would
be a performance-costly operation.

We also send this threshold value of 100 in the
initial state data to the clients.

Fixes part of #31189
2025-06-26 10:08:11 -07:00

821 lines
33 KiB
Python

import orjson
from zerver.actions.channel_folders import check_add_channel_folder
from zerver.actions.realm_settings import do_change_realm_plan_type
from zerver.actions.user_groups import check_add_user_group
from zerver.actions.users import do_change_user_role
from zerver.lib.default_streams import get_default_stream_ids_for_realm
from zerver.lib.exceptions import JsonableError
from zerver.lib.message import UnreadStreamInfo, aggregate_unread_data, get_raw_unread_data
from zerver.lib.streams import (
StreamDict,
create_stream_if_needed,
create_streams_if_needed,
ensure_stream,
list_to_streams,
)
from zerver.lib.test_classes import ZulipTestCase, get_topic_messages
from zerver.lib.test_helpers import reset_email_visibility_to_everyone_in_zulip_realm
from zerver.lib.types import UserGroupMembersDict
from zerver.models import (
Message,
NamedUserGroup,
Realm,
Stream,
Subscription,
UserMessage,
UserProfile,
)
from zerver.models.groups import SystemGroups
from zerver.models.realms import get_realm
from zerver.models.streams import get_stream
from zerver.models.users import active_non_guest_user_ids
class TestCreateStreams(ZulipTestCase):
def test_creating_streams(self) -> None:
    """Check creation events and new/existing bookkeeping of the stream helpers.

    Covers ensure_stream and create_stream_if_needed (event type, recipients
    and payload of the "stream"/"create" event) and create_streams_if_needed
    (bulk creation followed by a second call with the same names).
    """
    stream_names = ["new1", "new2", "new3"]
    stream_descriptions = ["des1", "des2", "des3"]
    realm = get_realm("zulip")
    iago = self.example_user("iago")

    # Test stream creation events.
    with self.capture_send_event_calls(expected_num_events=1) as events:
        ensure_stream(realm, "Public stream", invite_only=False, acting_user=None)

    self.assertEqual(events[0]["event"]["type"], "stream")
    self.assertEqual(events[0]["event"]["op"], "create")
    # The public stream creation event goes to all active non-guest users.
    self.assertEqual(events[0]["users"], active_non_guest_user_ids(realm.id))
    self.assertEqual(events[0]["event"]["streams"][0]["name"], "Public stream")
    self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)

    aaron_group = check_add_user_group(
        realm, "aaron_group", [self.example_user("aaron")], acting_user=iago
    )
    prospero_group = check_add_user_group(
        realm, "prospero_group", [self.example_user("prospero")], acting_user=iago
    )
    cordelia_group = check_add_user_group(
        realm, "cordelia_group", [self.example_user("cordelia")], acting_user=iago
    )

    with self.capture_send_event_calls(expected_num_events=1) as events:
        create_stream_if_needed(
            realm,
            "Private stream",
            invite_only=True,
            can_administer_channel_group=aaron_group,
            can_add_subscribers_group=prospero_group,
            can_subscribe_group=cordelia_group,
        )

    self.assertEqual(events[0]["event"]["type"], "stream")
    self.assertEqual(events[0]["event"]["op"], "create")
    # The private stream creation event is not limited to iago and
    # desdemona: the sole members of the three groups that were given a
    # channel permission above (aaron, prospero, cordelia) receive it too.
    self.assert_length(events[0]["users"], 5)
    self.assertCountEqual(
        [
            iago.id,
            self.example_user("desdemona").id,
            self.example_user("aaron").id,
            self.example_user("prospero").id,
            self.example_user("cordelia").id,
        ],
        events[0]["users"],
    )
    self.assertEqual(events[0]["event"]["streams"][0]["name"], "Private stream")
    self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)

    moderators_system_group = NamedUserGroup.objects.get(
        name="role:moderators", realm=realm, is_system_group=True
    )
    # strict=True: the two lists are meant to be parallel, so a length
    # mismatch should fail loudly instead of silently dropping entries.
    new_streams, existing_streams = create_streams_if_needed(
        realm,
        [
            {
                "name": stream_name,
                "description": stream_description,
                "invite_only": True,
                "message_retention_days": -1,
                "can_remove_subscribers_group": moderators_system_group,
            }
            for (stream_name, stream_description) in zip(
                stream_names, stream_descriptions, strict=True
            )
        ],
    )

    self.assert_length(new_streams, 3)
    self.assert_length(existing_streams, 0)

    actual_stream_names = {stream.name for stream in new_streams}
    self.assertEqual(actual_stream_names, set(stream_names))
    actual_stream_descriptions = {stream.description for stream in new_streams}
    self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
    for stream in new_streams:
        self.assertTrue(stream.invite_only)
        # assertEqual (rather than assertTrue(x == -1)) reports the actual
        # value when the check fails.
        self.assertEqual(stream.message_retention_days, -1)
        self.assertEqual(stream.can_remove_subscribers_group.id, moderators_system_group.id)
        # Streams created where acting_user is None have no creator
        self.assertIsNone(stream.creator_id)

    # Asking for the same names again must create nothing and report all
    # three streams as already existing.
    new_streams, existing_streams = create_streams_if_needed(
        realm,
        [
            {"name": stream_name, "description": stream_description, "invite_only": True}
            for (stream_name, stream_description) in zip(
                stream_names, stream_descriptions, strict=True
            )
        ],
    )

    self.assert_length(new_streams, 0)
    self.assert_length(existing_streams, 3)

    actual_stream_names = {stream.name for stream in existing_streams}
    self.assertEqual(actual_stream_names, set(stream_names))
    actual_stream_descriptions = {stream.description for stream in existing_streams}
    self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
    for stream in existing_streams:
        self.assertTrue(stream.invite_only)
def test_create_api_multiline_description(self) -> None:
    """A description submitted with newlines is stored with spaces in their place."""
    hamlet = self.example_user("hamlet")
    hamlet_realm = hamlet.realm
    self.login_user(hamlet)

    response = self.subscribe_via_post(
        hamlet,
        [{"name": "new_stream", "description": "multi\nline\ndescription"}],
        subdomain="zulip",
    )
    self.assert_json_success(response)

    created_stream = get_stream("new_stream", hamlet_realm)
    self.assertEqual(created_stream.description, "multi line description")
def test_create_api_topic_permalink_description(self) -> None:
    """A topic mention in a description becomes a permalink only if the creator can access it."""
    creator = self.example_user("iago")
    realm = creator.realm
    self.login_user(creator)

    hamlet = self.example_user("hamlet")
    core_stream = self.make_stream("core", realm, True, history_public_to_subscribers=True)
    self.subscribe(hamlet, "core")
    msg_id = self.send_stream_message(hamlet, "core", topic_name="testing")

    # Before the creator has access to "core", the mention must be rendered
    # as plain emphasised text and not as a permalink.
    response = self.subscribe_via_post(
        creator,
        [{"name": "stream1", "description": "#**core>testing**"}],
        subdomain="zulip",
    )
    self.assert_json_success(response)
    without_access = get_stream("stream1", realm)
    self.assertEqual(
        without_access.rendered_description, "<p>#<strong>core&gt;testing</strong></p>"
    )

    self.subscribe(creator, "core")

    # Once the creator is subscribed to "core", the same mention must be
    # rendered as a permalink to the topic.
    response = self.subscribe_via_post(
        creator,
        [{"name": "stream2", "description": "#**core>testing**"}],
        subdomain="zulip",
    )
    self.assert_json_success(response)
    with_access = get_stream("stream2", realm)
    expected_html = f'<p><a class="stream-topic" data-stream-id="{core_stream.id}" href="/#narrow/channel/{core_stream.id}-core/topic/testing/with/{msg_id}">#{core_stream.name} &gt; testing</a></p>'
    self.assertEqual(with_access.rendered_description, expected_html)
def test_history_public_to_subscribers_on_stream_creation(self) -> None:
    """Check the history_public_to_subscribers value chosen for each kind of new stream.

    The expected values live in a single table that is compared against all
    created streams at once, so a stream that is missing or unexpectedly
    named fails the test instead of being skipped silently.
    """
    realm = get_realm("zulip")
    stream_dicts: list[StreamDict] = [
        {
            "name": "publicstream",
            "description": "Public stream with public history",
        },
        {"name": "webpublicstream", "description": "Web-public stream", "is_web_public": True},
        {
            "name": "privatestream",
            "description": "Private stream with non-public history",
            "invite_only": True,
        },
        {
            "name": "privatewithhistory",
            "description": "Private stream with public history",
            "invite_only": True,
            "history_public_to_subscribers": True,
        },
        {
            "name": "publictrywithouthistory",
            "description": "Public stream without public history (disallowed)",
            "invite_only": False,
            "history_public_to_subscribers": False,
        },
    ]
    # Expected history_public_to_subscribers, keyed by stream name.
    expected_history_public = {
        "publicstream": True,
        "webpublicstream": True,
        "privatestream": False,
        "privatewithhistory": True,
        # The request for non-public history on a public stream is
        # disallowed, so the stream is expected to keep public history.
        "publictrywithouthistory": True,
    }

    created, existing = create_streams_if_needed(realm, stream_dicts)

    self.assert_length(created, 5)
    self.assert_length(existing, 0)
    self.assertEqual(
        {stream.name: stream.history_public_to_subscribers for stream in created},
        expected_history_public,
    )
def test_add_stream_as_default_on_stream_creation(self) -> None:
    """Check who may pass is_default_stream at creation and for which channels.

    A user without the administrator role is rejected, an administrator
    succeeds, and a private channel can never be made a default channel.
    """
    user_profile = self.example_user("hamlet")
    self.login_user(user_profile)
    realm = user_profile.realm
    subscriptions = [
        {"name": "default_stream", "description": "This stream is default for new users"}
    ]

    # Before the role change below, the request must be refused.
    result = self.subscribe_via_post(
        user_profile,
        subscriptions,
        {"is_default_stream": "true"},
        allow_fail=True,
        subdomain="zulip",
    )
    self.assert_json_error(result, "Insufficient permission")

    do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR, acting_user=None)
    result = self.subscribe_via_post(
        user_profile, subscriptions, {"is_default_stream": "true"}, subdomain="zulip"
    )
    self.assert_json_success(result)
    default_stream = get_stream("default_stream", realm)
    # assertIn reports both the ID and the collection when the check fails.
    self.assertIn(default_stream.id, get_default_stream_ids_for_realm(realm.id))

    # Even as an administrator, combining invite_only with
    # is_default_stream is rejected.
    subscriptions = [
        {
            "name": "private_default_stream",
            "description": "This stream is private and default for new users",
        }
    ]
    result = self.subscribe_via_post(
        user_profile,
        subscriptions,
        {"is_default_stream": "true"},
        invite_only=True,
        allow_fail=True,
        subdomain="zulip",
    )
    self.assert_json_error(result, "A default channel cannot be private.")
def test_history_public_to_subscribers_zephyr_realm(self) -> None:
    """In the zephyr realm, neither private nor public new streams get public history."""
    zephyr_realm = get_realm("zephyr")

    private_stream, was_created = create_stream_if_needed(
        zephyr_realm, "private_stream", invite_only=True
    )
    self.assertTrue(was_created)
    self.assertTrue(private_stream.invite_only)
    self.assertFalse(private_stream.history_public_to_subscribers)

    public_stream, was_created = create_stream_if_needed(
        zephyr_realm, "public_stream", invite_only=False
    )
    self.assertTrue(was_created)
    self.assertFalse(public_stream.invite_only)
    self.assertFalse(public_stream.history_public_to_subscribers)
def test_auto_mark_stream_created_message_as_read_for_stream_creator(self) -> None:
    """The messages sent on channel creation are unread for others but read for the creator.

    Iago creates "brand new stream" with announcements enabled and four
    initial subscribers. The test checks the API response, the number of
    Message/UserMessage rows created, and the resulting unread state for
    Hamlet (a subscriber) and Iago (the creator).
    """
    # This test relies on email == delivery_email for
    # convenience.
    reset_email_visibility_to_everyone_in_zulip_realm()

    realm = Realm.objects.get(name="Zulip Dev")
    iago = self.example_user("iago")
    hamlet = self.example_user("hamlet")
    cordelia = self.example_user("cordelia")
    aaron = self.example_user("aaron")

    # Establish a stream for notifications.
    announce_stream = ensure_stream(
        realm, "announce", False, "announcements here.", acting_user=None
    )
    realm.new_stream_announcements_stream_id = announce_stream.id
    realm.save(update_fields=["new_stream_announcements_stream_id"])

    self.subscribe(iago, announce_stream.name)
    self.subscribe(hamlet, announce_stream.name)

    self.login_user(iago)

    initial_message_count = Message.objects.count()
    initial_usermessage_count = UserMessage.objects.count()

    principals = [iago, aaron, cordelia, hamlet]
    data = {
        "subscriptions": '[{"name":"brand new stream","description":""}]',
        "history_public_to_subscribers": "true",
        "invite_only": "false",
        "announce": "true",
        "principals": orjson.dumps([user.id for user in principals]).decode(),
    }

    response = self.client_post("/json/users/me/subscriptions", data)

    final_message_count = Message.objects.count()
    final_usermessage_count = UserMessage.objects.count()

    expected_response = {
        "result": "success",
        "msg": "",
        # Keyed by stringified user ID. Derived from the user objects so the
        # test does not depend on the IDs the fixture happens to assign.
        "subscribed": {str(user.id): ["brand new stream"] for user in principals},
        "already_subscribed": {},
        "new_subscription_messages_sent": True,
    }
    self.assertEqual(response.status_code, 200)
    self.assertEqual(orjson.loads(response.content), expected_response)

    # 2 messages should be created, one in announce and one in the new stream itself.
    self.assertEqual(final_message_count - initial_message_count, 2)

    # One UserMessage for each of the 4 subscribers of the new stream, plus
    # one for each subscriber of the announcement stream.
    announce_stream_subs = Subscription.objects.filter(recipient=announce_stream.recipient)
    self.assertEqual(
        final_usermessage_count - initial_usermessage_count, 4 + announce_stream_subs.count()
    )

    def get_unread_stream_data(user: UserProfile) -> list[UnreadStreamInfo]:
        """Return the "streams" part of the aggregated unread data for ``user``."""
        raw_unread_data = get_raw_unread_data(user)
        aggregated_data = aggregate_unread_data(raw_unread_data, allow_empty_topic_name=True)
        return aggregated_data["streams"]

    stream_id = Stream.objects.get(name="brand new stream").id
    iago_unread_messages = get_unread_stream_data(iago)
    hamlet_unread_messages = get_unread_stream_data(hamlet)

    # The stream creation messages should be unread for Hamlet
    self.assert_length(hamlet_unread_messages, 2)

    # According to the code in zerver/views/streams/add_subscriptions_backend
    # the notification stream message is sent first, then the new stream's message.
    self.assertEqual(hamlet_unread_messages[1]["stream_id"], stream_id)

    # But it should be marked as read for Iago, the stream creator.
    self.assert_length(iago_unread_messages, 0)
def test_can_administer_channel_group_default_on_stream_creation(self) -> None:
    """can_administer_channel_group defaults to role:nobody, or to the acting user if given."""
    hamlet = self.example_user("hamlet")
    realm = hamlet.realm
    self.login_user(hamlet)

    nobody_group = NamedUserGroup.objects.get(
        name="role:nobody", realm=realm, is_system_group=True
    )

    # No acting user: the setting is the "role:nobody" system group.
    anonymous_stream, _ = create_stream_if_needed(
        realm, "new stream without acting user", invite_only=True
    )
    self.assertEqual(anonymous_stream.can_administer_channel_group.id, nobody_group.id)

    # With an acting user: the setting's group has exactly that user as
    # its direct member.
    owned_stream, _ = create_stream_if_needed(
        realm, "new stream with acting user", acting_user=hamlet
    )
    administrators = owned_stream.can_administer_channel_group.direct_members.all()
    self.assertCountEqual(administrators, [hamlet])
def do_test_permission_setting_on_stream_creation(self, setting_name: str) -> None:
    """Check which values one channel permission setting accepts at creation time.

    "new_stream" is created repeatedly through the subscriptions API, each
    time with a different value for ``setting_name``: a system group ID,
    no value (the default), a user-defined group ID, an anonymous group
    given as direct members/subgroups, an empty anonymous group, and the
    role:owners, role:nobody, role:everyone and role:internet groups. Each
    successfully created stream is deleted so the name can be reused.

    Args:
        setting_name: Key of Stream.stream_permission_group_settings, and
            also the name of the POST parameter and Stream attribute.
    """
    user = self.example_user("hamlet")
    realm = user.realm
    self.login_user(user)

    permission_config = Stream.stream_permission_group_settings[setting_name]
    subscriptions = [{"name": "new_stream", "description": "New stream"}]

    def system_group(name: str) -> NamedUserGroup:
        """Fetch the realm's system group called ``name``."""
        return NamedUserGroup.objects.get(name=name, realm=realm, is_system_group=True)

    def create_with_setting(value: object) -> Stream:
        """Create "new_stream" with the setting set to ``value`` and return the stream."""
        result = self.subscribe_via_post(
            user,
            subscriptions,
            {setting_name: orjson.dumps(value).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_success(result)
        return get_stream("new_stream", realm)

    def assert_group_rejected(group: NamedUserGroup) -> None:
        """Assert that creating "new_stream" with ``group`` as the setting is refused."""
        result = self.subscribe_via_post(
            user,
            subscriptions,
            {setting_name: orjson.dumps(group.id).decode()},
            allow_fail=True,
            subdomain="zulip",
        )
        self.assert_json_error(
            result,
            f"'{setting_name}' setting cannot be set to '{group.name}' group.",
        )

    moderators_system_group = system_group("role:moderators")
    nobody_group = system_group("role:nobody")
    owners_group = system_group("role:owners")
    everyone_group = system_group("role:everyone")
    internet_group = system_group("role:internet")
    hamletcharacters_group = NamedUserGroup.objects.get(name="hamletcharacters", realm=realm)

    # A system group passed by ID is stored as that group.
    stream = create_with_setting(moderators_system_group.id)
    self.assertEqual(getattr(stream, setting_name).id, moderators_system_group.id)
    # Delete the created stream, so we can create a new one for
    # testing another setting value.
    stream.delete()

    # No value passed: the setting takes its configured default.
    result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
    self.assert_json_success(result)
    stream = get_stream("new_stream", realm)
    if permission_config.default_group_name == "stream_creator_or_nobody":
        # With an acting user, this default is a group whose only direct
        # member is the creator and which has no subgroups.
        self.assertEqual(list(getattr(stream, setting_name).direct_members.all()), [user])
        self.assertEqual(
            list(getattr(stream, setting_name).direct_subgroups.all()),
            [],
        )
    else:
        default_group = system_group(permission_config.default_group_name)
        self.assertEqual(getattr(stream, setting_name).id, default_group.id)
    stream.delete()

    # A user-defined group passed by ID is stored as that group.
    stream = create_with_setting(hamletcharacters_group.id)
    self.assertEqual(getattr(stream, setting_name).id, hamletcharacters_group.id)
    stream.delete()

    # An anonymous group given as direct members plus direct subgroups.
    stream = create_with_setting(
        {"direct_members": [user.id], "direct_subgroups": [moderators_system_group.id]}
    )
    self.assertEqual(list(getattr(stream, setting_name).direct_members.all()), [user])
    self.assertEqual(
        list(getattr(stream, setting_name).direct_subgroups.all()),
        [moderators_system_group],
    )
    stream.delete()

    # An empty anonymous group is stored as the role:nobody system group.
    stream = create_with_setting({"direct_members": [], "direct_subgroups": []})
    self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
    stream.delete()

    stream = create_with_setting(owners_group.id)
    self.assertEqual(getattr(stream, setting_name).id, owners_group.id)
    stream.delete()

    stream = create_with_setting(nobody_group.id)
    self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
    stream.delete()

    # role:everyone is accepted only when the setting's configuration
    # allows it.
    if permission_config.allow_everyone_group:
        stream = create_with_setting(everyone_group.id)
        self.assertEqual(getattr(stream, setting_name).id, everyone_group.id)
        stream.delete()
    else:
        assert_group_rejected(everyone_group)

    # role:internet is rejected for every setting.
    assert_group_rejected(internet_group)
def test_permission_settings_on_stream_creation(self) -> None:
    """Run the shared creation-time checks once for each channel permission setting."""
    for permission_setting in Stream.stream_permission_group_settings:
        self.do_test_permission_setting_on_stream_creation(permission_setting)
def test_default_permission_settings_on_stream_creation(self) -> None:
    """A channel created with no permission settings gets the defaults, in the DB and the event."""
    hamlet = self.example_user("hamlet")
    realm = hamlet.realm
    self.login("hamlet")

    with self.capture_send_event_calls(expected_num_events=4) as events:
        result = self.subscribe_via_post(
            hamlet,
            [{"name": "new_stream", "description": "New stream"}],
        )
    self.assert_json_success(result)

    def system_group(name: str) -> NamedUserGroup:
        """Fetch the realm's system group called ``name``."""
        return NamedUserGroup.objects.get(name=name, realm=realm, is_system_group=True)

    nobody_group = system_group(SystemGroups.NOBODY)
    admins_group = system_group(SystemGroups.ADMINISTRATORS)
    everyone_group = system_group(SystemGroups.EVERYONE)

    stream = get_stream("new_stream", realm)

    # can_administer_channel_group: the creator is the only direct member
    # and there are no subgroups.
    administering_group = stream.can_administer_channel_group
    member_ids = administering_group.direct_members.all().values_list("id", flat=True)
    self.assertEqual(list(member_ids), [hamlet.id])
    subgroup_ids = administering_group.direct_subgroups.all().values_list("id", flat=True)
    self.assertEqual(list(subgroup_ids), [])

    self.assertEqual(stream.can_add_subscribers_group_id, nobody_group.id)
    self.assertEqual(stream.can_remove_subscribers_group_id, admins_group.id)
    self.assertEqual(stream.can_send_message_group_id, everyone_group.id)
    self.assertEqual(stream.can_subscribe_group_id, nobody_group.id)

    # Check setting values sent in stream creation events.
    created_payload = events[0]["event"]["streams"][0]
    self.assertEqual(
        created_payload["can_administer_channel_group"],
        UserGroupMembersDict(direct_members=[hamlet.id], direct_subgroups=[]),
    )
    self.assertEqual(created_payload["can_add_subscribers_group"], nobody_group.id)
    self.assertEqual(created_payload["can_remove_subscribers_group"], admins_group.id)
    self.assertEqual(created_payload["can_send_message_group"], everyone_group.id)
    self.assertEqual(created_payload["can_subscribe_group"], nobody_group.id)
def test_acting_user_is_creator(self) -> None:
    """
    When a backend caller passes an acting_user while creating
    streams, that user is recorded as the stream's creator.
    """
    acting_user = self.example_user("hamlet")
    requested_stream = StreamDict(
        name="hamlet's test stream",
        description="No description",
        invite_only=True,
        is_web_public=True,
    )

    new_streams, _ = create_streams_if_needed(
        acting_user.realm,
        [requested_stream],
        acting_user=acting_user,
    )

    self.assertEqual(new_streams[0].creator_id, acting_user.id)
def test_channel_create_message_exists_for_all_policy_types(self) -> None:
    """
    Create one channel per permission policy and check each one's
    "new channel" message in the "channel events" topic.
    """
    # The bolded policy label expected in the "new channel" message,
    # keyed by policy key.
    policy_key_map: dict[str, str] = {
        "web_public": "**Web-public**",
        "public": "**Public**",
        "private_shared_history": "**Private, shared history**",
        "private_protected_history": "**Private, protected history**",
    }

    for policy_key, policy_dict in Stream.PERMISSION_POLICIES.items():
        is_zephyr_only_policy = policy_key == "public_protected_history"

        channel_creator = self.example_user("desdemona")
        subdomain = "zulip"
        if is_zephyr_only_policy:
            # This is a special channel policy only available in Zephyr realms.
            channel_creator = self.mit_user("starnine")
            subdomain = "zephyr"

        self.login_user(channel_creator)
        new_channel_name = f"New {policy_key} channel"
        history_public = policy_dict["history_public_to_subscribers"]
        post_data = {
            "subscriptions": orjson.dumps([{"name": new_channel_name}]).decode(),
            "is_web_public": orjson.dumps(policy_dict["is_web_public"]).decode(),
            "invite_only": orjson.dumps(policy_dict["invite_only"]).decode(),
            "history_public_to_subscribers": orjson.dumps(history_public).decode(),
        }
        result = self.api_post(
            channel_creator,
            "/json/users/me/subscriptions",
            post_data,
            subdomain=subdomain,
        )
        self.assert_json_success(result)

        new_channel = get_stream(new_channel_name, channel_creator.realm)
        channel_events_messages = get_topic_messages(
            channel_creator, new_channel, "channel events"
        )

        if is_zephyr_only_policy:
            # These do not get channel creation notification.
            self.assert_length(channel_events_messages, 0)
        else:
            self.assert_length(channel_events_messages, 1)
            self.assertIn(policy_key_map[policy_key], channel_events_messages[0].content)
def test_adding_channels_to_folder_during_creation(self) -> None:
    """Check the folder_id parameter of the channel creation API.

    An unknown folder ID is rejected, a valid one is applied to every
    channel created by the request, and omitting it leaves the new
    channels without a folder.
    """
    realm = get_realm("zulip")
    iago = self.example_user("iago")
    hamlet = self.example_user("hamlet")
    channel_folder = check_add_channel_folder(realm, "Backend", "", acting_user=iago)

    subscriptions = [
        {"name": "new_stream", "description": "New stream"},
        {"name": "new_stream_2", "description": "New stream 2"},
    ]
    extra_post_data = {}

    # NOTE(review): assumes no channel folder with ID 99 exists in the
    # test database -- confirm against the fixtures.
    extra_post_data["folder_id"] = orjson.dumps(99).decode()
    result = self.subscribe_via_post(
        hamlet,
        subscriptions,
        extra_post_data,
        allow_fail=True,
        subdomain="zulip",
    )
    self.assert_json_error(result, "Invalid channel folder ID")

    extra_post_data["folder_id"] = orjson.dumps(channel_folder.id).decode()
    result = self.subscribe_via_post(
        hamlet,
        subscriptions,
        extra_post_data,
        subdomain="zulip",
    )
    # Check the response explicitly, as the other tests in this class do,
    # instead of leaving ``result`` unused.
    self.assert_json_success(result)
    stream = get_stream("new_stream", realm)
    self.assertEqual(stream.folder, channel_folder)
    stream = get_stream("new_stream_2", realm)
    self.assertEqual(stream.folder, channel_folder)

    # Without folder_id, the created channels must have no folder.
    subscriptions = [
        {"name": "new_stream_3", "description": "New stream 3"},
        {"name": "new_stream_4", "description": "New stream 4"},
    ]
    extra_post_data = {}
    result = self.subscribe_via_post(
        hamlet,
        subscriptions,
        extra_post_data,
        subdomain="zulip",
    )
    self.assert_json_success(result)
    stream = get_stream("new_stream_3", realm)
    self.assertIsNone(stream.folder)
    stream = get_stream("new_stream_4", realm)
    self.assertIsNone(stream.folder)
def test_stream_message_retention_days_on_stream_creation(self) -> None:
    """
    Only organization owners can create streams with a
    message_retention_days value other than None, and only when the
    realm's plan type allows it.

    An administrator who is not an owner is rejected for both 10 and -1
    but may pass None. An owner is rejected while the realm is on
    PLAN_TYPE_LIMITED and succeeds once it is on PLAN_TYPE_SELF_HOSTED.
    """
    admin = self.example_user("iago")

    streams_raw: list[StreamDict] = [
        {
            "name": "new_stream",
            "message_retention_days": 10,
            "is_web_public": False,
        }
    ]
    with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
        list_to_streams(streams_raw, admin, autocreate=True)

    streams_raw = [
        {
            "name": "new_stream",
            "message_retention_days": -1,
            "is_web_public": False,
        }
    ]
    with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
        list_to_streams(streams_raw, admin, autocreate=True)

    # An explicit None is allowed for a non-owner.
    streams_raw = [
        {
            "name": "new_stream",
            "message_retention_days": None,
            "is_web_public": False,
        }
    ]
    result = list_to_streams(streams_raw, admin, autocreate=True)
    self.assert_length(result[0], 0)
    self.assert_length(result[1], 1)
    self.assertEqual(result[1][0].name, "new_stream")
    self.assertIsNone(result[1][0].message_retention_days)

    owner = self.example_user("desdemona")
    realm = owner.realm
    streams_raw = [
        {
            "name": "new_stream1",
            "message_retention_days": 10,
            "is_web_public": False,
        },
        {
            "name": "new_stream2",
            "message_retention_days": -1,
            "is_web_public": False,
        },
        {
            "name": "new_stream3",
            "is_web_public": False,
        },
    ]

    # On the limited plan even an owner is refused.
    do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=admin)
    with self.assertRaisesRegex(
        JsonableError, "Available on Zulip Cloud Standard. Upgrade to access."
    ):
        list_to_streams(streams_raw, owner, autocreate=True)

    do_change_realm_plan_type(realm, Realm.PLAN_TYPE_SELF_HOSTED, acting_user=admin)
    result = list_to_streams(streams_raw, owner, autocreate=True)
    self.assert_length(result[0], 0)
    self.assert_length(result[1], 3)
    self.assertEqual(result[1][0].name, "new_stream1")
    self.assertEqual(result[1][0].message_retention_days, 10)
    self.assertEqual(result[1][1].name, "new_stream2")
    self.assertEqual(result[1][1].message_retention_days, -1)
    self.assertEqual(result[1][2].name, "new_stream3")
    # A stream dict that omits message_retention_days ends up with None.
    self.assertIsNone(result[1][2].message_retention_days)