streams: Allow stream admin to update and deactivate streams.

The new Stream administrator role is allowed to manage a stream they
administer, including:
* Setting properties like name, description, privacy and post-policy.
* Removing subscribers
* Deactivating the stream

The access_stream_for_delete_or_update is modified and is used only
to get objects from database and further checks for administrative
rights is done by check_stream_access_for_delete_or_update.

We have also added a new exception class StreamAdministratorRequired.
This commit is contained in:
sahil839
2020-07-13 19:43:28 +05:30
committed by Tim Abbott
parent 78da9fd3ab
commit ca1a8ac78f
4 changed files with 286 additions and 78 deletions

View File

@@ -224,6 +224,18 @@ class OrganizationOwnerRequired(JsonableError):
def msg_format() -> str:
return OrganizationOwnerRequired.OWNER_REQUIRED_MESSAGE
class StreamAdministratorRequired(JsonableError):
code: ErrorCode = ErrorCode.UNAUTHORIZED_PRINCIPAL
ADMIN_REQUIRED_MESSAGE = _("Must be an organization or stream administrator")
def __init__(self) -> None:
super().__init__(self.ADMIN_REQUIRED_MESSAGE)
@staticmethod
def msg_format() -> str:
return StreamAdministratorRequired.ADMIN_REQUIRED_MESSAGE
class MarkdownRenderingException(Exception):
pass

View File

