mirror of
https://github.com/zulip/zulip.git
synced 2025-10-29 02:53:52 +00:00
tests: Move channel creation tests to test_channel_creation.
This commit is contained in:
819
zerver/tests/test_channel_creation.py
Normal file
819
zerver/tests/test_channel_creation.py
Normal file
@@ -0,0 +1,819 @@
|
||||
import orjson
|
||||
|
||||
from zerver.actions.channel_folders import check_add_channel_folder
|
||||
from zerver.actions.realm_settings import do_change_realm_plan_type
|
||||
from zerver.actions.user_groups import check_add_user_group
|
||||
from zerver.actions.users import do_change_user_role
|
||||
from zerver.lib.default_streams import get_default_stream_ids_for_realm
|
||||
from zerver.lib.exceptions import JsonableError
|
||||
from zerver.lib.message import UnreadStreamInfo, aggregate_unread_data, get_raw_unread_data
|
||||
from zerver.lib.streams import (
|
||||
StreamDict,
|
||||
create_stream_if_needed,
|
||||
create_streams_if_needed,
|
||||
ensure_stream,
|
||||
list_to_streams,
|
||||
)
|
||||
from zerver.lib.test_classes import ZulipTestCase, get_topic_messages
|
||||
from zerver.lib.test_helpers import reset_email_visibility_to_everyone_in_zulip_realm
|
||||
from zerver.lib.types import UserGroupMembersDict
|
||||
from zerver.models import (
|
||||
Message,
|
||||
NamedUserGroup,
|
||||
Realm,
|
||||
Stream,
|
||||
Subscription,
|
||||
UserMessage,
|
||||
UserProfile,
|
||||
)
|
||||
from zerver.models.groups import SystemGroups
|
||||
from zerver.models.realms import get_realm
|
||||
from zerver.models.streams import get_stream
|
||||
from zerver.models.users import active_non_guest_user_ids
|
||||
|
||||
|
||||
class TestCreateStreams(ZulipTestCase):
|
||||
def test_creating_streams(self) -> None:
|
||||
stream_names = ["new1", "new2", "new3"]
|
||||
stream_descriptions = ["des1", "des2", "des3"]
|
||||
realm = get_realm("zulip")
|
||||
iago = self.example_user("iago")
|
||||
|
||||
# Test stream creation events.
|
||||
with self.capture_send_event_calls(expected_num_events=1) as events:
|
||||
ensure_stream(realm, "Public stream", invite_only=False, acting_user=None)
|
||||
|
||||
self.assertEqual(events[0]["event"]["type"], "stream")
|
||||
self.assertEqual(events[0]["event"]["op"], "create")
|
||||
# Send public stream creation event to all active users.
|
||||
self.assertEqual(events[0]["users"], active_non_guest_user_ids(realm.id))
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["name"], "Public stream")
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)
|
||||
|
||||
aaron_group = check_add_user_group(
|
||||
realm, "aaron_group", [self.example_user("aaron")], acting_user=iago
|
||||
)
|
||||
prospero_group = check_add_user_group(
|
||||
realm, "prospero_group", [self.example_user("prospero")], acting_user=iago
|
||||
)
|
||||
cordelia_group = check_add_user_group(
|
||||
realm, "cordelia_group", [self.example_user("cordelia")], acting_user=iago
|
||||
)
|
||||
with self.capture_send_event_calls(expected_num_events=1) as events:
|
||||
create_stream_if_needed(
|
||||
realm,
|
||||
"Private stream",
|
||||
invite_only=True,
|
||||
can_administer_channel_group=aaron_group,
|
||||
can_add_subscribers_group=prospero_group,
|
||||
can_subscribe_group=cordelia_group,
|
||||
)
|
||||
|
||||
self.assertEqual(events[0]["event"]["type"], "stream")
|
||||
self.assertEqual(events[0]["event"]["op"], "create")
|
||||
# Send private stream creation event to only realm admins.
|
||||
self.assert_length(events[0]["users"], 5)
|
||||
self.assertCountEqual(
|
||||
[
|
||||
iago.id,
|
||||
self.example_user("desdemona").id,
|
||||
self.example_user("aaron").id,
|
||||
self.example_user("prospero").id,
|
||||
self.example_user("cordelia").id,
|
||||
],
|
||||
events[0]["users"],
|
||||
)
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["name"], "Private stream")
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)
|
||||
|
||||
moderators_system_group = NamedUserGroup.objects.get(
|
||||
name="role:moderators", realm=realm, is_system_group=True
|
||||
)
|
||||
new_streams, existing_streams = create_streams_if_needed(
|
||||
realm,
|
||||
[
|
||||
{
|
||||
"name": stream_name,
|
||||
"description": stream_description,
|
||||
"invite_only": True,
|
||||
"message_retention_days": -1,
|
||||
"can_remove_subscribers_group": moderators_system_group,
|
||||
}
|
||||
for (stream_name, stream_description) in zip(
|
||||
stream_names, stream_descriptions, strict=False
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
self.assert_length(new_streams, 3)
|
||||
self.assert_length(existing_streams, 0)
|
||||
|
||||
actual_stream_names = {stream.name for stream in new_streams}
|
||||
self.assertEqual(actual_stream_names, set(stream_names))
|
||||
actual_stream_descriptions = {stream.description for stream in new_streams}
|
||||
self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
|
||||
for stream in new_streams:
|
||||
self.assertTrue(stream.invite_only)
|
||||
self.assertTrue(stream.message_retention_days == -1)
|
||||
self.assertEqual(stream.can_remove_subscribers_group.id, moderators_system_group.id)
|
||||
# Streams created where acting_user is None have no creator
|
||||
self.assertIsNone(stream.creator_id)
|
||||
|
||||
new_streams, existing_streams = create_streams_if_needed(
|
||||
realm,
|
||||
[
|
||||
{"name": stream_name, "description": stream_description, "invite_only": True}
|
||||
for (stream_name, stream_description) in zip(
|
||||
stream_names, stream_descriptions, strict=False
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
self.assert_length(new_streams, 0)
|
||||
self.assert_length(existing_streams, 3)
|
||||
|
||||
actual_stream_names = {stream.name for stream in existing_streams}
|
||||
self.assertEqual(actual_stream_names, set(stream_names))
|
||||
actual_stream_descriptions = {stream.description for stream in existing_streams}
|
||||
self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
|
||||
for stream in existing_streams:
|
||||
self.assertTrue(stream.invite_only)
|
||||
|
||||
def test_create_api_multiline_description(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
subscriptions = [{"name": "new_stream", "description": "multi\nline\ndescription"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(stream.description, "multi line description")
|
||||
|
||||
def test_create_api_topic_permalink_description(self) -> None:
|
||||
user = self.example_user("iago")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
|
||||
hamlet = self.example_user("hamlet")
|
||||
core_stream = self.make_stream("core", realm, True, history_public_to_subscribers=True)
|
||||
self.subscribe(hamlet, "core")
|
||||
msg_id = self.send_stream_message(hamlet, "core", topic_name="testing")
|
||||
|
||||
# Test permalink not generated for description since user has no access to
|
||||
# the channel.
|
||||
subscriptions = [{"name": "stream1", "description": "#**core>testing**"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("stream1", realm)
|
||||
|
||||
self.assertEqual(stream.rendered_description, "<p>#<strong>core>testing</strong></p>")
|
||||
|
||||
self.subscribe(user, "core")
|
||||
|
||||
# Test permalink generated for the description since user now has access
|
||||
# to the channel.
|
||||
subscriptions = [{"name": "stream2", "description": "#**core>testing**"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("stream2", realm)
|
||||
|
||||
self.assertEqual(
|
||||
stream.rendered_description,
|
||||
f'<p><a class="stream-topic" data-stream-id="{core_stream.id}" href="/#narrow/channel/{core_stream.id}-core/topic/testing/with/{msg_id}">#{core_stream.name} > testing</a></p>',
|
||||
)
|
||||
|
||||
def test_history_public_to_subscribers_on_stream_creation(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
stream_dicts: list[StreamDict] = [
|
||||
{
|
||||
"name": "publicstream",
|
||||
"description": "Public stream with public history",
|
||||
},
|
||||
{"name": "webpublicstream", "description": "Web-public stream", "is_web_public": True},
|
||||
{
|
||||
"name": "privatestream",
|
||||
"description": "Private stream with non-public history",
|
||||
"invite_only": True,
|
||||
},
|
||||
{
|
||||
"name": "privatewithhistory",
|
||||
"description": "Private stream with public history",
|
||||
"invite_only": True,
|
||||
"history_public_to_subscribers": True,
|
||||
},
|
||||
{
|
||||
"name": "publictrywithouthistory",
|
||||
"description": "Public stream without public history (disallowed)",
|
||||
"invite_only": False,
|
||||
"history_public_to_subscribers": False,
|
||||
},
|
||||
]
|
||||
|
||||
created, existing = create_streams_if_needed(realm, stream_dicts)
|
||||
|
||||
self.assert_length(created, 5)
|
||||
self.assert_length(existing, 0)
|
||||
for stream in created:
|
||||
if stream.name == "publicstream":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
if stream.name == "webpublicstream":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
if stream.name == "privatestream":
|
||||
self.assertFalse(stream.history_public_to_subscribers)
|
||||
if stream.name == "privatewithhistory":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
if stream.name == "publictrywithouthistory":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
|
||||
def test_add_stream_as_default_on_stream_creation(self) -> None:
|
||||
user_profile = self.example_user("hamlet")
|
||||
self.login_user(user_profile)
|
||||
realm = user_profile.realm
|
||||
|
||||
subscriptions = [
|
||||
{"name": "default_stream", "description": "This stream is default for new users"}
|
||||
]
|
||||
result = self.subscribe_via_post(
|
||||
user_profile,
|
||||
subscriptions,
|
||||
{"is_default_stream": "true"},
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(result, "Insufficient permission")
|
||||
|
||||
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR, acting_user=None)
|
||||
result = self.subscribe_via_post(
|
||||
user_profile, subscriptions, {"is_default_stream": "true"}, subdomain="zulip"
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
default_stream = get_stream("default_stream", realm)
|
||||
self.assertTrue(default_stream.id in get_default_stream_ids_for_realm(realm.id))
|
||||
|
||||
subscriptions = [
|
||||
{
|
||||
"name": "private_default_stream",
|
||||
"description": "This stream is private and default for new users",
|
||||
}
|
||||
]
|
||||
result = self.subscribe_via_post(
|
||||
user_profile,
|
||||
subscriptions,
|
||||
{"is_default_stream": "true"},
|
||||
invite_only=True,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(result, "A default channel cannot be private.")
|
||||
|
||||
def test_history_public_to_subscribers_zephyr_realm(self) -> None:
|
||||
realm = get_realm("zephyr")
|
||||
|
||||
stream, created = create_stream_if_needed(realm, "private_stream", invite_only=True)
|
||||
self.assertTrue(created)
|
||||
self.assertTrue(stream.invite_only)
|
||||
self.assertFalse(stream.history_public_to_subscribers)
|
||||
|
||||
stream, created = create_stream_if_needed(realm, "public_stream", invite_only=False)
|
||||
self.assertTrue(created)
|
||||
self.assertFalse(stream.invite_only)
|
||||
self.assertFalse(stream.history_public_to_subscribers)
|
||||
|
||||
def test_auto_mark_stream_created_message_as_read_for_stream_creator(self) -> None:
|
||||
# This test relies on email == delivery_email for
|
||||
# convenience.
|
||||
reset_email_visibility_to_everyone_in_zulip_realm()
|
||||
|
||||
realm = Realm.objects.get(name="Zulip Dev")
|
||||
iago = self.example_user("iago")
|
||||
hamlet = self.example_user("hamlet")
|
||||
cordelia = self.example_user("cordelia")
|
||||
aaron = self.example_user("aaron")
|
||||
|
||||
# Establish a stream for notifications.
|
||||
announce_stream = ensure_stream(
|
||||
realm, "announce", False, "announcements here.", acting_user=None
|
||||
)
|
||||
realm.new_stream_announcements_stream_id = announce_stream.id
|
||||
realm.save(update_fields=["new_stream_announcements_stream_id"])
|
||||
|
||||
self.subscribe(iago, announce_stream.name)
|
||||
self.subscribe(hamlet, announce_stream.name)
|
||||
|
||||
self.login_user(iago)
|
||||
|
||||
initial_message_count = Message.objects.count()
|
||||
initial_usermessage_count = UserMessage.objects.count()
|
||||
|
||||
data = {
|
||||
"subscriptions": '[{"name":"brand new stream","description":""}]',
|
||||
"history_public_to_subscribers": "true",
|
||||
"invite_only": "false",
|
||||
"announce": "true",
|
||||
"principals": orjson.dumps([iago.id, aaron.id, cordelia.id, hamlet.id]).decode(),
|
||||
}
|
||||
|
||||
response = self.client_post("/json/users/me/subscriptions", data)
|
||||
|
||||
final_message_count = Message.objects.count()
|
||||
final_usermessage_count = UserMessage.objects.count()
|
||||
|
||||
expected_response = {
|
||||
"result": "success",
|
||||
"msg": "",
|
||||
"subscribed": {
|
||||
"10": ["brand new stream"],
|
||||
"11": ["brand new stream"],
|
||||
"6": ["brand new stream"],
|
||||
"8": ["brand new stream"],
|
||||
},
|
||||
"already_subscribed": {},
|
||||
}
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(orjson.loads(response.content), expected_response)
|
||||
|
||||
# 2 messages should be created, one in announce and one in the new stream itself.
|
||||
self.assertEqual(final_message_count - initial_message_count, 2)
|
||||
# 4 UserMessages per subscriber: One for each of the subscribers, plus 1 for
|
||||
# each user in the notifications stream.
|
||||
announce_stream_subs = Subscription.objects.filter(recipient=announce_stream.recipient)
|
||||
self.assertEqual(
|
||||
final_usermessage_count - initial_usermessage_count, 4 + announce_stream_subs.count()
|
||||
)
|
||||
|
||||
def get_unread_stream_data(user: UserProfile) -> list[UnreadStreamInfo]:
|
||||
raw_unread_data = get_raw_unread_data(user)
|
||||
aggregated_data = aggregate_unread_data(raw_unread_data, allow_empty_topic_name=True)
|
||||
return aggregated_data["streams"]
|
||||
|
||||
stream_id = Stream.objects.get(name="brand new stream").id
|
||||
iago_unread_messages = get_unread_stream_data(iago)
|
||||
hamlet_unread_messages = get_unread_stream_data(hamlet)
|
||||
|
||||
# The stream creation messages should be unread for Hamlet
|
||||
self.assert_length(hamlet_unread_messages, 2)
|
||||
|
||||
# According to the code in zerver/views/streams/add_subscriptions_backend
|
||||
# the notification stream message is sent first, then the new stream's message.
|
||||
self.assertEqual(hamlet_unread_messages[1]["stream_id"], stream_id)
|
||||
|
||||
# But it should be marked as read for Iago, the stream creator.
|
||||
self.assert_length(iago_unread_messages, 0)
|
||||
|
||||
def test_can_administer_channel_group_default_on_stream_creation(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
nobody_system_group = NamedUserGroup.objects.get(
|
||||
name="role:nobody", realm=realm, is_system_group=True
|
||||
)
|
||||
|
||||
stream, created = create_stream_if_needed(
|
||||
realm, "new stream without acting user", invite_only=True
|
||||
)
|
||||
self.assertEqual(stream.can_administer_channel_group.id, nobody_system_group.id)
|
||||
|
||||
stream, created = create_stream_if_needed(
|
||||
realm, "new stream with acting user", acting_user=user
|
||||
)
|
||||
self.assertCountEqual(stream.can_administer_channel_group.direct_members.all(), [user])
|
||||
|
||||
def do_test_permission_setting_on_stream_creation(self, setting_name: str) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
moderators_system_group = NamedUserGroup.objects.get(
|
||||
name="role:moderators", realm=realm, is_system_group=True
|
||||
)
|
||||
|
||||
permission_config = Stream.stream_permission_group_settings[setting_name]
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data = {}
|
||||
extra_post_data[setting_name] = orjson.dumps(moderators_system_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, moderators_system_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
if permission_config.default_group_name == "stream_creator_or_nobody":
|
||||
self.assertEqual(list(getattr(stream, setting_name).direct_members.all()), [user])
|
||||
self.assertEqual(
|
||||
list(getattr(stream, setting_name).direct_subgroups.all()),
|
||||
[],
|
||||
)
|
||||
else:
|
||||
default_group = NamedUserGroup.objects.get(
|
||||
name=permission_config.default_group_name, realm=realm, is_system_group=True
|
||||
)
|
||||
self.assertEqual(getattr(stream, setting_name).id, default_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
hamletcharacters_group = NamedUserGroup.objects.get(name="hamletcharacters", realm=realm)
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(hamletcharacters_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, hamletcharacters_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(
|
||||
{"direct_members": [user.id], "direct_subgroups": [moderators_system_group.id]}
|
||||
).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(list(getattr(stream, setting_name).direct_members.all()), [user])
|
||||
self.assertEqual(
|
||||
list(getattr(stream, setting_name).direct_subgroups.all()),
|
||||
[moderators_system_group],
|
||||
)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
nobody_group = NamedUserGroup.objects.get(
|
||||
name="role:nobody", is_system_group=True, realm=realm
|
||||
)
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(
|
||||
{"direct_members": [], "direct_subgroups": []}
|
||||
).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
owners_group = NamedUserGroup.objects.get(
|
||||
name="role:owners", is_system_group=True, realm=realm
|
||||
)
|
||||
extra_post_data[setting_name] = orjson.dumps(owners_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, owners_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(nobody_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
everyone_group = NamedUserGroup.objects.get(
|
||||
name="role:everyone", is_system_group=True, realm=realm
|
||||
)
|
||||
extra_post_data[setting_name] = orjson.dumps(everyone_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
if permission_config.allow_everyone_group:
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, everyone_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
else:
|
||||
self.assert_json_error(
|
||||
result,
|
||||
f"'{setting_name}' setting cannot be set to 'role:everyone' group.",
|
||||
)
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
internet_group = NamedUserGroup.objects.get(
|
||||
name="role:internet", is_system_group=True, realm=realm
|
||||
)
|
||||
extra_post_data[setting_name] = orjson.dumps(internet_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(
|
||||
result,
|
||||
f"'{setting_name}' setting cannot be set to 'role:internet' group.",
|
||||
)
|
||||
|
||||
def test_permission_settings_on_stream_creation(self) -> None:
|
||||
for setting_name in Stream.stream_permission_group_settings:
|
||||
self.do_test_permission_setting_on_stream_creation(setting_name)
|
||||
|
||||
def test_default_permission_settings_on_stream_creation(self) -> None:
|
||||
hamlet = self.example_user("hamlet")
|
||||
realm = hamlet.realm
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
|
||||
self.login("hamlet")
|
||||
with self.capture_send_event_calls(expected_num_events=4) as events:
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
|
||||
nobody_group = NamedUserGroup.objects.get(
|
||||
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
|
||||
)
|
||||
admins_group = NamedUserGroup.objects.get(
|
||||
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
|
||||
)
|
||||
everyone_group = NamedUserGroup.objects.get(
|
||||
name=SystemGroups.EVERYONE, realm=realm, is_system_group=True
|
||||
)
|
||||
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(
|
||||
list(
|
||||
stream.can_administer_channel_group.direct_members.all().values_list(
|
||||
"id", flat=True
|
||||
)
|
||||
),
|
||||
[hamlet.id],
|
||||
)
|
||||
self.assertEqual(
|
||||
list(
|
||||
stream.can_administer_channel_group.direct_subgroups.all().values_list(
|
||||
"id", flat=True
|
||||
)
|
||||
),
|
||||
[],
|
||||
)
|
||||
|
||||
self.assertEqual(stream.can_add_subscribers_group_id, nobody_group.id)
|
||||
self.assertEqual(stream.can_remove_subscribers_group_id, admins_group.id)
|
||||
self.assertEqual(stream.can_send_message_group_id, everyone_group.id)
|
||||
self.assertEqual(stream.can_subscribe_group_id, nobody_group.id)
|
||||
|
||||
# Check setting values sent in stream creation events.
|
||||
event_stream = events[0]["event"]["streams"][0]
|
||||
self.assertEqual(
|
||||
event_stream["can_administer_channel_group"],
|
||||
UserGroupMembersDict(direct_members=[hamlet.id], direct_subgroups=[]),
|
||||
)
|
||||
|
||||
self.assertEqual(event_stream["can_add_subscribers_group"], nobody_group.id)
|
||||
self.assertEqual(event_stream["can_remove_subscribers_group"], admins_group.id)
|
||||
self.assertEqual(event_stream["can_send_message_group"], everyone_group.id)
|
||||
self.assertEqual(event_stream["can_subscribe_group"], nobody_group.id)
|
||||
|
||||
def test_acting_user_is_creator(self) -> None:
|
||||
"""
|
||||
If backend calls provide an acting_user while trying to
|
||||
create streams, assign acting_user as the stream creator
|
||||
"""
|
||||
hamlet = self.example_user("hamlet")
|
||||
new_streams, _ = create_streams_if_needed(
|
||||
hamlet.realm,
|
||||
[
|
||||
StreamDict(
|
||||
name="hamlet's test stream",
|
||||
description="No description",
|
||||
invite_only=True,
|
||||
is_web_public=True,
|
||||
)
|
||||
],
|
||||
acting_user=hamlet,
|
||||
)
|
||||
created_stream = new_streams[0]
|
||||
self.assertEqual(created_stream.creator_id, hamlet.id)
|
||||
|
||||
def test_channel_create_message_exists_for_all_policy_types(self) -> None:
|
||||
"""
|
||||
Create a channel for each policy type to ensure they all have a "new channel" message.
|
||||
"""
|
||||
# this is to check if the appropriate channel name is present in the "new channel" message
|
||||
policy_key_map: dict[str, str] = {
|
||||
"web_public": "**Web-public**",
|
||||
"public": "**Public**",
|
||||
"private_shared_history": "**Private, shared history**",
|
||||
"private_protected_history": "**Private, protected history**",
|
||||
}
|
||||
for policy_key, policy_dict in Stream.PERMISSION_POLICIES.items():
|
||||
channel_creator = self.example_user("desdemona")
|
||||
subdomain = "zulip"
|
||||
|
||||
if policy_key == "public_protected_history":
|
||||
# This is a special channel policy only available in Zephyr realms.
|
||||
channel_creator = self.mit_user("starnine")
|
||||
subdomain = "zephyr"
|
||||
|
||||
self.login_user(channel_creator)
|
||||
new_channel_name = f"New {policy_key} channel"
|
||||
result = self.api_post(
|
||||
channel_creator,
|
||||
"/json/users/me/subscriptions",
|
||||
{
|
||||
"subscriptions": orjson.dumps([{"name": new_channel_name}]).decode(),
|
||||
"is_web_public": orjson.dumps(policy_dict["is_web_public"]).decode(),
|
||||
"invite_only": orjson.dumps(policy_dict["invite_only"]).decode(),
|
||||
"history_public_to_subscribers": orjson.dumps(
|
||||
policy_dict["history_public_to_subscribers"]
|
||||
).decode(),
|
||||
},
|
||||
subdomain=subdomain,
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
new_channel = get_stream(new_channel_name, channel_creator.realm)
|
||||
channel_events_messages = get_topic_messages(
|
||||
channel_creator, new_channel, "channel events"
|
||||
)
|
||||
if policy_key == "public_protected_history":
|
||||
# These do not get channel creation notification.
|
||||
self.assert_length(channel_events_messages, 0)
|
||||
continue
|
||||
|
||||
self.assert_length(channel_events_messages, 1)
|
||||
self.assertIn(policy_key_map[policy_key], channel_events_messages[0].content)
|
||||
|
||||
def test_adding_channels_to_folder_during_creation(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
iago = self.example_user("iago")
|
||||
hamlet = self.example_user("hamlet")
|
||||
channel_folder = check_add_channel_folder("Backend", "", acting_user=iago)
|
||||
|
||||
subscriptions = [
|
||||
{"name": "new_stream", "description": "New stream"},
|
||||
{"name": "new_stream_2", "description": "New stream 2"},
|
||||
]
|
||||
extra_post_data = {}
|
||||
|
||||
extra_post_data["folder_id"] = orjson.dumps(99).decode()
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(result, "Invalid channel folder ID")
|
||||
|
||||
extra_post_data["folder_id"] = orjson.dumps(channel_folder.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
subdomain="zulip",
|
||||
)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(stream.folder, channel_folder)
|
||||
stream = get_stream("new_stream_2", realm)
|
||||
self.assertEqual(stream.folder, channel_folder)
|
||||
|
||||
subscriptions = [
|
||||
{"name": "new_stream_3", "description": "New stream 3"},
|
||||
{"name": "new_stream_4", "description": "New stream 4"},
|
||||
]
|
||||
extra_post_data = {}
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
subdomain="zulip",
|
||||
)
|
||||
stream = get_stream("new_stream_3", realm)
|
||||
self.assertIsNone(stream.folder)
|
||||
stream = get_stream("new_stream_4", realm)
|
||||
self.assertIsNone(stream.folder)
|
||||
|
||||
def test_stream_message_retention_days_on_stream_creation(self) -> None:
|
||||
"""
|
||||
Only admins can create streams with message_retention_days
|
||||
with value other than None.
|
||||
"""
|
||||
admin = self.example_user("iago")
|
||||
|
||||
streams_raw: list[StreamDict] = [
|
||||
{
|
||||
"name": "new_stream",
|
||||
"message_retention_days": 10,
|
||||
"is_web_public": False,
|
||||
}
|
||||
]
|
||||
with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
|
||||
list_to_streams(streams_raw, admin, autocreate=True)
|
||||
|
||||
streams_raw = [
|
||||
{
|
||||
"name": "new_stream",
|
||||
"message_retention_days": -1,
|
||||
"is_web_public": False,
|
||||
}
|
||||
]
|
||||
with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
|
||||
list_to_streams(streams_raw, admin, autocreate=True)
|
||||
|
||||
streams_raw = [
|
||||
{
|
||||
"name": "new_stream",
|
||||
"message_retention_days": None,
|
||||
"is_web_public": False,
|
||||
}
|
||||
]
|
||||
result = list_to_streams(streams_raw, admin, autocreate=True)
|
||||
self.assert_length(result[0], 0)
|
||||
self.assert_length(result[1], 1)
|
||||
self.assertEqual(result[1][0].name, "new_stream")
|
||||
self.assertEqual(result[1][0].message_retention_days, None)
|
||||
|
||||
owner = self.example_user("desdemona")
|
||||
realm = owner.realm
|
||||
streams_raw = [
|
||||
{
|
||||
"name": "new_stream1",
|
||||
"message_retention_days": 10,
|
||||
"is_web_public": False,
|
||||
},
|
||||
{
|
||||
"name": "new_stream2",
|
||||
"message_retention_days": -1,
|
||||
"is_web_public": False,
|
||||
},
|
||||
{
|
||||
"name": "new_stream3",
|
||||
"is_web_public": False,
|
||||
},
|
||||
]
|
||||
|
||||
do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=admin)
|
||||
with self.assertRaisesRegex(
|
||||
JsonableError, "Available on Zulip Cloud Standard. Upgrade to access."
|
||||
):
|
||||
list_to_streams(streams_raw, owner, autocreate=True)
|
||||
|
||||
do_change_realm_plan_type(realm, Realm.PLAN_TYPE_SELF_HOSTED, acting_user=admin)
|
||||
result = list_to_streams(streams_raw, owner, autocreate=True)
|
||||
self.assert_length(result[0], 0)
|
||||
self.assert_length(result[1], 3)
|
||||
self.assertEqual(result[1][0].name, "new_stream1")
|
||||
self.assertEqual(result[1][0].message_retention_days, 10)
|
||||
self.assertEqual(result[1][1].name, "new_stream2")
|
||||
self.assertEqual(result[1][1].message_retention_days, -1)
|
||||
self.assertEqual(result[1][2].name, "new_stream3")
|
||||
self.assertEqual(result[1][2].message_retention_days, None)
|
||||
@@ -78,8 +78,6 @@ from zerver.lib.streams import (
|
||||
bulk_can_access_stream_metadata_user_ids,
|
||||
can_access_stream_history,
|
||||
can_access_stream_metadata_user_ids,
|
||||
create_stream_if_needed,
|
||||
create_streams_if_needed,
|
||||
do_get_streams,
|
||||
ensure_stream,
|
||||
filter_stream_authorization_for_adding_subscribers,
|
||||
@@ -99,7 +97,6 @@ from zerver.lib.test_helpers import (
|
||||
get_subscription,
|
||||
most_recent_message,
|
||||
queries_captured,
|
||||
reset_email_visibility_to_everyone_in_zulip_realm,
|
||||
)
|
||||
from zerver.lib.types import (
|
||||
APIStreamDict,
|
||||
@@ -114,7 +111,6 @@ from zerver.models import (
|
||||
Attachment,
|
||||
DefaultStream,
|
||||
DefaultStreamGroup,
|
||||
Message,
|
||||
NamedUserGroup,
|
||||
Realm,
|
||||
RealmAuditLog,
|
||||
@@ -293,717 +289,6 @@ class TestMiscStuff(ZulipTestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestCreateStreams(ZulipTestCase):
|
||||
def test_creating_streams(self) -> None:
|
||||
stream_names = ["new1", "new2", "new3"]
|
||||
stream_descriptions = ["des1", "des2", "des3"]
|
||||
realm = get_realm("zulip")
|
||||
iago = self.example_user("iago")
|
||||
|
||||
# Test stream creation events.
|
||||
with self.capture_send_event_calls(expected_num_events=1) as events:
|
||||
ensure_stream(realm, "Public stream", invite_only=False, acting_user=None)
|
||||
|
||||
self.assertEqual(events[0]["event"]["type"], "stream")
|
||||
self.assertEqual(events[0]["event"]["op"], "create")
|
||||
# Send public stream creation event to all active users.
|
||||
self.assertEqual(events[0]["users"], active_non_guest_user_ids(realm.id))
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["name"], "Public stream")
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)
|
||||
|
||||
aaron_group = check_add_user_group(
|
||||
realm, "aaron_group", [self.example_user("aaron")], acting_user=iago
|
||||
)
|
||||
prospero_group = check_add_user_group(
|
||||
realm, "prospero_group", [self.example_user("prospero")], acting_user=iago
|
||||
)
|
||||
cordelia_group = check_add_user_group(
|
||||
realm, "cordelia_group", [self.example_user("cordelia")], acting_user=iago
|
||||
)
|
||||
with self.capture_send_event_calls(expected_num_events=1) as events:
|
||||
create_stream_if_needed(
|
||||
realm,
|
||||
"Private stream",
|
||||
invite_only=True,
|
||||
can_administer_channel_group=aaron_group,
|
||||
can_add_subscribers_group=prospero_group,
|
||||
can_subscribe_group=cordelia_group,
|
||||
)
|
||||
|
||||
self.assertEqual(events[0]["event"]["type"], "stream")
|
||||
self.assertEqual(events[0]["event"]["op"], "create")
|
||||
# Send private stream creation event to only realm admins.
|
||||
self.assert_length(events[0]["users"], 5)
|
||||
self.assertCountEqual(
|
||||
[
|
||||
iago.id,
|
||||
self.example_user("desdemona").id,
|
||||
self.example_user("aaron").id,
|
||||
self.example_user("prospero").id,
|
||||
self.example_user("cordelia").id,
|
||||
],
|
||||
events[0]["users"],
|
||||
)
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["name"], "Private stream")
|
||||
self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)
|
||||
|
||||
moderators_system_group = NamedUserGroup.objects.get(
|
||||
name="role:moderators", realm=realm, is_system_group=True
|
||||
)
|
||||
new_streams, existing_streams = create_streams_if_needed(
|
||||
realm,
|
||||
[
|
||||
{
|
||||
"name": stream_name,
|
||||
"description": stream_description,
|
||||
"invite_only": True,
|
||||
"message_retention_days": -1,
|
||||
"can_remove_subscribers_group": moderators_system_group,
|
||||
}
|
||||
for (stream_name, stream_description) in zip(
|
||||
stream_names, stream_descriptions, strict=False
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
self.assert_length(new_streams, 3)
|
||||
self.assert_length(existing_streams, 0)
|
||||
|
||||
actual_stream_names = {stream.name for stream in new_streams}
|
||||
self.assertEqual(actual_stream_names, set(stream_names))
|
||||
actual_stream_descriptions = {stream.description for stream in new_streams}
|
||||
self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
|
||||
for stream in new_streams:
|
||||
self.assertTrue(stream.invite_only)
|
||||
self.assertTrue(stream.message_retention_days == -1)
|
||||
self.assertEqual(stream.can_remove_subscribers_group.id, moderators_system_group.id)
|
||||
# Streams created where acting_user is None have no creator
|
||||
self.assertIsNone(stream.creator_id)
|
||||
|
||||
new_streams, existing_streams = create_streams_if_needed(
|
||||
realm,
|
||||
[
|
||||
{"name": stream_name, "description": stream_description, "invite_only": True}
|
||||
for (stream_name, stream_description) in zip(
|
||||
stream_names, stream_descriptions, strict=False
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
self.assert_length(new_streams, 0)
|
||||
self.assert_length(existing_streams, 3)
|
||||
|
||||
actual_stream_names = {stream.name for stream in existing_streams}
|
||||
self.assertEqual(actual_stream_names, set(stream_names))
|
||||
actual_stream_descriptions = {stream.description for stream in existing_streams}
|
||||
self.assertEqual(actual_stream_descriptions, set(stream_descriptions))
|
||||
for stream in existing_streams:
|
||||
self.assertTrue(stream.invite_only)
|
||||
|
||||
def test_create_api_multiline_description(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
subscriptions = [{"name": "new_stream", "description": "multi\nline\ndescription"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(stream.description, "multi line description")
|
||||
|
||||
def test_create_api_topic_permalink_description(self) -> None:
|
||||
user = self.example_user("iago")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
|
||||
hamlet = self.example_user("hamlet")
|
||||
core_stream = self.make_stream("core", realm, True, history_public_to_subscribers=True)
|
||||
self.subscribe(hamlet, "core")
|
||||
msg_id = self.send_stream_message(hamlet, "core", topic_name="testing")
|
||||
|
||||
# Test permalink not generated for description since user has no access to
|
||||
# the channel.
|
||||
subscriptions = [{"name": "stream1", "description": "#**core>testing**"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("stream1", realm)
|
||||
|
||||
self.assertEqual(stream.rendered_description, "<p>#<strong>core>testing</strong></p>")
|
||||
|
||||
self.subscribe(user, "core")
|
||||
|
||||
# Test permalink generated for the description since user now has access
|
||||
# to the channel.
|
||||
subscriptions = [{"name": "stream2", "description": "#**core>testing**"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("stream2", realm)
|
||||
|
||||
self.assertEqual(
|
||||
stream.rendered_description,
|
||||
f'<p><a class="stream-topic" data-stream-id="{core_stream.id}" href="/#narrow/channel/{core_stream.id}-core/topic/testing/with/{msg_id}">#{core_stream.name} > testing</a></p>',
|
||||
)
|
||||
|
||||
def test_history_public_to_subscribers_on_stream_creation(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
stream_dicts: list[StreamDict] = [
|
||||
{
|
||||
"name": "publicstream",
|
||||
"description": "Public stream with public history",
|
||||
},
|
||||
{"name": "webpublicstream", "description": "Web-public stream", "is_web_public": True},
|
||||
{
|
||||
"name": "privatestream",
|
||||
"description": "Private stream with non-public history",
|
||||
"invite_only": True,
|
||||
},
|
||||
{
|
||||
"name": "privatewithhistory",
|
||||
"description": "Private stream with public history",
|
||||
"invite_only": True,
|
||||
"history_public_to_subscribers": True,
|
||||
},
|
||||
{
|
||||
"name": "publictrywithouthistory",
|
||||
"description": "Public stream without public history (disallowed)",
|
||||
"invite_only": False,
|
||||
"history_public_to_subscribers": False,
|
||||
},
|
||||
]
|
||||
|
||||
created, existing = create_streams_if_needed(realm, stream_dicts)
|
||||
|
||||
self.assert_length(created, 5)
|
||||
self.assert_length(existing, 0)
|
||||
for stream in created:
|
||||
if stream.name == "publicstream":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
if stream.name == "webpublicstream":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
if stream.name == "privatestream":
|
||||
self.assertFalse(stream.history_public_to_subscribers)
|
||||
if stream.name == "privatewithhistory":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
if stream.name == "publictrywithouthistory":
|
||||
self.assertTrue(stream.history_public_to_subscribers)
|
||||
|
||||
def test_add_stream_as_default_on_stream_creation(self) -> None:
|
||||
user_profile = self.example_user("hamlet")
|
||||
self.login_user(user_profile)
|
||||
realm = user_profile.realm
|
||||
|
||||
subscriptions = [
|
||||
{"name": "default_stream", "description": "This stream is default for new users"}
|
||||
]
|
||||
result = self.subscribe_via_post(
|
||||
user_profile,
|
||||
subscriptions,
|
||||
{"is_default_stream": "true"},
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(result, "Insufficient permission")
|
||||
|
||||
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR, acting_user=None)
|
||||
result = self.subscribe_via_post(
|
||||
user_profile, subscriptions, {"is_default_stream": "true"}, subdomain="zulip"
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
default_stream = get_stream("default_stream", realm)
|
||||
self.assertTrue(default_stream.id in get_default_stream_ids_for_realm(realm.id))
|
||||
|
||||
subscriptions = [
|
||||
{
|
||||
"name": "private_default_stream",
|
||||
"description": "This stream is private and default for new users",
|
||||
}
|
||||
]
|
||||
result = self.subscribe_via_post(
|
||||
user_profile,
|
||||
subscriptions,
|
||||
{"is_default_stream": "true"},
|
||||
invite_only=True,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(result, "A default channel cannot be private.")
|
||||
|
||||
def test_history_public_to_subscribers_zephyr_realm(self) -> None:
|
||||
realm = get_realm("zephyr")
|
||||
|
||||
stream, created = create_stream_if_needed(realm, "private_stream", invite_only=True)
|
||||
self.assertTrue(created)
|
||||
self.assertTrue(stream.invite_only)
|
||||
self.assertFalse(stream.history_public_to_subscribers)
|
||||
|
||||
stream, created = create_stream_if_needed(realm, "public_stream", invite_only=False)
|
||||
self.assertTrue(created)
|
||||
self.assertFalse(stream.invite_only)
|
||||
self.assertFalse(stream.history_public_to_subscribers)
|
||||
|
||||
def test_auto_mark_stream_created_message_as_read_for_stream_creator(self) -> None:
|
||||
# This test relies on email == delivery_email for
|
||||
# convenience.
|
||||
reset_email_visibility_to_everyone_in_zulip_realm()
|
||||
|
||||
realm = Realm.objects.get(name="Zulip Dev")
|
||||
iago = self.example_user("iago")
|
||||
hamlet = self.example_user("hamlet")
|
||||
cordelia = self.example_user("cordelia")
|
||||
aaron = self.example_user("aaron")
|
||||
|
||||
# Establish a stream for notifications.
|
||||
announce_stream = ensure_stream(
|
||||
realm, "announce", False, "announcements here.", acting_user=None
|
||||
)
|
||||
realm.new_stream_announcements_stream_id = announce_stream.id
|
||||
realm.save(update_fields=["new_stream_announcements_stream_id"])
|
||||
|
||||
self.subscribe(iago, announce_stream.name)
|
||||
self.subscribe(hamlet, announce_stream.name)
|
||||
|
||||
self.login_user(iago)
|
||||
|
||||
initial_message_count = Message.objects.count()
|
||||
initial_usermessage_count = UserMessage.objects.count()
|
||||
|
||||
data = {
|
||||
"subscriptions": '[{"name":"brand new stream","description":""}]',
|
||||
"history_public_to_subscribers": "true",
|
||||
"invite_only": "false",
|
||||
"announce": "true",
|
||||
"principals": orjson.dumps([iago.id, aaron.id, cordelia.id, hamlet.id]).decode(),
|
||||
}
|
||||
|
||||
response = self.client_post("/json/users/me/subscriptions", data)
|
||||
|
||||
final_message_count = Message.objects.count()
|
||||
final_usermessage_count = UserMessage.objects.count()
|
||||
|
||||
expected_response = {
|
||||
"result": "success",
|
||||
"msg": "",
|
||||
"subscribed": {
|
||||
"10": ["brand new stream"],
|
||||
"11": ["brand new stream"],
|
||||
"6": ["brand new stream"],
|
||||
"8": ["brand new stream"],
|
||||
},
|
||||
"already_subscribed": {},
|
||||
}
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(orjson.loads(response.content), expected_response)
|
||||
|
||||
# 2 messages should be created, one in announce and one in the new stream itself.
|
||||
self.assertEqual(final_message_count - initial_message_count, 2)
|
||||
# 4 UserMessages per subscriber: One for each of the subscribers, plus 1 for
|
||||
# each user in the notifications stream.
|
||||
announce_stream_subs = Subscription.objects.filter(recipient=announce_stream.recipient)
|
||||
self.assertEqual(
|
||||
final_usermessage_count - initial_usermessage_count, 4 + announce_stream_subs.count()
|
||||
)
|
||||
|
||||
def get_unread_stream_data(user: UserProfile) -> list[UnreadStreamInfo]:
|
||||
raw_unread_data = get_raw_unread_data(user)
|
||||
aggregated_data = aggregate_unread_data(raw_unread_data, allow_empty_topic_name=True)
|
||||
return aggregated_data["streams"]
|
||||
|
||||
stream_id = Stream.objects.get(name="brand new stream").id
|
||||
iago_unread_messages = get_unread_stream_data(iago)
|
||||
hamlet_unread_messages = get_unread_stream_data(hamlet)
|
||||
|
||||
# The stream creation messages should be unread for Hamlet
|
||||
self.assert_length(hamlet_unread_messages, 2)
|
||||
|
||||
# According to the code in zerver/views/streams/add_subscriptions_backend
|
||||
# the notification stream message is sent first, then the new stream's message.
|
||||
self.assertEqual(hamlet_unread_messages[1]["stream_id"], stream_id)
|
||||
|
||||
# But it should be marked as read for Iago, the stream creator.
|
||||
self.assert_length(iago_unread_messages, 0)
|
||||
|
||||
def test_can_administer_channel_group_default_on_stream_creation(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
nobody_system_group = NamedUserGroup.objects.get(
|
||||
name="role:nobody", realm=realm, is_system_group=True
|
||||
)
|
||||
|
||||
stream, created = create_stream_if_needed(
|
||||
realm, "new stream without acting user", invite_only=True
|
||||
)
|
||||
self.assertEqual(stream.can_administer_channel_group.id, nobody_system_group.id)
|
||||
|
||||
stream, created = create_stream_if_needed(
|
||||
realm, "new stream with acting user", acting_user=user
|
||||
)
|
||||
self.assertCountEqual(stream.can_administer_channel_group.direct_members.all(), [user])
|
||||
|
||||
def do_test_permission_setting_on_stream_creation(self, setting_name: str) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
realm = user.realm
|
||||
self.login_user(user)
|
||||
moderators_system_group = NamedUserGroup.objects.get(
|
||||
name="role:moderators", realm=realm, is_system_group=True
|
||||
)
|
||||
|
||||
permission_config = Stream.stream_permission_group_settings[setting_name]
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data = {}
|
||||
extra_post_data[setting_name] = orjson.dumps(moderators_system_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, moderators_system_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
result = self.subscribe_via_post(user, subscriptions, subdomain="zulip")
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
if permission_config.default_group_name == "stream_creator_or_nobody":
|
||||
self.assertEqual(list(getattr(stream, setting_name).direct_members.all()), [user])
|
||||
self.assertEqual(
|
||||
list(getattr(stream, setting_name).direct_subgroups.all()),
|
||||
[],
|
||||
)
|
||||
else:
|
||||
default_group = NamedUserGroup.objects.get(
|
||||
name=permission_config.default_group_name, realm=realm, is_system_group=True
|
||||
)
|
||||
self.assertEqual(getattr(stream, setting_name).id, default_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
hamletcharacters_group = NamedUserGroup.objects.get(name="hamletcharacters", realm=realm)
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(hamletcharacters_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, hamletcharacters_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(
|
||||
{"direct_members": [user.id], "direct_subgroups": [moderators_system_group.id]}
|
||||
).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(list(getattr(stream, setting_name).direct_members.all()), [user])
|
||||
self.assertEqual(
|
||||
list(getattr(stream, setting_name).direct_subgroups.all()),
|
||||
[moderators_system_group],
|
||||
)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
nobody_group = NamedUserGroup.objects.get(
|
||||
name="role:nobody", is_system_group=True, realm=realm
|
||||
)
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(
|
||||
{"direct_members": [], "direct_subgroups": []}
|
||||
).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
owners_group = NamedUserGroup.objects.get(
|
||||
name="role:owners", is_system_group=True, realm=realm
|
||||
)
|
||||
extra_post_data[setting_name] = orjson.dumps(owners_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, owners_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
extra_post_data[setting_name] = orjson.dumps(nobody_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
everyone_group = NamedUserGroup.objects.get(
|
||||
name="role:everyone", is_system_group=True, realm=realm
|
||||
)
|
||||
extra_post_data[setting_name] = orjson.dumps(everyone_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
if permission_config.allow_everyone_group:
|
||||
self.assert_json_success(result)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(getattr(stream, setting_name).id, everyone_group.id)
|
||||
# Delete the created stream, so we can create a new one for
|
||||
# testing another setting value.
|
||||
stream.delete()
|
||||
else:
|
||||
self.assert_json_error(
|
||||
result,
|
||||
f"'{setting_name}' setting cannot be set to 'role:everyone' group.",
|
||||
)
|
||||
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
internet_group = NamedUserGroup.objects.get(
|
||||
name="role:internet", is_system_group=True, realm=realm
|
||||
)
|
||||
extra_post_data[setting_name] = orjson.dumps(internet_group.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
user,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(
|
||||
result,
|
||||
f"'{setting_name}' setting cannot be set to 'role:internet' group.",
|
||||
)
|
||||
|
||||
def test_permission_settings_on_stream_creation(self) -> None:
|
||||
for setting_name in Stream.stream_permission_group_settings:
|
||||
self.do_test_permission_setting_on_stream_creation(setting_name)
|
||||
|
||||
def test_default_permission_settings_on_stream_creation(self) -> None:
|
||||
hamlet = self.example_user("hamlet")
|
||||
realm = hamlet.realm
|
||||
subscriptions = [{"name": "new_stream", "description": "New stream"}]
|
||||
|
||||
self.login("hamlet")
|
||||
with self.capture_send_event_calls(expected_num_events=4) as events:
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
|
||||
nobody_group = NamedUserGroup.objects.get(
|
||||
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
|
||||
)
|
||||
admins_group = NamedUserGroup.objects.get(
|
||||
name=SystemGroups.ADMINISTRATORS, realm=realm, is_system_group=True
|
||||
)
|
||||
everyone_group = NamedUserGroup.objects.get(
|
||||
name=SystemGroups.EVERYONE, realm=realm, is_system_group=True
|
||||
)
|
||||
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(
|
||||
list(
|
||||
stream.can_administer_channel_group.direct_members.all().values_list(
|
||||
"id", flat=True
|
||||
)
|
||||
),
|
||||
[hamlet.id],
|
||||
)
|
||||
self.assertEqual(
|
||||
list(
|
||||
stream.can_administer_channel_group.direct_subgroups.all().values_list(
|
||||
"id", flat=True
|
||||
)
|
||||
),
|
||||
[],
|
||||
)
|
||||
|
||||
self.assertEqual(stream.can_add_subscribers_group_id, nobody_group.id)
|
||||
self.assertEqual(stream.can_remove_subscribers_group_id, admins_group.id)
|
||||
self.assertEqual(stream.can_send_message_group_id, everyone_group.id)
|
||||
self.assertEqual(stream.can_subscribe_group_id, nobody_group.id)
|
||||
|
||||
# Check setting values sent in stream creation events.
|
||||
event_stream = events[0]["event"]["streams"][0]
|
||||
self.assertEqual(
|
||||
event_stream["can_administer_channel_group"],
|
||||
UserGroupMembersDict(direct_members=[hamlet.id], direct_subgroups=[]),
|
||||
)
|
||||
|
||||
self.assertEqual(event_stream["can_add_subscribers_group"], nobody_group.id)
|
||||
self.assertEqual(event_stream["can_remove_subscribers_group"], admins_group.id)
|
||||
self.assertEqual(event_stream["can_send_message_group"], everyone_group.id)
|
||||
self.assertEqual(event_stream["can_subscribe_group"], nobody_group.id)
|
||||
|
||||
def test_acting_user_is_creator(self) -> None:
|
||||
"""
|
||||
If backend calls provide an acting_user while trying to
|
||||
create streams, assign acting_user as the stream creator
|
||||
"""
|
||||
hamlet = self.example_user("hamlet")
|
||||
new_streams, _ = create_streams_if_needed(
|
||||
hamlet.realm,
|
||||
[
|
||||
StreamDict(
|
||||
name="hamlet's test stream",
|
||||
description="No description",
|
||||
invite_only=True,
|
||||
is_web_public=True,
|
||||
)
|
||||
],
|
||||
acting_user=hamlet,
|
||||
)
|
||||
created_stream = new_streams[0]
|
||||
self.assertEqual(created_stream.creator_id, hamlet.id)
|
||||
|
||||
def test_channel_create_message_exists_for_all_policy_types(self) -> None:
|
||||
"""
|
||||
Create a channel for each policy type to ensure they all have a "new channel" message.
|
||||
"""
|
||||
# this is to check if the appropriate channel name is present in the "new channel" message
|
||||
policy_key_map: dict[str, str] = {
|
||||
"web_public": "**Web-public**",
|
||||
"public": "**Public**",
|
||||
"private_shared_history": "**Private, shared history**",
|
||||
"private_protected_history": "**Private, protected history**",
|
||||
}
|
||||
for policy_key, policy_dict in Stream.PERMISSION_POLICIES.items():
|
||||
channel_creator = self.example_user("desdemona")
|
||||
subdomain = "zulip"
|
||||
|
||||
if policy_key == "public_protected_history":
|
||||
# This is a special channel policy only available in Zephyr realms.
|
||||
channel_creator = self.mit_user("starnine")
|
||||
subdomain = "zephyr"
|
||||
|
||||
self.login_user(channel_creator)
|
||||
new_channel_name = f"New {policy_key} channel"
|
||||
result = self.api_post(
|
||||
channel_creator,
|
||||
"/json/users/me/subscriptions",
|
||||
{
|
||||
"subscriptions": orjson.dumps([{"name": new_channel_name}]).decode(),
|
||||
"is_web_public": orjson.dumps(policy_dict["is_web_public"]).decode(),
|
||||
"invite_only": orjson.dumps(policy_dict["invite_only"]).decode(),
|
||||
"history_public_to_subscribers": orjson.dumps(
|
||||
policy_dict["history_public_to_subscribers"]
|
||||
).decode(),
|
||||
},
|
||||
subdomain=subdomain,
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
new_channel = get_stream(new_channel_name, channel_creator.realm)
|
||||
channel_events_messages = get_topic_messages(
|
||||
channel_creator, new_channel, "channel events"
|
||||
)
|
||||
if policy_key == "public_protected_history":
|
||||
# These do not get channel creation notification.
|
||||
self.assert_length(channel_events_messages, 0)
|
||||
continue
|
||||
|
||||
self.assert_length(channel_events_messages, 1)
|
||||
self.assertIn(policy_key_map[policy_key], channel_events_messages[0].content)
|
||||
|
||||
def test_adding_channels_to_folder_during_creation(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
iago = self.example_user("iago")
|
||||
hamlet = self.example_user("hamlet")
|
||||
channel_folder = check_add_channel_folder("Backend", "", acting_user=iago)
|
||||
|
||||
subscriptions = [
|
||||
{"name": "new_stream", "description": "New stream"},
|
||||
{"name": "new_stream_2", "description": "New stream 2"},
|
||||
]
|
||||
extra_post_data = {}
|
||||
|
||||
extra_post_data["folder_id"] = orjson.dumps(99).decode()
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
allow_fail=True,
|
||||
subdomain="zulip",
|
||||
)
|
||||
self.assert_json_error(result, "Invalid channel folder ID")
|
||||
|
||||
extra_post_data["folder_id"] = orjson.dumps(channel_folder.id).decode()
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
subdomain="zulip",
|
||||
)
|
||||
stream = get_stream("new_stream", realm)
|
||||
self.assertEqual(stream.folder, channel_folder)
|
||||
stream = get_stream("new_stream_2", realm)
|
||||
self.assertEqual(stream.folder, channel_folder)
|
||||
|
||||
subscriptions = [
|
||||
{"name": "new_stream_3", "description": "New stream 3"},
|
||||
{"name": "new_stream_4", "description": "New stream 4"},
|
||||
]
|
||||
extra_post_data = {}
|
||||
result = self.subscribe_via_post(
|
||||
hamlet,
|
||||
subscriptions,
|
||||
extra_post_data,
|
||||
subdomain="zulip",
|
||||
)
|
||||
stream = get_stream("new_stream_3", realm)
|
||||
self.assertIsNone(stream.folder)
|
||||
stream = get_stream("new_stream_4", realm)
|
||||
self.assertIsNone(stream.folder)
|
||||
|
||||
|
||||
class RecipientTest(ZulipTestCase):
|
||||
def test_recipient(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
@@ -3476,82 +2761,6 @@ class StreamAdminTest(ZulipTestCase):
|
||||
)
|
||||
self.assertEqual(messages[-1].content, expected_notification)
|
||||
|
||||
def test_stream_message_retention_days_on_stream_creation(self) -> None:
|
||||
"""
|
||||
Only admins can create streams with message_retention_days
|
||||
with value other than None.
|
||||
"""
|
||||
admin = self.example_user("iago")
|
||||
|
||||
streams_raw: list[StreamDict] = [
|
||||
{
|
||||
"name": "new_stream",
|
||||
"message_retention_days": 10,
|
||||
"is_web_public": False,
|
||||
}
|
||||
]
|
||||
with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
|
||||
list_to_streams(streams_raw, admin, autocreate=True)
|
||||
|
||||
streams_raw = [
|
||||
{
|
||||
"name": "new_stream",
|
||||
"message_retention_days": -1,
|
||||
"is_web_public": False,
|
||||
}
|
||||
]
|
||||
with self.assertRaisesRegex(JsonableError, "Must be an organization owner"):
|
||||
list_to_streams(streams_raw, admin, autocreate=True)
|
||||
|
||||
streams_raw = [
|
||||
{
|
||||
"name": "new_stream",
|
||||
"message_retention_days": None,
|
||||
"is_web_public": False,
|
||||
}
|
||||
]
|
||||
result = list_to_streams(streams_raw, admin, autocreate=True)
|
||||
self.assert_length(result[0], 0)
|
||||
self.assert_length(result[1], 1)
|
||||
self.assertEqual(result[1][0].name, "new_stream")
|
||||
self.assertEqual(result[1][0].message_retention_days, None)
|
||||
|
||||
owner = self.example_user("desdemona")
|
||||
realm = owner.realm
|
||||
streams_raw = [
|
||||
{
|
||||
"name": "new_stream1",
|
||||
"message_retention_days": 10,
|
||||
"is_web_public": False,
|
||||
},
|
||||
{
|
||||
"name": "new_stream2",
|
||||
"message_retention_days": -1,
|
||||
"is_web_public": False,
|
||||
},
|
||||
{
|
||||
"name": "new_stream3",
|
||||
"is_web_public": False,
|
||||
},
|
||||
]
|
||||
|
||||
do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=admin)
|
||||
with self.assertRaisesRegex(
|
||||
JsonableError, "Available on Zulip Cloud Standard. Upgrade to access."
|
||||
):
|
||||
list_to_streams(streams_raw, owner, autocreate=True)
|
||||
|
||||
do_change_realm_plan_type(realm, Realm.PLAN_TYPE_SELF_HOSTED, acting_user=admin)
|
||||
result = list_to_streams(streams_raw, owner, autocreate=True)
|
||||
self.assert_length(result[0], 0)
|
||||
self.assert_length(result[1], 3)
|
||||
self.assertEqual(result[1][0].name, "new_stream1")
|
||||
self.assertEqual(result[1][0].message_retention_days, 10)
|
||||
self.assertEqual(result[1][1].name, "new_stream2")
|
||||
self.assertEqual(result[1][1].message_retention_days, -1)
|
||||
self.assertEqual(result[1][2].name, "new_stream3")
|
||||
self.assertEqual(result[1][2].message_retention_days, None)
|
||||
|
||||
def set_up_stream_for_archiving(
|
||||
self, stream_name: str, invite_only: bool = False, subscribed: bool = True
|
||||
) -> Stream:
|
||||
|
||||
Reference in New Issue
Block a user