stream: Modify flag to allow access for users with metadata access.

This commit is contained in:
Shubham Padia
2025-02-06 17:33:25 +00:00
committed by Tim Abbott
parent 9725de99e9
commit 35f9305acb
7 changed files with 265 additions and 57 deletions

View File

@@ -72,6 +72,7 @@ from zerver.lib.stream_traffic import (
from zerver.lib.streams import (
StreamDict,
StreamsCategorizedByPermissions,
UserGroupMembershipDetails,
access_stream_by_id,
access_stream_by_name,
can_access_stream_history,
@@ -3247,7 +3248,7 @@ class StreamAdminTest(ZulipTestCase):
are on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=18,
query_count=19,
target_users=[self.example_user("cordelia")],
is_realm_admin=True,
is_subbed=True,
@@ -3264,7 +3265,7 @@ class StreamAdminTest(ZulipTestCase):
streams you aren't on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=18,
query_count=19,
target_users=[self.example_user("cordelia")],
is_realm_admin=True,
is_subbed=False,
@@ -5991,7 +5992,7 @@ class SubscriptionAPITest(ZulipTestCase):
# Sends 3 peer-remove events, 2 unsubscribe events
# and 2 stream delete events for private streams.
with (
self.assert_database_query_count(18),
self.assert_database_query_count(20),
self.assert_memcached_count(3),
self.capture_send_event_calls(expected_num_events=7) as events,
):
@@ -7782,6 +7783,80 @@ class AccessStreamTest(ZulipTestCase):
access_stream_by_id(sipbtest, mit_stream.id)
access_stream_by_name(sipbtest, mit_stream.name)
def test_access_stream_allow_metadata_access_flag(self) -> None:
"""
A comprehensive security test for the access_stream_by_* API functions.
"""
# Create a private stream for which Hamlet is the only subscriber.
hamlet = self.example_user("hamlet")
stream_name = "new_private_stream"
self.login_user(hamlet)
self.subscribe_via_post(hamlet, [stream_name], invite_only=True)
stream = get_stream(stream_name, hamlet.realm)
othello = self.example_user("othello")
iago = self.example_user("iago")
# Realm admin cannot access the private stream
with self.assertRaisesRegex(JsonableError, "Invalid channel ID"):
access_stream_by_id(iago, stream.id)
with self.assertRaisesRegex(JsonableError, "Invalid channel name 'new_private_stream'"):
access_stream_by_name(iago, stream.name)
# Realm admins can access private stream if
# require_content_access set to False
access_stream_by_id(iago, stream.id, require_content_access=False)
access_stream_by_name(iago, stream.name, require_content_access=False)
# Normal unsubscribed user cannot access a private stream
with self.assertRaisesRegex(JsonableError, "Invalid channel ID"):
access_stream_by_id(othello, stream.id)
with self.assertRaisesRegex(JsonableError, "Invalid channel name 'new_private_stream'"):
access_stream_by_name(othello, stream.name)
# Normal unsubscribed user cannot access a private stream with
# require_content_access set to False
with self.assertRaisesRegex(JsonableError, "Invalid channel ID"):
access_stream_by_id(othello, stream.id, require_content_access=False)
with self.assertRaisesRegex(JsonableError, "Invalid channel name 'new_private_stream'"):
access_stream_by_name(othello, stream.name, require_content_access=False)
othello_group = check_add_user_group(
othello.realm, "user_profile_group", [othello], acting_user=othello
)
nobody_group = NamedUserGroup.objects.get(
name="role:nobody", is_system_group=True, realm=othello.realm
)
do_change_stream_group_based_setting(
stream,
"can_administer_channel_group",
othello_group,
acting_user=None,
)
# Channel admins can access private stream if
# require_content_access is set to False
access_stream_by_id(othello, stream.id, require_content_access=False)
access_stream_by_name(othello, stream.name, require_content_access=False)
do_change_stream_group_based_setting(
stream,
"can_administer_channel_group",
nobody_group,
acting_user=None,
)
do_change_stream_group_based_setting(
stream,
"can_add_subscribers_group",
othello_group,
acting_user=None,
)
# Users in `can_add_subscribers_group` can access private
# stream if require_content_access is set to True
access_stream_by_id(othello, stream.id, require_content_access=False)
access_stream_by_name(othello, stream.name, require_content_access=False)
def test_stream_access_by_guest(self) -> None:
guest_user_profile = self.example_user("polonius")
self.login_user(guest_user_profile)
@@ -7830,26 +7905,90 @@ class AccessStreamTest(ZulipTestCase):
# Even guest user should have access to web public channel.
self.assertEqual(
user_has_content_access(guest_user, web_public_stream, is_subscribed=False), True
user_has_content_access(
guest_user,
web_public_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=False,
),
True,
)
# User should have access to private channel if they are
# subscribed to it
self.assertEqual(user_has_content_access(aaron, private_stream, is_subscribed=True), True)
self.assertEqual(user_has_content_access(aaron, private_stream, is_subscribed=False), False)
self.assertEqual(
user_has_content_access(
aaron,
private_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=True,
),
True,
)
self.assertEqual(
user_has_content_access(
aaron,
private_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=False,
),
False,
)
# Non guest user should have access to public channel
# regardless of their subscription to the channel.
self.assertEqual(user_has_content_access(aaron, public_stream, is_subscribed=True), True)
self.assertEqual(user_has_content_access(aaron, public_stream, is_subscribed=False), True)
self.assertEqual(
user_has_content_access(
aaron,
public_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=True,
),
True,
)
self.assertEqual(
user_has_content_access(
aaron,
public_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=False,
),
True,
)
# Guest user should have access to public channel only if they
# are subscribed to it.
self.assertEqual(
user_has_content_access(guest_user, public_stream, is_subscribed=False), False
user_has_content_access(
guest_user,
public_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=False,
),
False,
)
self.assertEqual(
user_has_content_access(guest_user, public_stream, is_subscribed=True), True
user_has_content_access(
guest_user,
public_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=True,
),
True,
)
# User should be able to access private channel if they are
@@ -7862,7 +8001,17 @@ class AccessStreamTest(ZulipTestCase):
aaron_group,
acting_user=None,
)
self.assertEqual(user_has_content_access(aaron, private_stream, is_subscribed=False), True)
self.assertEqual(
user_has_content_access(
aaron,
private_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=False,
),
True,
)
nobody_group = NamedUserGroup.objects.get(
name="role:nobody", realm=realm, is_system_group=True
)
@@ -7883,8 +8032,28 @@ class AccessStreamTest(ZulipTestCase):
aaron_group,
acting_user=None,
)
self.assertEqual(user_has_content_access(aaron, private_stream, is_subscribed=False), False)
self.assertEqual(user_has_content_access(aaron, private_stream, is_subscribed=True), True)
self.assertEqual(
user_has_content_access(
aaron,
private_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=False,
),
False,
)
self.assertEqual(
user_has_content_access(
aaron,
private_stream,
user_group_membership_details=UserGroupMembershipDetails(
user_recursive_group_ids=None
),
is_subscribed=True,
),
True,
)
class StreamTrafficTest(ZulipTestCase):