@@ -5,6 +5,7 @@ from django.db.models.query import QuerySet
from django.utils.timezone import now as timezone_now
from django.utils.translation import ugettext as _
from zerver.lib.exceptions import StreamAdministratorRequired
from zerver.lib.markdown import markdown_convert
from zerver.lib.request import JsonableError
from zerver.models import (
@@ -203,22 +204,37 @@ def check_for_exactly_one_stream_arg(stream_id: Optional[int], stream: Optional[
if stream_id is not None and stream is not None:
raise JsonableError(_("Please choose one: 'stream' or 'stream_id'."))
def access_stream_for_delete_or_update(user_profile: UserProfile, stream_id: int) -> Stream:
# We should only ever use this for realm admins, who are allowed
# to delete or update all streams on their realm, even private streams
# to which they are not subscribed. We do an assert here, because
# all callers should have the require_realm_admin decorator.
assert(user_profile.is_realm_admin)
def check_stream_access_for_delete_or_update(user_profile: UserProfile, stream: Stream,
sub: Optional[Subscription]=None) -> None:
error = _("Invalid stream id")
try:
stream = Stream.objects.get(id=stream_id)
except Stream.DoesNotExist:
raise JsonableError(error)
if stream.realm_id != user_profile.realm_id:
raise JsonableError(error)
if user_profile.is_realm_admin:
return
if sub is None and stream.invite_only:
raise JsonableError(error)
if sub is not None and sub.is_stream_admin:
return
raise StreamAdministratorRequired()
def access_stream_for_delete_or_update(user_profile: UserProfile, stream_id: int) -> Stream:
try:
stream = Stream.objects.get(id=stream_id)
except Stream.DoesNotExist:
raise JsonableError(_("Invalid stream id"))
try:
sub = Subscription.objects.get(user_profile=user_profile,
recipient=stream.recipient,
active=True)
except Subscription.DoesNotExist:
sub = None
check_stream_access_for_delete_or_update(user_profile, stream, sub)
return stream
# Only set allow_realm_admin flag to True when you want to allow realm admin to
@@ -431,7 +447,8 @@ def filter_stream_authorization(user_profile: UserProfile,
def list_to_streams(streams_raw: Iterable[Mapping[str, Any]],
user_profile: UserProfile,
autocreate: bool=False) -> Tuple[List[Stream], List[Stream]]:
autocreate: bool=False,
admin_access_required: bool=False) -> Tuple[List[Stream], List[Stream]]:
"""Converts list of dicts to a list of Streams, validating input in the process
For each stream name, we validate it to ensure it meets our
@@ -459,6 +476,20 @@ def list_to_streams(streams_raw: Iterable[Mapping[str, Any]],
missing_stream_dicts: List[Mapping[str, Any]] = []
existing_stream_map = bulk_get_streams(user_profile.realm, stream_set)
if admin_access_required:
existing_stream_ids = [stream.id for stream in existing_stream_map.values()]
subs = Subscription.objects.select_related("recipient").filter(
user_profile=user_profile,
recipient__type=Recipient.STREAM,
recipient__type_id__in=existing_stream_ids,
active=True)
sub_dict_by_stream_ids = {sub.recipient.type_id: sub for sub in subs}
for stream in existing_stream_map.values():
sub = None
if stream.id in sub_dict_by_stream_ids:
sub = sub_dict_by_stream_ids[stream.id]
check_stream_access_for_delete_or_update(user_profile, stream, sub)
message_retention_days_not_none = False
for stream_dict in streams_raw:
stream_name = stream_dict["name"]

View File

@@ -24,6 +24,7 @@ from zerver.lib.actions import (
do_change_default_stream_group_name,
do_change_plan_type,
do_change_stream_post_policy,
do_change_subscription_property,
do_change_user_role,
do_create_default_stream_group,
do_create_realm,
@@ -349,30 +350,50 @@ class StreamAdminTest(ZulipTestCase):
def test_make_stream_public(self) -> None:
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
self.make_stream('private_stream', invite_only=True)
self.make_stream('private_stream_1', invite_only=True)
self.make_stream('private_stream_2', invite_only=True)
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR)
params = {
'stream_name': orjson.dumps('private_stream').decode(),
'stream_name': orjson.dumps('private_stream_1').decode(),
'is_private': orjson.dumps(False).decode(),
}
stream_id = get_stream('private_stream', user_profile.realm).id
stream_id = get_stream('private_stream_1', user_profile.realm).id
result = self.client_patch(f"/json/streams/{stream_id}", params)
self.assert_json_error(result, 'Invalid stream id')
stream = self.subscribe(user_profile, 'private_stream')
stream = self.subscribe(user_profile, 'private_stream_1')
self.assertFalse(stream.is_in_zephyr_realm)
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR)
params = {
'stream_name': orjson.dumps('private_stream').decode(),
'stream_name': orjson.dumps('private_stream_1').decode(),
'is_private': orjson.dumps(False).decode(),
}
result = self.client_patch(f"/json/streams/{stream_id}", params)
self.assert_json_success(result)
realm = user_profile.realm
stream = get_stream('private_stream', realm)
stream = get_stream('private_stream_1', realm)
self.assertFalse(stream.invite_only)
self.assertTrue(stream.history_public_to_subscribers)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
params = {
'stream_name': orjson.dumps('private_stream_2').decode(),
'is_private': orjson.dumps(False).decode(),
}
stream = self.subscribe(user_profile, 'private_stream_2')
result = self.client_patch(f"/json/streams/{stream.id}", params)
self.assertTrue(stream.invite_only)
self.assert_json_error(result, "Must be an organization or stream administrator")
sub = get_subscription('private_stream_2', user_profile)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_STREAM_ADMINISTRATOR)
result = self.client_patch(f"/json/streams/{stream.id}", params)
self.assert_json_success(result)
stream = get_stream('private_stream_2', realm)
self.assertFalse(stream.invite_only)
self.assertTrue(stream.history_public_to_subscribers)
@@ -380,17 +401,37 @@ class StreamAdminTest(ZulipTestCase):
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
realm = user_profile.realm
self.make_stream('public_stream', realm=realm)
self.make_stream('public_stream_1', realm=realm)
self.make_stream('public_stream_2')
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR)
params = {
'stream_name': orjson.dumps('public_stream').decode(),
'stream_name': orjson.dumps('public_stream_1').decode(),
'is_private': orjson.dumps(True).decode(),
}
stream_id = get_stream('public_stream', realm).id
stream_id = get_stream('public_stream_1', realm).id
result = self.client_patch(f"/json/streams/{stream_id}", params)
self.assert_json_success(result)
stream = get_stream('public_stream', realm)
stream = get_stream('public_stream_1', realm)
self.assertTrue(stream.invite_only)
self.assertFalse(stream.history_public_to_subscribers)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
params = {
'stream_name': orjson.dumps('public_stream_2').decode(),
'is_private': orjson.dumps(True).decode(),
}
stream = self.subscribe(user_profile, 'public_stream_2')
result = self.client_patch(f"/json/streams/{stream.id}", params)
self.assertFalse(stream.invite_only)
self.assert_json_error(result, "Must be an organization or stream administrator")
sub = get_subscription('public_stream_2', user_profile)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_STREAM_ADMINISTRATOR)
result = self.client_patch(f"/json/streams/{stream.id}", params)
self.assert_json_success(result)
stream = get_stream('public_stream_2', realm)
self.assertTrue(stream.invite_only)
self.assertFalse(stream.history_public_to_subscribers)
@@ -455,7 +496,7 @@ class StreamAdminTest(ZulipTestCase):
def test_deactivate_stream_backend(self) -> None:
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
stream = self.make_stream('new_stream')
stream = self.make_stream('new_stream_1')
self.subscribe(user_profile, stream.name)
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR)
@@ -466,6 +507,19 @@ class StreamAdminTest(ZulipTestCase):
).exists()
self.assertFalse(subscription_exists)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
stream = self.make_stream('new_stream_2')
self.subscribe(user_profile, stream.name)
sub = get_subscription(stream.name, user_profile)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_STREAM_ADMINISTRATOR)
result = self.client_delete(f'/json/streams/{stream.id}')
self.assert_json_success(result)
subscription_exists = get_active_subscriptions_for_stream_id(stream.id).filter(
user_profile=user_profile,
).exists()
self.assertFalse(subscription_exists)
def test_deactivate_stream_removes_default_stream(self) -> None:
stream = self.make_stream('new_stream')
do_add_default_stream(stream)
@@ -518,14 +572,15 @@ class StreamAdminTest(ZulipTestCase):
result = self.client_delete('/json/streams/999999999')
self.assert_json_error(result, 'Invalid stream id')
def test_deactivate_stream_backend_requires_realm_admin(self) -> None:
def test_deactivate_stream_backend_requires_admin(self) -> None:
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
self.subscribe(user_profile, 'new_stream')
stream = self.subscribe(user_profile, 'new_stream')
sub = get_subscription('new_stream', user_profile)
self.assertFalse(sub.is_stream_admin)
stream_id = get_stream('new_stream', user_profile.realm).id
result = self.client_delete(f'/json/streams/{stream_id}')
self.assert_json_error(result, 'Must be an organization administrator')
result = self.client_delete(f'/json/streams/{stream.id}')
self.assert_json_error(result, 'Must be an organization or stream administrator')
def test_private_stream_live_updates(self) -> None:
user_profile = self.example_user('hamlet')
@@ -690,15 +745,35 @@ class StreamAdminTest(ZulipTestCase):
self.assertNotIn(self.example_user('prospero').id,
notified_user_ids)
def test_rename_stream_requires_realm_admin(self) -> None:
# Test renaming of stream by stream admin.
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
new_stream = self.make_stream('new_stream', realm=user_profile.realm)
self.subscribe(user_profile, 'new_stream')
sub = get_subscription('new_stream', user_profile)
do_change_subscription_property(user_profile, sub, new_stream, "role", Subscription.ROLE_STREAM_ADMINISTRATOR)
del events[:]
with tornado_redirected_to_list(events):
result = self.client_patch(f'/json/streams/{new_stream.id}',
{'new_name': orjson.dumps('stream_rename').decode()})
self.assert_json_success(result)
self.assertEqual(len(events), 3)
stream_rename_exists = get_stream('stream_rename', realm)
self.assertTrue(stream_rename_exists)
def test_rename_stream_requires_admin(self) -> None:
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
self.make_stream('stream_name1')
self.subscribe(user_profile, 'stream_name1')
sub = get_subscription('stream_name1', user_profile)
self.assertFalse(sub.is_stream_admin)
stream_id = get_stream('stream_name1', user_profile.realm).id
result = self.client_patch(f'/json/streams/{stream_id}',
{'new_name': orjson.dumps('stream_name2').decode()})
self.assert_json_error(result, 'Must be an organization administrator')
self.assert_json_error(result, 'Must be an organization or stream administrator')
def test_notify_on_stream_rename(self) -> None:
user_profile = self.example_user('hamlet')
@@ -747,6 +822,34 @@ class StreamAdminTest(ZulipTestCase):
'is_private': orjson.dumps(True).decode()})
self.assert_json_error(result, "Invalid stream id")
def test_non_admin_cannot_access_unsub_private_stream(self) -> None:
iago = self.example_user('iago')
hamlet = self.example_user('hamlet')
self.login_user(hamlet)
result = self.common_subscribe_to_streams(hamlet, ["private_stream_1"],
dict(principals=orjson.dumps([iago.id]).decode()),
invite_only=True)
self.assert_json_success(result)
stream_id = get_stream('private_stream_1', hamlet.realm).id
result = self.client_patch(f'/json/streams/{stream_id}',
{'new_name': orjson.dumps('private_stream_2').decode()})
self.assert_json_error(result, "Invalid stream id")
result = self.client_patch(f'/json/streams/{stream_id}',
{'new_description': orjson.dumps('new description').decode()})
self.assert_json_error(result, "Invalid stream id")
result = self.client_patch(f'/json/streams/{stream_id}',
{'stream_name': orjson.dumps('private_stream_1').decode(),
'is_private': orjson.dumps(True).decode()})
self.assert_json_error(result, "Invalid stream id")
result = self.client_delete(f'/json/streams/{stream_id}')
self.assert_json_error(result, "Invalid stream id")
def test_change_stream_description(self) -> None:
user_profile = self.example_user('iago')
self.login_user(user_profile)
@@ -807,17 +910,33 @@ class StreamAdminTest(ZulipTestCase):
'<p>See <a href="https://zulip.com/team">https://zulip.com/team</a></p>',
)
def test_change_stream_description_requires_realm_admin(self) -> None:
# Test changing stream description by stream admin.
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
sub = get_subscription('stream_name1', user_profile)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_STREAM_ADMINISTRATOR)
with tornado_redirected_to_list(events):
stream_id = get_stream('stream_name1', realm).id
result = self.client_patch(f'/json/streams/{stream_id}',
{'description': orjson.dumps('Test description').decode()})
self.assert_json_success(result)
stream = get_stream('stream_name1', realm)
self.assertEqual(stream.description, 'Test description')
def test_change_stream_description_requires_admin(self) -> None:
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
self.subscribe(user_profile, 'stream_name1')
stream = self.subscribe(user_profile, 'stream_name1')
sub = get_subscription('stream_name1', user_profile)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_MEMBER)
stream_id = get_stream('stream_name1', user_profile.realm).id
result = self.client_patch(f'/json/streams/{stream_id}',
{'description': orjson.dumps('Test description').decode()})
self.assert_json_error(result, 'Must be an organization administrator')
self.assert_json_error(result, 'Must be an organization or stream administrator')
def test_change_to_stream_post_policy_admins(self) -> None:
user_profile = self.example_user('hamlet')
@@ -833,12 +952,15 @@ class StreamAdminTest(ZulipTestCase):
stream = get_stream('stream_name1', user_profile.realm)
self.assertTrue(stream.stream_post_policy == Stream.STREAM_POST_POLICY_ADMINS)
def test_change_stream_post_policy_requires_realm_admin(self) -> None:
def test_change_stream_post_policy_requires_admin(self) -> None:
user_profile = self.example_user('hamlet')
self.login_user(user_profile)
self.subscribe(user_profile, 'stream_name1')
stream = self.subscribe(user_profile, 'stream_name1')
sub = get_subscription('stream_name1', user_profile)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_MEMBER)
do_set_realm_property(user_profile.realm, 'waiting_period_threshold', 10)
@@ -849,14 +971,23 @@ class StreamAdminTest(ZulipTestCase):
stream_id = get_stream('stream_name1', user_profile.realm).id
result = self.client_patch(f'/json/streams/{stream_id}',
{'stream_post_policy': orjson.dumps(policy).decode()})
self.assert_json_error(result, 'Must be an organization administrator')
self.assert_json_error(result, 'Must be an organization or stream administrator')
policies = [Stream.STREAM_POST_POLICY_ADMINS, Stream.STREAM_POST_POLICY_RESTRICT_NEW_MEMBERS]
for policy in policies:
test_non_admin(how_old=15, is_new=False, policy=policy)
test_non_admin(how_old=5, is_new=True, policy=policy)
do_change_subscription_property(user_profile, sub, stream, "role", Subscription.ROLE_STREAM_ADMINISTRATOR)
for policy in policies:
stream_id = get_stream('stream_name1', user_profile.realm).id
result = self.client_patch(f'/json/streams/{stream_id}',
{'stream_post_policy': orjson.dumps(policy).decode()})
self.assert_json_success(result)
stream = get_stream('stream_name1', user_profile.realm)
self.assertEqual(stream.stream_post_policy, policy)
do_change_user_role(user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR)
for policy in policies:
@@ -1105,7 +1236,7 @@ class StreamAdminTest(ZulipTestCase):
stream = self.make_stream('other_realm_stream', realm=other_realm)
result = self.client_delete('/json/streams/' + str(stream.id))
self.assert_json_error(result, 'Must be an organization administrator')
self.assert_json_error(result, 'Invalid stream id')
# Even becoming a realm admin doesn't help us for an out-of-realm
# stream.
@@ -1142,12 +1273,13 @@ class StreamAdminTest(ZulipTestCase):
self.delete_stream(priv_stream)
def attempt_unsubscribe_of_principal(self, query_count: int, target_users: List[UserProfile],
is_admin: bool=False, is_subbed: bool=True, invite_only: bool=False,
is_realm_admin: bool=False, is_stream_admin: bool=False,
is_subbed: bool=True, invite_only: bool=False,
target_users_subbed: bool=True, using_legacy_emails: bool=False,
other_sub_users: Sequence[UserProfile]=[]) -> HttpResponse:
# Set up the main user, who is in most cases an admin.
if is_admin:
if is_realm_admin:
user_profile = self.example_user('iago')
else:
user_profile = self.example_user('hamlet')
@@ -1168,7 +1300,11 @@ class StreamAdminTest(ZulipTestCase):
# Subscribe the admin and/or principal as specified in the flags.
if is_subbed:
self.subscribe(user_profile, stream_name)
stream = self.subscribe(user_profile, stream_name)
if is_stream_admin:
sub = get_subscription(stream_name, user_profile)
do_change_subscription_property(user_profile, sub, stream, "role",
Subscription.ROLE_STREAM_ADMINISTRATOR)
if target_users_subbed:
for user in target_users:
self.subscribe(user, stream_name)
@@ -1195,78 +1331,113 @@ class StreamAdminTest(ZulipTestCase):
If you're not an admin, you can't remove other people from streams.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=3, target_users=[self.example_user('cordelia')], is_admin=False, is_subbed=True,
invite_only=False, target_users_subbed=True)
query_count=5, target_users=[self.example_user('cordelia')], is_realm_admin=False,
is_stream_admin=False, is_subbed=True, invite_only=False, target_users_subbed=True)
self.assert_json_error(
result, "This action requires administrative rights")
result, "Must be an organization or stream administrator")
def test_admin_remove_others_from_public_stream(self) -> None:
def test_realm_admin_remove_others_from_public_stream(self) -> None:
"""
If you're an admin, you can remove people from public streams, even
If you're a realm admin, you can remove people from public streams, even
those you aren't on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=21, target_users=[self.example_user('cordelia')], is_admin=True, is_subbed=True,
invite_only=False, target_users_subbed=True)
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=True, invite_only=False, target_users_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_admin_remove_multiple_users_from_stream(self) -> None:
def test_realm_admin_remove_multiple_users_from_stream(self) -> None:
"""
If you're an admin, you can remove multiple users from a stream,
If you're a realm admin, you can remove multiple users from a stream,
"""
result = self.attempt_unsubscribe_of_principal(
query_count=30, target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True)
query_count=31, target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_realm_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 2)
self.assertEqual(len(json["not_removed"]), 0)
def test_admin_remove_others_from_subbed_private_stream(self) -> None:
def test_realm_admin_remove_others_from_subbed_private_stream(self) -> None:
"""
If you're an admin, you can remove other people from private streams you
If you're a realm admin, you can remove other people from private streams you
are on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=21, target_users=[self.example_user('cordelia')], is_admin=True, is_subbed=True,
invite_only=True, target_users_subbed=True)
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=True, invite_only=True, target_users_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_admin_remove_others_from_unsubbed_private_stream(self) -> None:
def test_realm_admin_remove_others_from_unsubbed_private_stream(self) -> None:
"""
If you're an admin, you can remove people from private
If you're a realm admin, you can remove people from private
streams you aren't on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=21, target_users=[self.example_user('cordelia')], is_admin=True, is_subbed=False,
invite_only=True, target_users_subbed=True, other_sub_users=[self.example_user("othello")])
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=False, invite_only=True, target_users_subbed=True, other_sub_users=[self.example_user("othello")])
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_stream_admin_remove_others_from_public_stream(self) -> None:
"""
You can remove others from public streams you're a stream administrator of.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=False,
is_stream_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_stream_admin_remove_multiple_users_from_stream(self) -> None:
"""
You can remove multiple users from public streams you're a stream administrator of.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=31, target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_realm_admin=False, is_stream_admin=True, is_subbed=True, invite_only=False,
target_users_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 2)
self.assertEqual(len(json["not_removed"]), 0)
def test_stream_admin_remove_others_from_private_stream(self) -> None:
"""
You can remove others from private streams you're a stream administrator of.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=False,
is_stream_admin=True, is_subbed=True, invite_only=True, target_users_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_cant_remove_others_from_stream_legacy_emails(self) -> None:
result = self.attempt_unsubscribe_of_principal(
query_count=3, is_admin=False, is_subbed=True, invite_only=False,
target_users=[self.example_user('cordelia')], target_users_subbed=True,
query_count=5, is_realm_admin=False, is_stream_admin=False, is_subbed=True,
invite_only=False, target_users=[self.example_user('cordelia')], target_users_subbed=True,
using_legacy_emails=True)
self.assert_json_error(
result, "This action requires administrative rights")
result, "Must be an organization or stream administrator")
def test_admin_remove_others_from_stream_legacy_emails(self) -> None:
result = self.attempt_unsubscribe_of_principal(
query_count=21, target_users=[self.example_user('cordelia')], is_admin=True, is_subbed=True,
invite_only=False, target_users_subbed=True, using_legacy_emails=True)
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=True, invite_only=False, target_users_subbed=True, using_legacy_emails=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_admin_remove_multiple_users_from_stream_legacy_emails(self) -> None:
result = self.attempt_unsubscribe_of_principal(
query_count=30, target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True, using_legacy_emails=True)
query_count=31, target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_realm_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True,
using_legacy_emails=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 2)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1402,8 +1573,8 @@ class StreamAdminTest(ZulipTestCase):
fails gracefully.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=11, target_users=[self.example_user('cordelia')], is_admin=True, is_subbed=False,
invite_only=False, target_users_subbed=False)
query_count=12, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=False, invite_only=False, target_users_subbed=False)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 0)
self.assertEqual(len(json["not_removed"]), 1)

View File

@@ -132,7 +132,6 @@ def check_if_removing_someone_else(user_profile: UserProfile,
else:
return principals[0] != user_profile.email
@require_realm_admin
def deactivate_stream_backend(request: HttpRequest,
user_profile: UserProfile,
stream_id: int) -> HttpResponse:
@@ -218,7 +217,6 @@ def remove_default_stream(request: HttpRequest,
do_remove_default_stream(stream)
return json_success()
@require_realm_admin
@has_request_variables
def update_stream_backend(
request: HttpRequest, user_profile: UserProfile,
@@ -356,16 +354,12 @@ def remove_subscriptions_backend(
removing_someone_else = check_if_removing_someone_else(user_profile, principals)
if removing_someone_else and not user_profile.is_realm_admin:
# You can only unsubscribe other people from a stream if you are a realm
# admin (whether the stream is public or private).
return json_error(_("This action requires administrative rights"))
streams_as_dict = []
for stream_name in streams_raw:
streams_as_dict.append({"name": stream_name.strip()})
streams, __ = list_to_streams(streams_as_dict, user_profile)
streams, __ = list_to_streams(streams_as_dict, user_profile,
admin_access_required=removing_someone_else)
if principals:
people_to_unsub = {principal_to_user_profile(