mirror of
https://github.com/zulip/zulip.git
synced 2025-11-17 20:41:46 +00:00
user-groups: Add backend enforcing for new modification settings.
Add function in user-groups.py for getting member ids for a group. Update view to enforce checks for modifying user-groups. Only admins and user group members can modify user-groups.
This commit is contained in:
@@ -4687,7 +4687,7 @@ def do_send_delete_user_group_event(user_group_id: int, realm_id: int) -> None:
|
||||
group_id=user_group_id)
|
||||
send_event(event, active_user_ids(realm_id))
|
||||
|
||||
def check_delete_user_group(user_group_id: int, realm: Realm) -> None:
|
||||
user_group = access_user_group_by_id(user_group_id, realm)
|
||||
def check_delete_user_group(user_group_id: int, user_profile: UserProfile) -> None:
|
||||
user_group = access_user_group_by_id(user_group_id, user_profile)
|
||||
user_group.delete()
|
||||
do_send_delete_user_group_event(user_group_id, realm.id)
|
||||
do_send_delete_user_group_event(user_group_id, user_profile.realm.id)
|
||||
|
||||
@@ -7,9 +7,13 @@ from zerver.lib.exceptions import JsonableError
|
||||
from zerver.models import UserProfile, Realm, UserGroupMembership, UserGroup
|
||||
from typing import Dict, Iterable, List, Text, Tuple, Any
|
||||
|
||||
def access_user_group_by_id(user_group_id: int, realm: Realm) -> UserGroup:
|
||||
def access_user_group_by_id(user_group_id: int, user_profile: UserProfile) -> UserGroup:
|
||||
try:
|
||||
user_group = UserGroup.objects.get(id=user_group_id, realm=realm)
|
||||
user_group = UserGroup.objects.get(id=user_group_id, realm=user_profile.realm)
|
||||
group_member_ids = get_user_group_members(user_group)
|
||||
msg = _("Only group members and organization administrators can administer this group.")
|
||||
if (not user_profile.is_realm_admin and user_profile.id not in group_member_ids):
|
||||
raise JsonableError(msg)
|
||||
except UserGroup.DoesNotExist:
|
||||
raise JsonableError(_("Invalid user group"))
|
||||
return user_group
|
||||
@@ -74,6 +78,10 @@ def create_user_group(name: Text, members: List[UserProfile], realm: Realm,
|
||||
])
|
||||
return user_group
|
||||
|
||||
def get_user_group_members(user_group: UserGroup) -> List[UserProfile]:
|
||||
members = UserGroupMembership.objects.filter(user_group=user_group)
|
||||
return [member.user_profile.id for member in members]
|
||||
|
||||
def get_memberships_of_users(user_group: UserGroup, members: List[UserProfile]) -> List[int]:
|
||||
return list(UserGroupMembership.objects.filter(
|
||||
user_group=user_group,
|
||||
|
||||
@@ -1086,7 +1086,7 @@ class EventsRegisterTest(ZulipTestCase):
|
||||
('op', equals('remove')),
|
||||
('group_id', check_int),
|
||||
])
|
||||
events = self.do_test(lambda: check_delete_user_group(backend.id, backend.realm))
|
||||
events = self.do_test(lambda: check_delete_user_group(backend.id, othello))
|
||||
error = user_group_remove_checker('events[0]', events[0])
|
||||
self.assert_on_error(error)
|
||||
|
||||
|
||||
@@ -119,8 +119,7 @@ class UserGroupAPITestCase(ZulipTestCase):
|
||||
'description': 'Support team',
|
||||
}
|
||||
self.client_post('/json/user_groups/create', info=params)
|
||||
user_group = UserGroup.objects.first()
|
||||
|
||||
user_group = UserGroup.objects.get(name='support')
|
||||
# Test success
|
||||
params = {
|
||||
'name': 'help',
|
||||
@@ -140,6 +139,29 @@ class UserGroupAPITestCase(ZulipTestCase):
|
||||
result = self.client_patch('/json/user_groups/1111', info=params)
|
||||
self.assert_json_error(result, "Invalid user group")
|
||||
|
||||
self.logout()
|
||||
# Test when user not a member of user group tries to modify it
|
||||
cordelia = self.example_user('cordelia')
|
||||
self.login(cordelia.email)
|
||||
params = {
|
||||
'name': 'help',
|
||||
'description': 'Troubleshooting',
|
||||
}
|
||||
result = self.client_patch('/json/user_groups/{}'.format(user_group.id), info=params)
|
||||
self.assert_json_error(result, "Only group members and organization administrators can administer this group.")
|
||||
|
||||
self.logout()
|
||||
# Test when organization admin tries to modify group
|
||||
iago = self.example_user('iago')
|
||||
self.login(iago.email)
|
||||
params = {
|
||||
'name': 'help',
|
||||
'description': 'Troubleshooting',
|
||||
}
|
||||
result = self.client_patch('/json/user_groups/{}'.format(user_group.id), info=params)
|
||||
self.assert_json_success(result)
|
||||
self.assertEqual(result.json()['description'], 'Description successfully updated.')
|
||||
|
||||
def test_user_group_delete(self) -> None:
|
||||
hamlet = self.example_user('hamlet')
|
||||
self.login(self.example_email("hamlet"))
|
||||
@@ -150,7 +172,6 @@ class UserGroupAPITestCase(ZulipTestCase):
|
||||
}
|
||||
self.client_post('/json/user_groups/create', info=params)
|
||||
user_group = UserGroup.objects.get(name='support')
|
||||
|
||||
# Test success
|
||||
self.assertEqual(UserGroup.objects.count(), 2)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 3)
|
||||
@@ -163,6 +184,33 @@ class UserGroupAPITestCase(ZulipTestCase):
|
||||
result = self.client_delete('/json/user_groups/1111')
|
||||
self.assert_json_error(result, "Invalid user group")
|
||||
|
||||
# Test when user not a member of user group tries to delete it
|
||||
params = {
|
||||
'name': 'Development',
|
||||
'members': ujson.dumps([hamlet.id]),
|
||||
'description': 'Development team',
|
||||
}
|
||||
self.client_post('/json/user_groups/create', info=params)
|
||||
user_group = UserGroup.objects.get(name='Development')
|
||||
self.assertEqual(UserGroup.objects.count(), 2)
|
||||
self.logout()
|
||||
cordelia = self.example_user('cordelia')
|
||||
self.login(cordelia.email)
|
||||
|
||||
result = self.client_delete('/json/user_groups/{}'.format(user_group.id))
|
||||
self.assert_json_error(result, "Only group members and organization administrators can administer this group.")
|
||||
self.assertEqual(UserGroup.objects.count(), 2)
|
||||
|
||||
self.logout()
|
||||
# Test when organization admin tries to delete group
|
||||
iago = self.example_user('iago')
|
||||
self.login(iago.email)
|
||||
|
||||
result = self.client_delete('/json/user_groups/{}'.format(user_group.id))
|
||||
self.assert_json_success(result)
|
||||
self.assertEqual(UserGroup.objects.count(), 1)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 2)
|
||||
|
||||
def test_update_members_of_user_group(self) -> None:
|
||||
hamlet = self.example_user('hamlet')
|
||||
self.login(self.example_email("hamlet"))
|
||||
@@ -172,10 +220,10 @@ class UserGroupAPITestCase(ZulipTestCase):
|
||||
'description': 'Support team',
|
||||
}
|
||||
self.client_post('/json/user_groups/create', info=params)
|
||||
user_group = UserGroup.objects.first()
|
||||
|
||||
user_group = UserGroup.objects.get(name='support')
|
||||
# Test add members
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 3)
|
||||
|
||||
othello = self.example_user('othello')
|
||||
add = [othello.id]
|
||||
params = {'add': ujson.dumps(add)}
|
||||
@@ -194,26 +242,74 @@ class UserGroupAPITestCase(ZulipTestCase):
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello])
|
||||
self.assertEqual(len(members), 2)
|
||||
|
||||
# Test remove members
|
||||
params = {'delete': ujson.dumps([hamlet.id, othello.id])}
|
||||
self.logout()
|
||||
# Test when user not a member of user group tries to add members to it
|
||||
cordelia = self.example_user('cordelia')
|
||||
self.login(cordelia.email)
|
||||
add = [cordelia.id]
|
||||
params = {'add': ujson.dumps(add)}
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info=params)
|
||||
self.assert_json_success(result)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 2)
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello])
|
||||
self.assertEqual(len(members), 0)
|
||||
self.assert_json_error(result, "Only group members and organization administrators can administer this group.")
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 4)
|
||||
|
||||
# Test remove a member that's already removed; arguably we should make this an error.
|
||||
params = {'delete': ujson.dumps([hamlet.id, othello.id])}
|
||||
self.logout()
|
||||
# Test when organization admin tries to add members to group
|
||||
iago = self.example_user('iago')
|
||||
self.login(iago.email)
|
||||
aaron = self.example_user('aaron')
|
||||
add = [aaron.id]
|
||||
params = {'add': ujson.dumps(add)}
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info=params)
|
||||
self.assert_json_success(result)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 2)
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello])
|
||||
self.assertEqual(len(members), 0)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 5)
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello, aaron])
|
||||
self.assertEqual(len(members), 3)
|
||||
|
||||
# For normal testing we again login with hamlet
|
||||
self.logout()
|
||||
self.login(hamlet.email)
|
||||
# Test remove members
|
||||
params = {'delete': ujson.dumps([othello.id])}
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info=params)
|
||||
self.assert_json_success(result)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 4)
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello, aaron])
|
||||
self.assertEqual(len(members), 2)
|
||||
|
||||
# Test remove a member that's already removed
|
||||
params = {'delete': ujson.dumps([othello.id])}
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info=params)
|
||||
self.assert_json_error(result, "There is no member '6' in this user group")
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 4)
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello, aaron])
|
||||
self.assertEqual(len(members), 2)
|
||||
|
||||
# Test when nothing is provided
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info={})
|
||||
msg = 'Nothing to do. Specify at least one of "add" or "delete".'
|
||||
self.assert_json_error(result, msg)
|
||||
|
||||
# Test when user not a member of user group tries to remove members
|
||||
self.logout()
|
||||
self.login(cordelia.email)
|
||||
params = {'delete': ujson.dumps([hamlet.id])}
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info=params)
|
||||
self.assert_json_error(result, "Only group members and organization administrators can administer this group.")
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 4)
|
||||
|
||||
self.logout()
|
||||
# Test when organization admin tries to remove members from group
|
||||
iago = self.example_user('iago')
|
||||
self.login(iago.email)
|
||||
result = self.client_post('/json/user_groups/{}/members'.format(user_group.id),
|
||||
info=params)
|
||||
self.assert_json_success(result)
|
||||
self.assertEqual(UserGroupMembership.objects.count(), 3)
|
||||
members = get_memberships_of_users(user_group, [hamlet, othello, aaron])
|
||||
self.assertEqual(len(members), 1)
|
||||
|
||||
@@ -13,7 +13,7 @@ from zerver.lib.response import json_success, json_error
|
||||
from zerver.lib.users import user_ids_to_users
|
||||
from zerver.lib.validator import check_list, check_string, check_int, \
|
||||
check_short_string
|
||||
from zerver.lib.user_groups import access_user_group_by_id, get_memberships_of_users
|
||||
from zerver.lib.user_groups import access_user_group_by_id, get_memberships_of_users, get_user_group_members
|
||||
from zerver.models import UserProfile, UserGroup, UserGroupMembership
|
||||
from zerver.views.streams import compose_views, FuncKwargPair
|
||||
|
||||
@@ -34,7 +34,7 @@ def edit_user_group(request: HttpRequest, user_profile: UserProfile,
|
||||
if not (name or description):
|
||||
return json_error(_("No new data supplied"))
|
||||
|
||||
user_group = access_user_group_by_id(user_group_id, realm=user_profile.realm)
|
||||
user_group = access_user_group_by_id(user_group_id, user_profile)
|
||||
|
||||
result = {}
|
||||
if name != user_group.name:
|
||||
@@ -50,7 +50,8 @@ def edit_user_group(request: HttpRequest, user_profile: UserProfile,
|
||||
@has_request_variables
|
||||
def delete_user_group(request: HttpRequest, user_profile: UserProfile,
|
||||
user_group_id: int=REQ(validator=check_int)) -> HttpResponse:
|
||||
check_delete_user_group(user_group_id, user_profile.realm)
|
||||
|
||||
check_delete_user_group(user_group_id, user_profile)
|
||||
return json_success()
|
||||
|
||||
@has_request_variables
|
||||
@@ -75,10 +76,10 @@ def add_members_to_group_backend(request: HttpRequest, user_profile: UserProfile
|
||||
if not members:
|
||||
return json_success()
|
||||
|
||||
user_group = access_user_group_by_id(user_group_id, user_profile.realm)
|
||||
user_group = access_user_group_by_id(user_group_id, user_profile)
|
||||
user_profiles = user_ids_to_users(members, user_profile.realm)
|
||||
|
||||
existing_member_ids = set(get_memberships_of_users(user_group, user_profiles))
|
||||
|
||||
for user_profile in user_profiles:
|
||||
if user_profile.id in existing_member_ids:
|
||||
raise JsonableError(_("User %s is already a member of this group" % (user_profile.id,)))
|
||||
@@ -92,6 +93,11 @@ def remove_members_from_group_backend(request: HttpRequest, user_profile: UserPr
|
||||
return json_success()
|
||||
|
||||
user_profiles = user_ids_to_users(members, user_profile.realm)
|
||||
user_group = access_user_group_by_id(user_group_id, user_profile.realm)
|
||||
user_group = access_user_group_by_id(user_group_id, user_profile)
|
||||
group_member_ids = get_user_group_members(user_group)
|
||||
for member in members:
|
||||
if (member not in group_member_ids):
|
||||
raise JsonableError(_("There is no member '%s' in this user group" % (member,)))
|
||||
|
||||
remove_members_from_user_group(user_group, user_profiles)
|
||||
return json_success()
|
||||
|
||||
Reference in New Issue
Block a user