mirror of
https://github.com/zulip/zulip.git
synced 2025-10-23 16:14:02 +00:00
For get and filter queries of NamedUserGroup, realm_for_sharding field is used instead of realm field, as directly using realm_for_sharding field on NamedUserGroup makes the query faster than using realm present on the base UserGroup table.
1457 lines
60 KiB
Python
1457 lines
60 KiB
Python
from typing import TYPE_CHECKING, Any
|
|
|
|
import orjson
|
|
from django.conf import settings
|
|
from django.test import override_settings
|
|
from typing_extensions import override
|
|
|
|
from zerver.actions.streams import (
|
|
do_change_stream_group_based_setting,
|
|
do_change_stream_permission,
|
|
do_deactivate_stream,
|
|
)
|
|
from zerver.lib.email_mirror_helpers import encode_email_address, get_channel_email_token
|
|
from zerver.lib.subscription_info import gather_subscriptions, gather_subscriptions_helper
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.lib.test_helpers import most_recent_message
|
|
from zerver.lib.types import (
|
|
APIStreamDict,
|
|
APISubscriptionDict,
|
|
NeverSubscribedStreamDict,
|
|
SubscriptionInfo,
|
|
UserGroupMembersData,
|
|
UserGroupMembersDict,
|
|
)
|
|
from zerver.models import NamedUserGroup, Realm, Stream, Subscription, UserProfile
|
|
from zerver.models.groups import SystemGroups
|
|
from zerver.models.realms import get_realm
|
|
from zerver.models.streams import get_stream
|
|
from zerver.models.users import get_system_bot
|
|
|
|
if TYPE_CHECKING:
|
|
from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse
|
|
|
|
|
|
def fix_expected_fields_for_stream_group_settings(expected_fields: set[str]) -> set[str]:
|
|
for setting_name in Stream.stream_permission_group_settings:
|
|
expected_fields -= {setting_name + "_id"}
|
|
expected_fields |= {setting_name}
|
|
|
|
return expected_fields
|
|
|
|
|
|
class GetStreamsTest(ZulipTestCase):
|
|
def test_streams_api_for_bot_owners(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
test_bot = self.create_test_bot("foo", hamlet)
|
|
assert test_bot is not None
|
|
realm = get_realm("zulip")
|
|
self.login_user(hamlet)
|
|
|
|
# Check it correctly lists the bot owner's subs with
|
|
# include_owner_subscribed=true
|
|
filters = dict(
|
|
include_owner_subscribed="true",
|
|
include_public="false",
|
|
include_subscribed="false",
|
|
)
|
|
with self.assert_database_query_count(7):
|
|
result = self.api_get(test_bot, "/api/v1/streams", filters)
|
|
owner_subs = self.api_get(hamlet, "/api/v1/users/me/subscriptions")
|
|
|
|
json = self.assert_json_success(result)
|
|
self.assertIn("streams", json)
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
self.assert_json_success(owner_subs)
|
|
owner_subs_json = orjson.loads(owner_subs.content)
|
|
|
|
self.assertEqual(
|
|
sorted(s["name"] for s in json["streams"]),
|
|
sorted(s["name"] for s in owner_subs_json["subscriptions"]),
|
|
)
|
|
|
|
# Check it correctly lists the bot owner's subs and the
|
|
# bot's subs
|
|
self.subscribe(test_bot, "Scotland")
|
|
filters = dict(
|
|
include_owner_subscribed="true",
|
|
include_public="false",
|
|
include_subscribed="true",
|
|
)
|
|
with self.assert_database_query_count(8):
|
|
result = self.api_get(test_bot, "/api/v1/streams", filters)
|
|
|
|
json = self.assert_json_success(result)
|
|
self.assertIn("streams", json)
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
actual = sorted(s["name"] for s in json["streams"])
|
|
expected = [s["name"] for s in owner_subs_json["subscriptions"]]
|
|
expected.append("Scotland")
|
|
expected.sort()
|
|
|
|
self.assertEqual(actual, expected)
|
|
|
|
# Check it correctly lists the bot owner's subs + all public streams
|
|
self.make_stream("private_stream", realm=realm, invite_only=True)
|
|
self.subscribe(test_bot, "private_stream")
|
|
with self.assert_database_query_count(7):
|
|
result = self.api_get(
|
|
test_bot,
|
|
"/api/v1/streams",
|
|
{
|
|
"include_owner_subscribed": "true",
|
|
"include_public": "true",
|
|
"include_subscribed": "false",
|
|
},
|
|
)
|
|
|
|
json = self.assert_json_success(result)
|
|
self.assertIn("streams", json)
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
actual = sorted(s["name"] for s in json["streams"])
|
|
expected = [s["name"] for s in owner_subs_json["subscriptions"]]
|
|
expected.extend(["Rome", "Venice", "Scotland"])
|
|
expected.sort()
|
|
|
|
self.assertEqual(actual, expected)
|
|
|
|
# Check it correctly lists the bot owner's subs + all public streams +
|
|
# the bot's subs
|
|
with self.assert_database_query_count(8):
|
|
result = self.api_get(
|
|
test_bot,
|
|
"/api/v1/streams",
|
|
{
|
|
"include_owner_subscribed": "true",
|
|
"include_public": "true",
|
|
"include_subscribed": "true",
|
|
},
|
|
)
|
|
|
|
json = self.assert_json_success(result)
|
|
self.assertIn("streams", json)
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
actual = sorted(s["name"] for s in json["streams"])
|
|
expected = [s["name"] for s in owner_subs_json["subscriptions"]]
|
|
expected.extend(["Rome", "Venice", "Scotland", "private_stream"])
|
|
expected.sort()
|
|
|
|
self.assertEqual(actual, expected)
|
|
|
|
private_stream_2 = self.make_stream("private_stream_2", realm=realm, invite_only=True)
|
|
private_stream_3 = self.make_stream("private_stream_3", realm=realm, invite_only=True)
|
|
self.make_stream("private_stream_4", realm=realm, invite_only=True)
|
|
test_bot_group_member_dict = UserGroupMembersData(
|
|
direct_members=[test_bot.id], direct_subgroups=[]
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_2,
|
|
"can_add_subscribers_group",
|
|
test_bot_group_member_dict,
|
|
acting_user=hamlet,
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_3,
|
|
"can_administer_channel_group",
|
|
test_bot_group_member_dict,
|
|
acting_user=hamlet,
|
|
)
|
|
# Check it correctly lists the bot owner's subs + the channels
|
|
# bot has content access to.
|
|
with self.assert_database_query_count(10):
|
|
result = self.api_get(
|
|
test_bot,
|
|
"/api/v1/streams",
|
|
{
|
|
"include_owner_subscribed": "true",
|
|
"include_can_access_content": "true",
|
|
},
|
|
)
|
|
|
|
json = self.assert_json_success(result)
|
|
self.assertIn("streams", json)
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
actual = sorted(s["name"] for s in json["streams"])
|
|
expected = [s["name"] for s in owner_subs_json["subscriptions"]]
|
|
expected.extend(["Rome", "Venice", "Scotland", "private_stream", "private_stream_2"])
|
|
expected.sort()
|
|
|
|
self.assertEqual(actual, expected)
|
|
|
|
def test_all_streams_api(self) -> None:
|
|
url = "/api/v1/streams"
|
|
data = {"include_all": "true"}
|
|
backward_compatible_data = {"include_all_active": "true"}
|
|
|
|
# Normal user should be able to make this request and get all
|
|
# the streams they have metadata access to.
|
|
normal_user = self.example_user("cordelia")
|
|
realm = normal_user.realm
|
|
normal_user_group_members_dict = UserGroupMembersData(
|
|
direct_members=[normal_user.id], direct_subgroups=[]
|
|
)
|
|
|
|
private_stream_1 = self.make_stream("private_stream_1", realm=realm, invite_only=True)
|
|
private_stream_2 = self.make_stream("private_stream_2", realm=realm, invite_only=True)
|
|
private_stream_3 = self.make_stream("private_stream_3", realm=realm, invite_only=True)
|
|
self.make_stream("private_stream_4", realm=realm, invite_only=True)
|
|
deactivated_public_stream = self.make_stream(
|
|
"deactivated_public_stream", realm=realm, invite_only=False
|
|
)
|
|
do_deactivate_stream(deactivated_public_stream, acting_user=normal_user)
|
|
|
|
self.subscribe(normal_user, private_stream_1.name)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_2,
|
|
"can_add_subscribers_group",
|
|
normal_user_group_members_dict,
|
|
acting_user=normal_user,
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_3,
|
|
"can_administer_channel_group",
|
|
normal_user_group_members_dict,
|
|
acting_user=normal_user,
|
|
)
|
|
|
|
result_stream_names: list[str] = [
|
|
stream.name
|
|
for stream in Stream.objects.filter(realm=realm, invite_only=False, deactivated=False)
|
|
]
|
|
result_stream_names.extend(
|
|
[private_stream_1.name, private_stream_2.name, private_stream_3.name]
|
|
)
|
|
with self.assert_database_query_count(8):
|
|
result = self.api_get(normal_user, url, data)
|
|
json = self.assert_json_success(result)
|
|
self.assertEqual(sorted(s["name"] for s in json["streams"]), sorted(result_stream_names))
|
|
|
|
# Normal user should be able to make this request and get all
|
|
# the streams they have metadata access to.
|
|
guest_user = self.example_user("polonius")
|
|
guest_user_group_member_dict = UserGroupMembersData(
|
|
direct_members=[guest_user.id], direct_subgroups=[]
|
|
)
|
|
|
|
self.subscribe(guest_user, private_stream_1.name)
|
|
self.subscribe(guest_user, "design")
|
|
do_change_stream_group_based_setting(
|
|
private_stream_2,
|
|
"can_add_subscribers_group",
|
|
guest_user_group_member_dict,
|
|
acting_user=normal_user,
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
get_stream("Rome", realm),
|
|
"can_add_subscribers_group",
|
|
guest_user_group_member_dict,
|
|
acting_user=normal_user,
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_3,
|
|
"can_administer_channel_group",
|
|
guest_user_group_member_dict,
|
|
acting_user=normal_user,
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
get_stream("Denmark", realm),
|
|
"can_administer_channel_group",
|
|
guest_user_group_member_dict,
|
|
acting_user=normal_user,
|
|
)
|
|
|
|
# Guest user should not gain metadata access to a channel via
|
|
# `can_add_subscribers_group` or `can_administer_channel_group`
|
|
# since `allow_everyone_group` if false for both of those groups.
|
|
result_stream_names = ["Verona", "private_stream_1", "design", "Rome"]
|
|
with self.assert_database_query_count(7):
|
|
result = self.api_get(guest_user, url, data)
|
|
json = self.assert_json_success(result)
|
|
self.assertEqual(sorted(s["name"] for s in json["streams"]), sorted(result_stream_names))
|
|
|
|
# Realm admin users can see all active streams if
|
|
# `exclude_archived` is not set.
|
|
admin_user = self.example_user("iago")
|
|
self.assertTrue(admin_user.is_realm_admin)
|
|
|
|
with self.assert_database_query_count(7):
|
|
result = self.api_get(admin_user, url, data)
|
|
json = self.assert_json_success(result)
|
|
|
|
backward_compatible_result = self.api_get(admin_user, url, backward_compatible_data)
|
|
json_for_backward_compatible_request = self.assert_json_success(backward_compatible_result)
|
|
|
|
self.assertEqual(json, json_for_backward_compatible_request)
|
|
|
|
self.assertIn("streams", json)
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
stream_names = {s["name"] for s in json["streams"]}
|
|
result_stream_names = [
|
|
stream.name for stream in Stream.objects.filter(realm=realm, deactivated=False)
|
|
]
|
|
self.assertEqual(
|
|
sorted(stream_names),
|
|
sorted(result_stream_names),
|
|
)
|
|
|
|
# Realm admin users can see all streams if `exclude_archived`
|
|
# is set to false.
|
|
data = {"include_all": "true", "exclude_archived": "false"}
|
|
with self.assert_database_query_count(7):
|
|
result = self.api_get(admin_user, url, data)
|
|
json = self.assert_json_success(result)
|
|
stream_names = {s["name"] for s in json["streams"]}
|
|
result_stream_names = [stream.name for stream in Stream.objects.filter(realm=realm)]
|
|
self.assertEqual(
|
|
sorted(stream_names),
|
|
sorted(result_stream_names),
|
|
)
|
|
|
|
# This case will not happen in practice, we are adding this
|
|
# test block to add coverage for the case where
|
|
# `get_metadata_access_streams` returns an empty list without
|
|
# query if an empty list of streams is passed to it.
|
|
all_active_streams = Stream.objects.filter(realm=realm, deactivated=False)
|
|
for stream in all_active_streams:
|
|
do_deactivate_stream(stream, acting_user=None)
|
|
|
|
data = {"include_all": "true"}
|
|
with self.assert_database_query_count(3):
|
|
result = self.api_get(admin_user, url, data)
|
|
json = self.assert_json_success(result)
|
|
stream_names = {s["name"] for s in json["streams"]}
|
|
self.assertEqual(stream_names, set())
|
|
|
|
def test_public_streams_api(self) -> None:
|
|
"""
|
|
Ensure that the query we use to get public streams successfully returns
|
|
a list of streams
|
|
"""
|
|
user = self.example_user("hamlet")
|
|
realm = get_realm("zulip")
|
|
self.login_user(user)
|
|
|
|
# Check it correctly lists the user's subs with include_public=false
|
|
result = self.api_get(user, "/api/v1/streams", {"include_public": "false"})
|
|
result2 = self.api_get(user, "/api/v1/users/me/subscriptions")
|
|
|
|
json = self.assert_json_success(result)
|
|
|
|
self.assertIn("streams", json)
|
|
|
|
self.assertIsInstance(json["streams"], list)
|
|
|
|
self.assert_json_success(result2)
|
|
json2 = orjson.loads(result2.content)
|
|
|
|
self.assertEqual(
|
|
sorted(s["name"] for s in json["streams"]),
|
|
sorted(s["name"] for s in json2["subscriptions"]),
|
|
)
|
|
|
|
# Check it correctly lists all public streams with include_subscribed=false
|
|
filters = dict(include_public="true", include_subscribed="false")
|
|
result = self.api_get(user, "/api/v1/streams", filters)
|
|
json = self.assert_json_success(result)
|
|
all_streams = [
|
|
stream.name for stream in Stream.objects.filter(realm=realm, invite_only=False)
|
|
]
|
|
self.assertEqual(sorted(s["name"] for s in json["streams"]), sorted(all_streams))
|
|
|
|
def test_include_can_access_content_streams_api(self) -> None:
|
|
"""
|
|
Ensure that the query we use to get public streams successfully returns
|
|
a list of streams
|
|
"""
|
|
# Cordelia is not subscribed to private stream `core team`.
|
|
user = self.example_user("cordelia")
|
|
realm = get_realm("zulip")
|
|
self.login_user(user)
|
|
user_group_members_dict = UserGroupMembersData(
|
|
direct_members=[user.id], direct_subgroups=[]
|
|
)
|
|
|
|
private_stream_1 = self.make_stream("private_stream_1", realm=realm, invite_only=True)
|
|
private_stream_2 = self.make_stream("private_stream_2", realm=realm, invite_only=True)
|
|
private_stream_3 = self.make_stream("private_stream_3", realm=realm, invite_only=True)
|
|
self.make_stream("private_stream_4", realm=realm, invite_only=True)
|
|
|
|
self.subscribe(user, private_stream_1.name)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_2, "can_add_subscribers_group", user_group_members_dict, acting_user=user
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
private_stream_3,
|
|
"can_administer_channel_group",
|
|
user_group_members_dict,
|
|
acting_user=user,
|
|
)
|
|
|
|
# Check it correctly lists all content access streams with
|
|
# include_can_access_content=false
|
|
filters = dict(include_can_access_content="true")
|
|
with self.assert_database_query_count(8):
|
|
result = self.api_get(user, "/api/v1/streams", filters)
|
|
json = self.assert_json_success(result)
|
|
result_streams = [
|
|
stream.name for stream in Stream.objects.filter(realm=realm, invite_only=False)
|
|
]
|
|
result_streams.extend([private_stream_1.name, private_stream_2.name])
|
|
self.assertEqual(sorted(s["name"] for s in json["streams"]), sorted(result_streams))
|
|
|
|
def test_get_single_stream_api(self) -> None:
|
|
self.login("hamlet")
|
|
realm = get_realm("zulip")
|
|
denmark_stream = get_stream("Denmark", realm)
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}")
|
|
json = self.assert_json_success(result)
|
|
self.assertEqual(json["stream"]["name"], "Denmark")
|
|
self.assertEqual(json["stream"]["stream_id"], denmark_stream.id)
|
|
|
|
result = self.client_get("/json/streams/9999")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
private_stream = self.make_stream("private_stream", invite_only=True)
|
|
self.subscribe(self.example_user("cordelia"), "private_stream")
|
|
|
|
# Non-admins cannot access unsubscribed private streams.
|
|
result = self.client_get(f"/json/streams/{private_stream.id}")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
self.login("iago")
|
|
result = self.client_get(f"/json/streams/{private_stream.id}")
|
|
json = self.assert_json_success(result)
|
|
self.assertEqual(json["stream"]["name"], "private_stream")
|
|
self.assertEqual(json["stream"]["stream_id"], private_stream.id)
|
|
|
|
self.login("cordelia")
|
|
result = self.client_get(f"/json/streams/{private_stream.id}")
|
|
json = self.assert_json_success(result)
|
|
self.assertEqual(json["stream"]["name"], "private_stream")
|
|
self.assertEqual(json["stream"]["stream_id"], private_stream.id)
|
|
|
|
def test_get_stream_email_address(self) -> None:
|
|
self.login("hamlet")
|
|
hamlet = self.example_user("hamlet")
|
|
iago = self.example_user("iago")
|
|
polonius = self.example_user("polonius")
|
|
realm = get_realm("zulip")
|
|
email_gateway_bot = get_system_bot(settings.EMAIL_GATEWAY_BOT, realm.id)
|
|
denmark_stream = get_stream("Denmark", realm)
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
json = self.assert_json_success(result)
|
|
email_token = get_channel_email_token(
|
|
denmark_stream, creator=hamlet, sender=email_gateway_bot
|
|
)
|
|
hamlet_denmark_email = encode_email_address(
|
|
denmark_stream.name, email_token, show_sender=True
|
|
)
|
|
self.assertEqual(json["email"], hamlet_denmark_email)
|
|
|
|
self.login("polonius")
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
self.subscribe(polonius, "Denmark")
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
json = self.assert_json_success(result)
|
|
email_token = get_channel_email_token(
|
|
denmark_stream, creator=polonius, sender=email_gateway_bot
|
|
)
|
|
polonius_denmark_email = encode_email_address(
|
|
denmark_stream.name, email_token, show_sender=True
|
|
)
|
|
self.assertEqual(json["email"], polonius_denmark_email)
|
|
|
|
do_change_stream_permission(
|
|
denmark_stream,
|
|
invite_only=True,
|
|
history_public_to_subscribers=True,
|
|
is_web_public=False,
|
|
acting_user=iago,
|
|
)
|
|
self.login("hamlet")
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
json = self.assert_json_success(result)
|
|
self.assertEqual(json["email"], hamlet_denmark_email)
|
|
|
|
self.unsubscribe(hamlet, "Denmark")
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
self.login("iago")
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
json = self.assert_json_success(result)
|
|
email_token = get_channel_email_token(
|
|
denmark_stream, creator=iago, sender=email_gateway_bot
|
|
)
|
|
iago_denmark_email = encode_email_address(
|
|
denmark_stream.name, email_token, show_sender=True
|
|
)
|
|
self.assertEqual(json["email"], iago_denmark_email)
|
|
|
|
self.unsubscribe(iago, "Denmark")
|
|
result = self.client_get(f"/json/streams/{denmark_stream.id}/email_address")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
def test_guest_user_access_to_streams(self) -> None:
|
|
user_profile = self.example_user("polonius")
|
|
self.login_user(user_profile)
|
|
self.assertEqual(user_profile.role, UserProfile.ROLE_GUEST)
|
|
|
|
# Get all the streams that Polonius has access to (subscribed + web-public streams)
|
|
result = self.client_get("/json/streams", {"include_web_public": "true"})
|
|
streams = self.assert_json_success(result)["streams"]
|
|
sub_info = gather_subscriptions_helper(user_profile)
|
|
|
|
subscribed = sub_info.subscriptions
|
|
unsubscribed = sub_info.unsubscribed
|
|
never_subscribed = sub_info.never_subscribed
|
|
|
|
self.assert_length(streams, len(subscribed) + len(unsubscribed) + len(never_subscribed))
|
|
stream_names = [stream["name"] for stream in streams]
|
|
expected_stream_names = [stream["name"] for stream in subscribed + unsubscribed]
|
|
expected_stream_names += [stream["name"] for stream in never_subscribed]
|
|
self.assertEqual(set(stream_names), set(expected_stream_names))
|
|
|
|
|
|
class StreamIdTest(ZulipTestCase):
|
|
def test_get_stream_id(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
self.login_user(user)
|
|
stream = gather_subscriptions(user)[0][0]
|
|
result = self.client_get("/json/get_stream_id", {"stream": stream["name"]})
|
|
response_dict = self.assert_json_success(result)
|
|
self.assertEqual(response_dict["stream_id"], stream["stream_id"])
|
|
|
|
def test_get_stream_id_wrong_name(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
self.login_user(user)
|
|
result = self.client_get("/json/get_stream_id", {"stream": "wrongname"})
|
|
self.assert_json_error(result, "Invalid channel name 'wrongname'")
|
|
|
|
|
|
class GetSubscribersTest(ZulipTestCase):
|
|
@override
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
self.user_profile = self.example_user("hamlet")
|
|
self.login_user(self.user_profile)
|
|
|
|
def test_api_fields(self) -> None:
|
|
"""Verify that all the fields from `Stream.API_FIELDS` and `Subscription.API_FIELDS` present
|
|
in `APIStreamDict` and `APISubscriptionDict`, respectively.
|
|
"""
|
|
expected_fields = set(Stream.API_FIELDS) | {"stream_id", "is_archived"}
|
|
expected_fields -= {"id", "deactivated"}
|
|
expected_fields = fix_expected_fields_for_stream_group_settings(expected_fields)
|
|
|
|
stream_dict_fields = set(APIStreamDict.__annotations__.keys())
|
|
computed_fields = {
|
|
"is_announcement_only",
|
|
"is_default",
|
|
"stream_post_policy",
|
|
"stream_weekly_traffic",
|
|
}
|
|
|
|
self.assertEqual(stream_dict_fields - computed_fields, expected_fields)
|
|
|
|
expected_fields = set(Subscription.API_FIELDS)
|
|
|
|
subscription_dict_fields = set(APISubscriptionDict.__annotations__.keys())
|
|
computed_fields = {"in_home_view", "email_address", "stream_weekly_traffic", "subscribers"}
|
|
# `APISubscriptionDict` is a subclass of `APIStreamDict`, therefore having all the
|
|
# fields in addition to the computed fields and `Subscription.API_FIELDS` that
|
|
# need to be excluded here.
|
|
self.assertEqual(
|
|
subscription_dict_fields - computed_fields - stream_dict_fields,
|
|
expected_fields,
|
|
)
|
|
|
|
def verify_sub_fields(self, sub_data: SubscriptionInfo) -> None:
|
|
other_fields = {
|
|
"is_archived",
|
|
"is_announcement_only",
|
|
"in_home_view",
|
|
"stream_id",
|
|
"stream_post_policy",
|
|
"stream_weekly_traffic",
|
|
"subscribers",
|
|
}
|
|
|
|
expected_fields = set(Stream.API_FIELDS) | set(Subscription.API_FIELDS) | other_fields
|
|
expected_fields -= {"id", "deactivated"}
|
|
expected_fields = fix_expected_fields_for_stream_group_settings(expected_fields)
|
|
|
|
for lst in [sub_data.subscriptions, sub_data.unsubscribed]:
|
|
for sub in lst:
|
|
self.assertEqual(set(sub), expected_fields)
|
|
|
|
other_fields = {
|
|
"is_archived",
|
|
"is_announcement_only",
|
|
"stream_id",
|
|
"stream_post_policy",
|
|
"stream_weekly_traffic",
|
|
"subscribers",
|
|
}
|
|
|
|
expected_fields = set(Stream.API_FIELDS) | other_fields
|
|
expected_fields -= {"id", "deactivated"}
|
|
expected_fields = fix_expected_fields_for_stream_group_settings(expected_fields)
|
|
|
|
for never_sub in sub_data.never_subscribed:
|
|
self.assertEqual(set(never_sub), expected_fields)
|
|
|
|
def assert_user_got_subscription_notification(
|
|
self, user: UserProfile, expected_msg: str
|
|
) -> None:
|
|
# verify that the user was sent a message informing them about the subscription
|
|
realm = user.realm
|
|
msg = most_recent_message(user)
|
|
self.assertEqual(msg.recipient.type, msg.recipient.PERSONAL)
|
|
self.assertEqual(msg.sender_id, self.notification_bot(realm).id)
|
|
|
|
def non_ws(s: str) -> str:
|
|
return s.replace("\n", "").replace(" ", "")
|
|
|
|
assert msg.rendered_content is not None
|
|
self.assertEqual(non_ws(msg.rendered_content), non_ws(expected_msg))
|
|
|
|
def check_well_formed_result(
|
|
self, result: dict[str, Any], stream_name: str, realm: Realm
|
|
) -> None:
|
|
"""
|
|
A successful call to get_subscribers returns the list of subscribers in
|
|
the form:
|
|
|
|
{"msg": "",
|
|
"result": "success",
|
|
"subscribers": [hamlet_user.id, prospero_user.id]}
|
|
"""
|
|
self.assertIn("subscribers", result)
|
|
self.assertIsInstance(result["subscribers"], list)
|
|
true_subscribers = [
|
|
user_profile.id for user_profile in self.users_subscribed_to_stream(stream_name, realm)
|
|
]
|
|
self.assertEqual(sorted(result["subscribers"]), sorted(true_subscribers))
|
|
|
|
def make_subscriber_request(
|
|
self, stream_id: int, user: UserProfile | None = None
|
|
) -> "TestHttpResponse":
|
|
if user is None:
|
|
user = self.user_profile
|
|
return self.api_get(user, f"/api/v1/streams/{stream_id}/members")
|
|
|
|
def make_successful_subscriber_request(self, stream_name: str) -> None:
|
|
stream_id = get_stream(stream_name, self.user_profile.realm).id
|
|
result = self.make_subscriber_request(stream_id)
|
|
response_dict = self.assert_json_success(result)
|
|
self.check_well_formed_result(response_dict, stream_name, self.user_profile.realm)
|
|
|
|
def test_subscriber(self) -> None:
|
|
"""
|
|
get_subscribers returns the list of subscribers.
|
|
"""
|
|
stream_name = gather_subscriptions(self.user_profile)[0][0]["name"]
|
|
self.make_successful_subscriber_request(stream_name)
|
|
|
|
@override_settings(MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE=5)
|
|
def test_gather_partial_subscriptions(self) -> None:
|
|
othello = self.example_user("othello")
|
|
user_names = ["iago", "cordelia", "polonius", "shiva", "prospero"]
|
|
idle_users = [self.example_user(name) for name in user_names]
|
|
for user in idle_users:
|
|
user.long_term_idle = True
|
|
user.save()
|
|
bot = self.create_test_bot("bot", othello, "Foo Bot")
|
|
|
|
stream_names = [
|
|
"never_subscribed_only_bots",
|
|
"never_subscribed_many_more_than_bots",
|
|
"unsubscribed_only_bots",
|
|
"subscribed_more_than_bots_including_idle",
|
|
"subscribed_many_more_than_bots",
|
|
]
|
|
for stream_name in stream_names:
|
|
self.make_stream(stream_name)
|
|
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["never_subscribed_only_bots"],
|
|
dict(principals=orjson.dumps([bot.id]).decode()),
|
|
)
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["never_subscribed_many_more_than_bots"],
|
|
dict(
|
|
principals=orjson.dumps(
|
|
[bot.id, othello.id] + [user.id for user in idle_users]
|
|
).decode()
|
|
),
|
|
)
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["unsubscribed_only_bots"],
|
|
dict(principals=orjson.dumps([bot.id, self.user_profile.id]).decode()),
|
|
)
|
|
self.unsubscribe(
|
|
self.user_profile,
|
|
"unsubscribed_only_bots",
|
|
)
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["subscribed_more_than_bots_including_idle"],
|
|
dict(
|
|
principals=orjson.dumps(
|
|
[bot.id, othello.id, self.user_profile.id, idle_users[0].id]
|
|
).decode()
|
|
),
|
|
)
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["subscribed_many_more_than_bots"],
|
|
dict(
|
|
principals=orjson.dumps(
|
|
[bot.id, othello.id, self.user_profile.id] + [user.id for user in idle_users]
|
|
).decode()
|
|
),
|
|
)
|
|
|
|
with self.assert_database_query_count(9):
|
|
sub_data = gather_subscriptions_helper(self.user_profile, include_subscribers="partial")
|
|
never_subscribed_streams = sub_data.never_subscribed
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
subscribed_streams = sub_data.subscriptions
|
|
self.assertGreaterEqual(len(never_subscribed_streams), 2)
|
|
self.assertGreaterEqual(len(unsubscribed_streams), 1)
|
|
self.assertGreaterEqual(len(subscribed_streams), 1)
|
|
|
|
# Streams with only bots have sent all of their subscribers,
|
|
# since we always send bots. We tell the client it doesn't
|
|
# need to fetch more, by filling "subscribers" instead
|
|
# of "partial_subscribers". If there are non-bot subscribers,
|
|
# a partial fetch will return only partial subscribers.
|
|
|
|
for sub in never_subscribed_streams:
|
|
if sub["name"] == "never_subscribed_only_bots":
|
|
self.assert_length(sub["subscribers"], 1)
|
|
self.assertIsNone(sub.get("partial_subscribers"))
|
|
continue
|
|
if sub["name"] == "never_subscribed_many_more_than_bots":
|
|
# the bot and Othello (who is not long_term_idle)
|
|
self.assert_length(sub["partial_subscribers"], 2)
|
|
self.assertIsNone(sub.get("subscribers"))
|
|
|
|
for sub in unsubscribed_streams:
|
|
if sub["name"] == "unsubscribed_only_bots":
|
|
self.assert_length(sub["subscribers"], 1)
|
|
self.assertIsNone(sub.get("partial_subscribers"))
|
|
break
|
|
|
|
for sub in subscribed_streams:
|
|
# fewer than MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE subscribers,
|
|
# so we get all of them
|
|
if sub["name"] == "subscribed_more_than_bots_including_idle":
|
|
self.assertNotIn("partial_subscribers", sub)
|
|
self.assert_length(sub["subscribers"], 4)
|
|
if sub["name"] == "subscribed_many_more_than_bots":
|
|
# the bot, Othello (who is not long_term_idle), and current user
|
|
self.assert_length(sub["partial_subscribers"], 3)
|
|
self.assertNotIn("subscribers", sub)
|
|
|
|
@override_settings(MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE=5)
|
|
def test_gather_partial_subscriptions_api(self) -> None:
|
|
othello = self.example_user("othello")
|
|
user_names = ["iago", "cordelia", "polonius", "shiva", "prospero"]
|
|
idle_users = [self.example_user(name) for name in user_names]
|
|
for user in idle_users:
|
|
user.long_term_idle = True
|
|
user.save()
|
|
bot = self.create_test_bot("bot", othello, "Foo Bot")
|
|
|
|
stream_names = [
|
|
"subscribed_more_than_bots_including_idle",
|
|
"subscribed_many_more_than_bots",
|
|
]
|
|
for stream_name in stream_names:
|
|
self.make_stream(stream_name)
|
|
|
|
for user in [bot, othello, self.user_profile, idle_users[0]]:
|
|
self.subscribe(user, stream_names[0])
|
|
|
|
for user in [bot, othello, self.user_profile, *idle_users]:
|
|
self.subscribe(user, stream_names[1])
|
|
|
|
with self.assert_database_query_count(11):
|
|
result = self.api_get(
|
|
self.user_profile,
|
|
"/api/v1/users/me/subscriptions",
|
|
{"include_subscribers": "partial"},
|
|
)
|
|
sub_data = self.assert_json_success(result)
|
|
subscribed_streams = sub_data["subscriptions"]
|
|
self.assertGreaterEqual(len(subscribed_streams), 2)
|
|
|
|
# Streams with only bots have sent all of their subscribers,
|
|
# since we always send bots. We tell the client it doesn't
|
|
# need to fetch more, by filling "subscribers" instead
|
|
# of "partial_subscribers". If there are non-bot subscribers,
|
|
# a partial fetch will return only partial subscribers.
|
|
for sub in subscribed_streams:
|
|
# fewer than MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE subscribers,
|
|
# so we get all of them
|
|
if sub["name"] == "subscribed_more_than_bots_including_idle":
|
|
self.assertIsNone(sub.get("partial_subscribers"))
|
|
self.assert_length(sub["subscribers"], 4)
|
|
continue
|
|
if sub["name"] == "subscribed_many_more_than_bots":
|
|
# the bot, Othello (who is not long_term_idle), and current user
|
|
self.assert_length(sub["partial_subscribers"], 3)
|
|
self.assertIsNone(sub.get("subscribers"))
|
|
|
|
def test_gather_subscriptions(self) -> None:
|
|
"""
|
|
gather_subscriptions returns correct results with only 3 queries
|
|
|
|
(We also use this test to verify subscription notifications to
|
|
folks who get subscribed to streams.)
|
|
"""
|
|
hamlet = self.example_user("hamlet")
|
|
cordelia = self.example_user("cordelia")
|
|
othello = self.example_user("othello")
|
|
polonius = self.example_user("polonius")
|
|
realm = hamlet.realm
|
|
|
|
stream_names = [f"stream_{i}" for i in range(10)]
|
|
streams: list[Stream] = [self.make_stream(stream_name) for stream_name in stream_names]
|
|
|
|
users_to_subscribe = [
|
|
self.user_profile.id,
|
|
othello.id,
|
|
cordelia.id,
|
|
polonius.id,
|
|
]
|
|
|
|
with self.assert_database_query_count(52):
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
stream_names,
|
|
dict(principals=orjson.dumps(users_to_subscribe).decode()),
|
|
)
|
|
|
|
rendered_stream_list = ""
|
|
for stream in streams:
|
|
rendered_stream_list = (
|
|
rendered_stream_list
|
|
+ f"""<li><a class="stream" data-stream-id="{stream.id}" href="/#narrow/channel/{stream.id}-{stream.name}">#{stream.name}</a></li>\n"""
|
|
)
|
|
msg = f"""
|
|
<p><span class="user-mention silent" data-user-id="{hamlet.id}">King Hamlet</span> subscribed you to the following channels:</p>
|
|
<ul>
|
|
{rendered_stream_list}
|
|
</ul>
|
|
"""
|
|
|
|
for user in [cordelia, othello, polonius]:
|
|
self.assert_user_got_subscription_notification(user, msg)
|
|
|
|
# Subscribe ourself first.
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["stream_invite_only_1"],
|
|
dict(principals=orjson.dumps([self.user_profile.id]).decode()),
|
|
invite_only=True,
|
|
)
|
|
|
|
# Now add in other users, and this should trigger messages
|
|
# to notify the user.
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
["stream_invite_only_1"],
|
|
dict(principals=orjson.dumps(users_to_subscribe).decode()),
|
|
invite_only=True,
|
|
)
|
|
|
|
stream_invite_only_1 = get_stream("stream_invite_only_1", realm)
|
|
msg = f"""
|
|
<p><span class="user-mention silent" data-user-id="{hamlet.id}">King Hamlet</span> subscribed you to <a class="stream" data-stream-id="{stream_invite_only_1.id}" href="/#narrow/channel/{stream_invite_only_1.id}-{stream_invite_only_1.name}">#{stream_invite_only_1.name}</a>.</p>
|
|
"""
|
|
for user in [cordelia, othello, polonius]:
|
|
self.assert_user_got_subscription_notification(user, msg)
|
|
|
|
with self.assert_database_query_count(9):
|
|
subscribed_streams, _ = gather_subscriptions(
|
|
self.user_profile, include_subscribers=True
|
|
)
|
|
self.assertGreaterEqual(len(subscribed_streams), 11)
|
|
for sub in subscribed_streams:
|
|
if not sub["name"].startswith("stream_"):
|
|
continue
|
|
self.assert_length(sub["subscribers"], len(users_to_subscribe))
|
|
|
|
# Test query count when setting is set to anonymous group.
|
|
stream = get_stream("stream_1", realm)
|
|
admins_group = NamedUserGroup.objects.get(
|
|
name=SystemGroups.ADMINISTRATORS, realm_for_sharding=realm, is_system_group=True
|
|
)
|
|
setting_group_members_dict = UserGroupMembersData(
|
|
direct_members=[hamlet.id], direct_subgroups=[admins_group.id]
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
stream,
|
|
"can_remove_subscribers_group",
|
|
setting_group_members_dict,
|
|
acting_user=hamlet,
|
|
)
|
|
stream = get_stream("stream_2", realm)
|
|
setting_group_members_dict = UserGroupMembersData(
|
|
direct_members=[cordelia.id], direct_subgroups=[admins_group.id]
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
stream,
|
|
"can_remove_subscribers_group",
|
|
setting_group_members_dict,
|
|
acting_user=hamlet,
|
|
)
|
|
|
|
with self.assert_database_query_count(9):
|
|
subscribed_streams, _ = gather_subscriptions(
|
|
self.user_profile, include_subscribers=True
|
|
)
|
|
self.assertGreaterEqual(len(subscribed_streams), 11)
|
|
for sub in subscribed_streams:
|
|
if not sub["name"].startswith("stream_"):
|
|
continue
|
|
self.assert_length(sub["subscribers"], len(users_to_subscribe))
|
|
if sub["name"] == "stream_1":
|
|
self.assertEqual(
|
|
sub["can_remove_subscribers_group"],
|
|
UserGroupMembersDict(
|
|
direct_members=[hamlet.id],
|
|
direct_subgroups=[admins_group.id],
|
|
),
|
|
)
|
|
elif sub["name"] == "stream_2":
|
|
self.assertEqual(
|
|
sub["can_remove_subscribers_group"],
|
|
UserGroupMembersDict(
|
|
direct_members=[cordelia.id],
|
|
direct_subgroups=[admins_group.id],
|
|
),
|
|
)
|
|
else:
|
|
self.assertEqual(sub["can_remove_subscribers_group"], admins_group.id)
|
|
|
|
def test_stream_post_policy_values_in_subscription_objects(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
cordelia = self.example_user("cordelia")
|
|
desdemona = self.example_user("desdemona")
|
|
|
|
streams = [f"stream_{i}" for i in range(6)]
|
|
for stream_name in streams:
|
|
self.make_stream(stream_name)
|
|
|
|
realm = hamlet.realm
|
|
self.subscribe_via_post(
|
|
hamlet,
|
|
streams,
|
|
dict(principals=orjson.dumps([hamlet.id, cordelia.id]).decode()),
|
|
)
|
|
|
|
admins_group = NamedUserGroup.objects.get(
|
|
name=SystemGroups.ADMINISTRATORS, realm_for_sharding=realm, is_system_group=True
|
|
)
|
|
members_group = NamedUserGroup.objects.get(
|
|
name=SystemGroups.MEMBERS, realm_for_sharding=realm, is_system_group=True
|
|
)
|
|
full_members_group = NamedUserGroup.objects.get(
|
|
name=SystemGroups.FULL_MEMBERS, realm_for_sharding=realm, is_system_group=True
|
|
)
|
|
|
|
stream = get_stream("stream_1", realm)
|
|
do_change_stream_group_based_setting(
|
|
stream, "can_send_message_group", admins_group, acting_user=desdemona
|
|
)
|
|
|
|
stream = get_stream("stream_2", realm)
|
|
do_change_stream_group_based_setting(
|
|
stream, "can_send_message_group", members_group, acting_user=desdemona
|
|
)
|
|
|
|
stream = get_stream("stream_3", realm)
|
|
do_change_stream_group_based_setting(
|
|
stream, "can_send_message_group", full_members_group, acting_user=desdemona
|
|
)
|
|
|
|
hamletcharacters_group = NamedUserGroup.objects.get(
|
|
name="hamletcharacters", realm_for_sharding=realm
|
|
)
|
|
stream = get_stream("stream_4", realm)
|
|
do_change_stream_group_based_setting(
|
|
stream, "can_send_message_group", hamletcharacters_group, acting_user=desdemona
|
|
)
|
|
|
|
setting_group_members_dict = UserGroupMembersData(
|
|
direct_members=[cordelia.id], direct_subgroups=[admins_group.id]
|
|
)
|
|
stream = get_stream("stream_5", realm)
|
|
do_change_stream_group_based_setting(
|
|
stream, "can_send_message_group", setting_group_members_dict, acting_user=desdemona
|
|
)
|
|
|
|
with self.assert_database_query_count(9):
|
|
subscribed_streams, _ = gather_subscriptions(hamlet, include_subscribers=True)
|
|
|
|
[stream_1_sub] = [sub for sub in subscribed_streams if sub["name"] == "stream_1"]
|
|
self.assertEqual(stream_1_sub["can_send_message_group"], admins_group.id)
|
|
self.assertEqual(stream_1_sub["stream_post_policy"], Stream.STREAM_POST_POLICY_ADMINS)
|
|
|
|
[stream_2_sub] = [sub for sub in subscribed_streams if sub["name"] == "stream_2"]
|
|
self.assertEqual(stream_2_sub["can_send_message_group"], members_group.id)
|
|
self.assertEqual(stream_2_sub["stream_post_policy"], Stream.STREAM_POST_POLICY_EVERYONE)
|
|
|
|
[stream_3_sub] = [sub for sub in subscribed_streams if sub["name"] == "stream_3"]
|
|
self.assertEqual(stream_3_sub["can_send_message_group"], full_members_group.id)
|
|
self.assertEqual(
|
|
stream_3_sub["stream_post_policy"], Stream.STREAM_POST_POLICY_RESTRICT_NEW_MEMBERS
|
|
)
|
|
|
|
[stream_4_sub] = [sub for sub in subscribed_streams if sub["name"] == "stream_4"]
|
|
self.assertEqual(stream_4_sub["can_send_message_group"], hamletcharacters_group.id)
|
|
self.assertEqual(stream_4_sub["stream_post_policy"], Stream.STREAM_POST_POLICY_EVERYONE)
|
|
|
|
[stream_5_sub] = [sub for sub in subscribed_streams if sub["name"] == "stream_5"]
|
|
self.assertEqual(
|
|
stream_5_sub["can_send_message_group"],
|
|
UserGroupMembersDict(
|
|
direct_members=[cordelia.id],
|
|
direct_subgroups=[admins_group.id],
|
|
),
|
|
)
|
|
self.assertEqual(stream_5_sub["stream_post_policy"], Stream.STREAM_POST_POLICY_EVERYONE)
|
|
|
|
def test_never_subscribed_streams(self) -> None:
|
|
"""
|
|
Check never_subscribed streams are fetched correctly and not include invite_only streams,
|
|
or invite_only and public streams to guest users.
|
|
"""
|
|
realm = get_realm("zulip")
|
|
users_to_subscribe = [
|
|
self.example_user("othello").id,
|
|
self.example_user("cordelia").id,
|
|
]
|
|
|
|
public_streams = [
|
|
"test_stream_public_1",
|
|
"test_stream_public_2",
|
|
"test_stream_public_3",
|
|
"test_stream_public_4",
|
|
"test_stream_public_5",
|
|
]
|
|
|
|
private_streams = [
|
|
"test_stream_invite_only_1",
|
|
"test_stream_invite_only_2",
|
|
]
|
|
|
|
web_public_streams = [
|
|
"test_stream_web_public_1",
|
|
"test_stream_web_public_2",
|
|
]
|
|
|
|
nobody_group = NamedUserGroup.objects.get(
|
|
name="role:nobody", is_system_group=True, realm_for_sharding=realm
|
|
)
|
|
|
|
def create_public_streams() -> None:
|
|
for stream_name in public_streams:
|
|
self.make_stream(stream_name, realm=realm)
|
|
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
public_streams,
|
|
dict(
|
|
principals=orjson.dumps(users_to_subscribe).decode(),
|
|
can_administer_channel_group=nobody_group.id,
|
|
),
|
|
)
|
|
|
|
create_public_streams()
|
|
|
|
def create_web_public_streams() -> None:
|
|
for stream_name in web_public_streams:
|
|
self.make_stream(stream_name, realm=realm, is_web_public=True)
|
|
|
|
ret = self.subscribe_via_post(
|
|
self.user_profile,
|
|
web_public_streams,
|
|
dict(
|
|
principals=orjson.dumps(users_to_subscribe).decode(),
|
|
can_administer_channel_group=nobody_group.id,
|
|
),
|
|
)
|
|
self.assert_json_success(ret)
|
|
|
|
create_web_public_streams()
|
|
|
|
def create_private_streams() -> None:
|
|
self.subscribe_via_post(
|
|
self.user_profile,
|
|
private_streams,
|
|
dict(
|
|
principals=orjson.dumps(users_to_subscribe).decode(),
|
|
can_administer_channel_group=nobody_group.id,
|
|
),
|
|
invite_only=True,
|
|
)
|
|
|
|
create_private_streams()
|
|
|
|
def get_never_subscribed(query_count: int = 9) -> list[NeverSubscribedStreamDict]:
|
|
with self.assert_database_query_count(query_count):
|
|
sub_data = gather_subscriptions_helper(self.user_profile)
|
|
self.verify_sub_fields(sub_data)
|
|
never_subscribed = sub_data.never_subscribed
|
|
|
|
# Ignore old streams.
|
|
never_subscribed = [dct for dct in never_subscribed if dct["name"].startswith("test_")]
|
|
return never_subscribed
|
|
|
|
never_subscribed = get_never_subscribed()
|
|
|
|
# Invite only stream should not be there in never_subscribed streams
|
|
self.assert_length(never_subscribed, len(public_streams) + len(web_public_streams))
|
|
for stream_dict in never_subscribed:
|
|
name = stream_dict["name"]
|
|
self.assertFalse("invite_only" in name)
|
|
self.assert_length(stream_dict["subscribers"], len(users_to_subscribe))
|
|
|
|
# Send private stream subscribers to all realm admins.
|
|
def test_realm_admin_case() -> None:
|
|
self.user_profile.role = UserProfile.ROLE_REALM_ADMINISTRATOR
|
|
# Test realm admins can get never subscribed private stream's subscribers.
|
|
never_subscribed = get_never_subscribed(7)
|
|
|
|
self.assertEqual(
|
|
len(never_subscribed),
|
|
len(public_streams) + len(private_streams) + len(web_public_streams),
|
|
)
|
|
for stream_dict in never_subscribed:
|
|
self.assert_length(stream_dict["subscribers"], len(users_to_subscribe))
|
|
|
|
test_realm_admin_case()
|
|
|
|
# Send private stream subscribers to all realm admins.
|
|
def test_channel_admin_case() -> None:
|
|
self.user_profile.role = UserProfile.ROLE_MEMBER
|
|
user_group_members_dict = UserGroupMembersData(
|
|
direct_members=[self.user_profile.id], direct_subgroups=[]
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
get_stream("test_stream_invite_only_1", realm),
|
|
"can_administer_channel_group",
|
|
user_group_members_dict,
|
|
acting_user=self.user_profile,
|
|
)
|
|
# Test channel admins can get never subscribed private stream's subscribers.
|
|
never_subscribed = get_never_subscribed()
|
|
|
|
self.assertEqual(
|
|
len(never_subscribed),
|
|
len(public_streams) + 1 + len(web_public_streams),
|
|
)
|
|
for stream_dict in never_subscribed:
|
|
self.assert_length(stream_dict["subscribers"], len(users_to_subscribe))
|
|
|
|
test_channel_admin_case()
|
|
|
|
def test_can_add_subscribers_case() -> None:
|
|
self.user_profile.role = UserProfile.ROLE_MEMBER
|
|
user_group_members_dict = UserGroupMembersData(
|
|
direct_members=[self.user_profile.id], direct_subgroups=[]
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
get_stream("test_stream_invite_only_1", realm),
|
|
"can_add_subscribers_group",
|
|
user_group_members_dict,
|
|
acting_user=self.user_profile,
|
|
)
|
|
# Test channel admins can get never subscribed private stream's subscribers.
|
|
never_subscribed = get_never_subscribed()
|
|
|
|
self.assertEqual(
|
|
len(never_subscribed),
|
|
len(public_streams) + 1 + len(web_public_streams),
|
|
)
|
|
for stream_dict in never_subscribed:
|
|
self.assert_length(stream_dict["subscribers"], len(users_to_subscribe))
|
|
|
|
test_can_add_subscribers_case()
|
|
|
|
def test_guest_user_case() -> None:
|
|
self.user_profile.role = UserProfile.ROLE_GUEST
|
|
helper_result = gather_subscriptions_helper(self.user_profile)
|
|
self.verify_sub_fields(helper_result)
|
|
sub = helper_result.subscriptions
|
|
unsub = helper_result.unsubscribed
|
|
never_sub = helper_result.never_subscribed
|
|
|
|
# It's +1 because of the stream Rome.
|
|
self.assert_length(never_sub, len(web_public_streams) + 1)
|
|
sub_ids = [stream["stream_id"] for stream in sub]
|
|
unsub_ids = [stream["stream_id"] for stream in unsub]
|
|
|
|
for stream_dict in never_sub:
|
|
self.assertTrue(stream_dict["is_web_public"])
|
|
self.assertTrue(stream_dict["stream_id"] not in sub_ids)
|
|
self.assertTrue(stream_dict["stream_id"] not in unsub_ids)
|
|
|
|
# The Rome stream has is_web_public=True, with default
|
|
# subscribers not set up by this test, so we do the
|
|
# following check only for the streams we created.
|
|
if stream_dict["name"] in web_public_streams:
|
|
self.assert_length(stream_dict["subscribers"], len(users_to_subscribe))
|
|
|
|
test_guest_user_case()
|
|
|
|
def test_gather_subscribed_streams_for_guest_user(self) -> None:
|
|
guest_user = self.example_user("polonius")
|
|
|
|
stream_name_sub = "public_stream_1"
|
|
self.make_stream(stream_name_sub, realm=get_realm("zulip"))
|
|
self.subscribe(guest_user, stream_name_sub)
|
|
|
|
stream_name_unsub = "public_stream_2"
|
|
self.make_stream(stream_name_unsub, realm=get_realm("zulip"))
|
|
self.subscribe(guest_user, stream_name_unsub)
|
|
self.unsubscribe(guest_user, stream_name_unsub)
|
|
|
|
stream_name_never_sub = "public_stream_3"
|
|
self.make_stream(stream_name_never_sub, realm=get_realm("zulip"))
|
|
|
|
normal_user = self.example_user("aaron")
|
|
self.subscribe(normal_user, stream_name_sub)
|
|
self.subscribe(normal_user, stream_name_unsub)
|
|
self.subscribe(normal_user, stream_name_unsub)
|
|
|
|
helper_result = gather_subscriptions_helper(guest_user)
|
|
self.verify_sub_fields(helper_result)
|
|
subs = helper_result.subscriptions
|
|
neversubs = helper_result.never_subscribed
|
|
|
|
# Guest users get info about subscribed public stream's subscribers
|
|
expected_stream_exists = False
|
|
for sub in subs:
|
|
if sub["name"] == stream_name_sub:
|
|
expected_stream_exists = True
|
|
self.assert_length(sub["subscribers"], 2)
|
|
self.assertTrue(expected_stream_exists)
|
|
|
|
# Guest user only get data about never subscribed streams if they're
|
|
# web-public.
|
|
for stream in neversubs:
|
|
self.assertTrue(stream["is_web_public"])
|
|
|
|
# Guest user only get data about never subscribed web-public streams
|
|
self.assert_length(neversubs, 1)
|
|
|
|
def test_api_fields_present(self) -> None:
|
|
user = self.example_user("cordelia")
|
|
|
|
sub_data = gather_subscriptions_helper(user)
|
|
subscribed = sub_data.subscriptions
|
|
self.assertGreaterEqual(len(subscribed), 1)
|
|
self.verify_sub_fields(sub_data)
|
|
|
|
def test_previously_subscribed_private_streams(self) -> None:
|
|
admin_user = self.example_user("iago")
|
|
non_admin_user = self.example_user("cordelia")
|
|
guest_user = self.example_user("polonius")
|
|
stream_name = "private_stream"
|
|
|
|
stream = self.make_stream(stream_name, realm=get_realm("zulip"), invite_only=True)
|
|
self.subscribe(admin_user, stream_name)
|
|
self.subscribe(non_admin_user, stream_name)
|
|
self.subscribe(guest_user, stream_name)
|
|
self.subscribe(self.example_user("othello"), stream_name)
|
|
|
|
self.unsubscribe(admin_user, stream_name)
|
|
self.unsubscribe(non_admin_user, stream_name)
|
|
self.unsubscribe(guest_user, stream_name)
|
|
|
|
# Test admin user gets previously subscribed private stream's subscribers.
|
|
sub_data = gather_subscriptions_helper(admin_user)
|
|
self.verify_sub_fields(sub_data)
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
self.assert_length(unsubscribed_streams, 1)
|
|
self.assert_length(unsubscribed_streams[0]["subscribers"], 1)
|
|
|
|
# Test non-admin users cannot get previously subscribed private stream's subscribers.
|
|
sub_data = gather_subscriptions_helper(non_admin_user)
|
|
self.verify_sub_fields(sub_data)
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
self.assert_length(unsubscribed_streams, 0)
|
|
|
|
# Test channel admin gets previously subscribed private stream's subscribers.
|
|
non_admin_user_group_members_dict = UserGroupMembersData(
|
|
direct_members=[non_admin_user.id], direct_subgroups=[]
|
|
)
|
|
do_change_stream_group_based_setting(
|
|
stream,
|
|
"can_administer_channel_group",
|
|
non_admin_user_group_members_dict,
|
|
acting_user=admin_user,
|
|
)
|
|
sub_data = gather_subscriptions_helper(non_admin_user)
|
|
self.verify_sub_fields(sub_data)
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
self.assert_length(unsubscribed_streams, 1)
|
|
self.assert_length(unsubscribed_streams[0]["subscribers"], 1)
|
|
|
|
sub_data = gather_subscriptions_helper(guest_user)
|
|
self.verify_sub_fields(sub_data)
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
self.assert_length(unsubscribed_streams, 0)
|
|
|
|
def test_previously_subscribed_public_streams(self) -> None:
|
|
public_stream_name = "public_stream"
|
|
web_public_stream_name = "web_public_stream"
|
|
guest_user = self.example_user("polonius")
|
|
member_user = self.example_user("hamlet")
|
|
|
|
self.make_stream(public_stream_name, realm=get_realm("zulip"))
|
|
self.make_stream(web_public_stream_name, realm=get_realm("zulip"), is_web_public=True)
|
|
|
|
for stream_name in [public_stream_name, web_public_stream_name]:
|
|
self.subscribe(guest_user, stream_name)
|
|
self.subscribe(member_user, stream_name)
|
|
self.subscribe(self.example_user("othello"), stream_name)
|
|
|
|
for stream_name in [public_stream_name, web_public_stream_name]:
|
|
self.unsubscribe(guest_user, stream_name)
|
|
self.unsubscribe(member_user, stream_name)
|
|
|
|
# Test member user gets previously subscribed public stream and its subscribers.
|
|
sub_data = gather_subscriptions_helper(member_user)
|
|
self.verify_sub_fields(sub_data)
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
self.assert_length(unsubscribed_streams, 2)
|
|
self.assert_length(unsubscribed_streams[0]["subscribers"], 1)
|
|
self.assert_length(unsubscribed_streams[1]["subscribers"], 1)
|
|
|
|
# Test guest users cannot get previously subscribed public stream but can get
|
|
# web-public stream and its subscribers.
|
|
sub_data = gather_subscriptions_helper(guest_user)
|
|
self.verify_sub_fields(sub_data)
|
|
unsubscribed_streams = sub_data.unsubscribed
|
|
self.assert_length(unsubscribed_streams, 1)
|
|
self.assertEqual(unsubscribed_streams[0]["is_web_public"], True)
|
|
self.assert_length(unsubscribed_streams[0]["subscribers"], 1)
|
|
|
|
def test_nonsubscriber(self) -> None:
|
|
"""
|
|
Even a non-subscriber to a public stream can query a stream's membership
|
|
with get_subscribers.
|
|
"""
|
|
# Create a stream for which Hamlet is the only subscriber.
|
|
stream_name = "Saxony"
|
|
self.subscribe_via_post(self.user_profile, [stream_name])
|
|
other_user = self.example_user("othello")
|
|
|
|
# Fetch the subscriber list as a non-member.
|
|
self.login_user(other_user)
|
|
self.make_successful_subscriber_request(stream_name)
|
|
|
|
def test_subscriber_private_stream(self) -> None:
|
|
"""
|
|
A subscriber to a private stream can query that stream's membership.
|
|
"""
|
|
stream_name = "Saxony"
|
|
self.subscribe_via_post(self.user_profile, [stream_name], invite_only=True)
|
|
self.make_successful_subscriber_request(stream_name)
|
|
|
|
stream_id = get_stream(stream_name, self.user_profile.realm).id
|
|
# Verify another user can't get the data.
|
|
self.login("cordelia")
|
|
result = self.client_get(f"/json/streams/{stream_id}/members")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
# But an organization administrator can
|
|
self.login("iago")
|
|
result = self.client_get(f"/json/streams/{stream_id}/members")
|
|
self.assert_json_success(result)
|
|
|
|
def test_json_get_subscribers_stream_not_exist(self) -> None:
|
|
"""
|
|
json_get_subscribers also returns the list of subscribers for a stream.
|
|
"""
|
|
stream_id = 99999999
|
|
result = self.client_get(f"/json/streams/{stream_id}/members")
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
def test_json_get_subscribers(self) -> None:
|
|
"""
|
|
json_get_subscribers in zerver/views/streams.py
|
|
also returns the list of subscribers for a stream, when requested.
|
|
"""
|
|
stream_name = gather_subscriptions(self.user_profile)[0][0]["name"]
|
|
stream_id = get_stream(stream_name, self.user_profile.realm).id
|
|
expected_subscribers = gather_subscriptions(self.user_profile, include_subscribers=True)[0][
|
|
0
|
|
]["subscribers"]
|
|
result = self.client_get(f"/json/streams/{stream_id}/members")
|
|
result_dict = self.assert_json_success(result)
|
|
self.assertIn("subscribers", result_dict)
|
|
self.assertIsInstance(result_dict["subscribers"], list)
|
|
subscribers: list[int] = []
|
|
for subscriber in result_dict["subscribers"]:
|
|
self.assertIsInstance(subscriber, int)
|
|
subscribers.append(subscriber)
|
|
self.assertEqual(set(subscribers), set(expected_subscribers))
|
|
|
|
def test_json_get_subscribers_for_guest_user(self) -> None:
|
|
"""
|
|
Guest users should have access to subscribers of web-public streams, even
|
|
if they aren't subscribed or have never subscribed to that stream.
|
|
"""
|
|
guest_user = self.example_user("polonius")
|
|
never_subscribed = gather_subscriptions_helper(guest_user, True).never_subscribed
|
|
|
|
# A guest user can only see never subscribed streams that are web-public.
|
|
# For Polonius, the only web-public stream that he is not subscribed at
|
|
# this point is Rome.
|
|
self.assert_length(never_subscribed, 1)
|
|
|
|
web_public_stream_id = never_subscribed[0]["stream_id"]
|
|
result = self.client_get(f"/json/streams/{web_public_stream_id}/members")
|
|
result_dict = self.assert_json_success(result)
|
|
self.assertIn("subscribers", result_dict)
|
|
self.assertIsInstance(result_dict["subscribers"], list)
|
|
self.assertGreater(len(result_dict["subscribers"]), 0)
|
|
|
|
def test_nonsubscriber_private_stream(self) -> None:
|
|
"""
|
|
A non-subscriber non-realm-admin user to a private stream can't query that stream's membership.
|
|
But unsubscribed realm admin users can query private stream's membership.
|
|
"""
|
|
# Create a private stream for which Hamlet is the only subscriber.
|
|
stream_name = "NewStream"
|
|
self.subscribe_via_post(self.user_profile, [stream_name], invite_only=True)
|
|
user_profile = self.example_user("othello")
|
|
|
|
# Try to fetch the subscriber list as a non-member & non-realm-admin-user.
|
|
stream_id = get_stream(stream_name, user_profile.realm).id
|
|
result = self.make_subscriber_request(stream_id, user=user_profile)
|
|
self.assert_json_error(result, "Invalid channel ID")
|
|
|
|
# Try to fetch the subscriber list as a non-member & realm-admin-user.
|
|
self.login("iago")
|
|
self.make_successful_subscriber_request(stream_name)
|