mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	realm owner: Add ability to change realm owner status of user.
This commit adds some basic checks while adding or removing realm owner status of a user and adds code to change owner status of a user using update_user_backend. This also adds restriction on removing owner status of the last owner of realm. This restriction was previously on revoking admin status, but as we have added a more privileged role of realm owner, we now have this restriction on owner instead of admin. We need to apply that restriction both in the role change code path and the deactivate code path.
This commit is contained in:
		@@ -158,11 +158,11 @@ class StreamWithIDDoesNotExistError(JsonableError):
 | 
			
		||||
 | 
			
		||||
class CannotDeactivateLastUserError(JsonableError):
 | 
			
		||||
    code = ErrorCode.CANNOT_DEACTIVATE_LAST_USER
 | 
			
		||||
    data_fields = ['is_last_admin', 'entity']
 | 
			
		||||
    data_fields = ['is_last_owner', 'entity']
 | 
			
		||||
 | 
			
		||||
    def __init__(self, is_last_admin: bool) -> None:
 | 
			
		||||
        self.is_last_admin = is_last_admin
 | 
			
		||||
        self.entity = _("organization administrator") if is_last_admin else _("user")
 | 
			
		||||
    def __init__(self, is_last_owner: bool) -> None:
 | 
			
		||||
        self.is_last_owner = is_last_owner
 | 
			
		||||
        self.entity = _("organization owner") if is_last_owner else _("user")
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def msg_format() -> str:
 | 
			
		||||
 
 | 
			
		||||
@@ -454,6 +454,11 @@ class Realm(models.Model):
 | 
			
		||||
        # TODO: Change return type to QuerySet[UserProfile]
 | 
			
		||||
        return UserProfile.objects.filter(realm=self, is_active=True).select_related()
 | 
			
		||||
 | 
			
		||||
    def get_human_owner_users(self) -> QuerySet:
 | 
			
		||||
        return UserProfile.objects.filter(realm=self, is_bot=False,
 | 
			
		||||
                                          role=UserProfile.ROLE_REALM_OWNER,
 | 
			
		||||
                                          is_active=True)
 | 
			
		||||
 | 
			
		||||
    def get_bot_domain(self) -> str:
 | 
			
		||||
        return get_fake_email_domain()
 | 
			
		||||
 | 
			
		||||
@@ -873,6 +878,7 @@ class UserProfile(AbstractBaseUser, PermissionsMixin):
 | 
			
		||||
    role: int = models.PositiveSmallIntegerField(default=ROLE_MEMBER, db_index=True)
 | 
			
		||||
 | 
			
		||||
    ROLE_TYPES = [
 | 
			
		||||
        ROLE_REALM_OWNER,
 | 
			
		||||
        ROLE_REALM_ADMINISTRATOR,
 | 
			
		||||
        ROLE_MEMBER,
 | 
			
		||||
        ROLE_GUEST,
 | 
			
		||||
 
 | 
			
		||||
@@ -3878,23 +3878,21 @@ class DeactivateUserTest(ZulipTestCase):
 | 
			
		||||
        password = initial_password(email)
 | 
			
		||||
        self.assert_login_failure(email, password=password)
 | 
			
		||||
 | 
			
		||||
    def test_do_not_deactivate_final_admin(self) -> None:
 | 
			
		||||
        user = self.example_user('iago')
 | 
			
		||||
        user_2 = self.example_user('desdemona')
 | 
			
		||||
        do_change_user_role(user_2, UserProfile.ROLE_MEMBER)
 | 
			
		||||
        self.assertFalse(user_2.is_realm_admin)
 | 
			
		||||
    def test_do_not_deactivate_final_owner(self) -> None:
 | 
			
		||||
        user = self.example_user('desdemona')
 | 
			
		||||
        user_2 = self.example_user('iago')
 | 
			
		||||
        self.login_user(user)
 | 
			
		||||
        self.assertTrue(user.is_active)
 | 
			
		||||
        result = self.client_delete('/json/users/me')
 | 
			
		||||
        self.assert_json_error(result, "Cannot deactivate the only organization administrator.")
 | 
			
		||||
        user = self.example_user('iago')
 | 
			
		||||
        self.assert_json_error(result, "Cannot deactivate the only organization owner.")
 | 
			
		||||
        user = self.example_user('desdemona')
 | 
			
		||||
        self.assertTrue(user.is_active)
 | 
			
		||||
        self.assertTrue(user.is_realm_admin)
 | 
			
		||||
        do_change_user_role(user_2, UserProfile.ROLE_REALM_ADMINISTRATOR)
 | 
			
		||||
        self.assertTrue(user_2.is_realm_admin)
 | 
			
		||||
        self.assertTrue(user.is_realm_owner)
 | 
			
		||||
        do_change_user_role(user_2, UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
        self.assertTrue(user_2.is_realm_owner)
 | 
			
		||||
        result = self.client_delete('/json/users/me')
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
        do_change_user_role(user, UserProfile.ROLE_REALM_ADMINISTRATOR)
 | 
			
		||||
        do_change_user_role(user, UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
 | 
			
		||||
    def test_do_not_deactivate_final_user(self) -> None:
 | 
			
		||||
        realm = get_realm('zulip')
 | 
			
		||||
 
 | 
			
		||||
@@ -119,12 +119,73 @@ class PermissionTest(ZulipTestCase):
 | 
			
		||||
        result = self.client_patch(f'/json/users/{invalid_user_id}', {})
 | 
			
		||||
        self.assert_json_error(result, 'No such user')
 | 
			
		||||
 | 
			
		||||
    def test_owner_api(self) -> None:
 | 
			
		||||
        self.login('iago')
 | 
			
		||||
 | 
			
		||||
        desdemona = self.example_user('desdemona')
 | 
			
		||||
        othello = self.example_user('othello')
 | 
			
		||||
        iago = self.example_user('iago')
 | 
			
		||||
        realm = iago.realm
 | 
			
		||||
 | 
			
		||||
        do_change_user_role(iago, UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
 | 
			
		||||
        result = self.client_get('/json/users')
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
        members = result.json()['members']
 | 
			
		||||
        iago_dict = find_dict(members, 'email', iago.email)
 | 
			
		||||
        self.assertTrue(iago_dict['is_owner'])
 | 
			
		||||
        othello_dict = find_dict(members, 'email', othello.email)
 | 
			
		||||
        self.assertFalse(othello_dict['is_owner'])
 | 
			
		||||
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
        events: List[Mapping[str, Any]] = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{othello.id}', req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
        owner_users = realm.get_human_owner_users()
 | 
			
		||||
        self.assertTrue(othello in owner_users)
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], othello.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_MEMBER)
 | 
			
		||||
        events = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{othello.id}', req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
        owner_users = realm.get_human_owner_users()
 | 
			
		||||
        self.assertFalse(othello in owner_users)
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], othello.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_MEMBER)
 | 
			
		||||
 | 
			
		||||
        # Cannot take away from last owner
 | 
			
		||||
        self.login('desdemona')
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_MEMBER)
 | 
			
		||||
        events = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{iago.id}', req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
        owner_users = realm.get_human_owner_users()
 | 
			
		||||
        self.assertFalse(iago in owner_users)
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], iago.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_MEMBER)
 | 
			
		||||
        with tornado_redirected_to_list([]):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{desdemona.id}', req)
 | 
			
		||||
        self.assert_json_error(result, 'The owner permission cannot be removed from the only organization owner.')
 | 
			
		||||
 | 
			
		||||
        do_change_user_role(iago, UserProfile.ROLE_REALM_ADMINISTRATOR)
 | 
			
		||||
        self.login('iago')
 | 
			
		||||
        with tornado_redirected_to_list([]):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{desdemona.id}', req)
 | 
			
		||||
        self.assert_json_error(result, 'Only organization owners can add or remove the owner permission.')
 | 
			
		||||
 | 
			
		||||
    def test_admin_api(self) -> None:
 | 
			
		||||
        self.login('desdemona')
 | 
			
		||||
 | 
			
		||||
        hamlet = self.example_user('hamlet')
 | 
			
		||||
        othello = self.example_user('othello')
 | 
			
		||||
        iago = self.example_user('iago')
 | 
			
		||||
        desdemona = self.example_user('desdemona')
 | 
			
		||||
        realm = hamlet.realm
 | 
			
		||||
 | 
			
		||||
@@ -162,22 +223,6 @@ class PermissionTest(ZulipTestCase):
 | 
			
		||||
        self.assertEqual(person['user_id'], othello.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_MEMBER)
 | 
			
		||||
 | 
			
		||||
        # Cannot take away from last admin
 | 
			
		||||
        self.login('iago')
 | 
			
		||||
        req = dict(role=ujson.dumps(UserProfile.ROLE_MEMBER))
 | 
			
		||||
        events = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{desdemona.id}', req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
        admin_users = realm.get_human_admin_users()
 | 
			
		||||
        self.assertFalse(desdemona in admin_users)
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], desdemona.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_MEMBER)
 | 
			
		||||
        with tornado_redirected_to_list([]):
 | 
			
		||||
            result = self.client_patch(f'/json/users/{iago.id}', req)
 | 
			
		||||
        self.assert_json_error(result, 'Cannot remove the only organization administrator')
 | 
			
		||||
 | 
			
		||||
        # Make sure only admins can patch other user's info.
 | 
			
		||||
        self.login('othello')
 | 
			
		||||
        result = self.client_patch(f'/json/users/{hamlet.id}', req)
 | 
			
		||||
@@ -432,6 +477,100 @@ class PermissionTest(ZulipTestCase):
 | 
			
		||||
        self.assertEqual(person['user_id'], polonius.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_REALM_ADMINISTRATOR)
 | 
			
		||||
 | 
			
		||||
    def test_change_owner_to_guest(self) -> None:
 | 
			
		||||
        self.login("desdemona")
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        do_change_user_role(iago, UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
        self.assertFalse(iago.is_guest)
 | 
			
		||||
        self.assertTrue(iago.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        # Test changing a user from owner to guest and revoking owner status
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertFalse(iago.is_guest)
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_GUEST)
 | 
			
		||||
        events: List[Mapping[str, Any]] = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch('/json/users/{}'.format(iago.id), req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertTrue(iago.is_guest)
 | 
			
		||||
        self.assertFalse(iago.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], iago.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_GUEST)
 | 
			
		||||
 | 
			
		||||
    def test_change_guest_to_owner(self) -> None:
 | 
			
		||||
        desdemona = self.example_user("desdemona")
 | 
			
		||||
        self.login_user(desdemona)
 | 
			
		||||
        polonius = self.example_user("polonius")
 | 
			
		||||
        self.assertTrue(polonius.is_guest)
 | 
			
		||||
        self.assertFalse(polonius.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        # Test changing a user from guest to admin and revoking guest status
 | 
			
		||||
        polonius = self.example_user("polonius")
 | 
			
		||||
        self.assertFalse(polonius.is_realm_owner)
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
        events: List[Mapping[str, Any]] = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch('/json/users/{}'.format(polonius.id), req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        polonius = self.example_user("polonius")
 | 
			
		||||
        self.assertFalse(polonius.is_guest)
 | 
			
		||||
        self.assertTrue(polonius.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], polonius.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
 | 
			
		||||
    def test_change_admin_to_owner(self) -> None:
 | 
			
		||||
        desdemona = self.example_user("desdemona")
 | 
			
		||||
        self.login_user(desdemona)
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertTrue(iago.is_realm_admin)
 | 
			
		||||
        self.assertFalse(iago.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        # Test changing a user from admin to owner and revoking admin status
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertFalse(iago.is_realm_owner)
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
        events: List[Mapping[str, Any]] = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch('/json/users/{}'.format(iago.id), req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertTrue(iago.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], iago.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
 | 
			
		||||
    def test_change_owner_to_admin(self) -> None:
 | 
			
		||||
        desdemona = self.example_user("desdemona")
 | 
			
		||||
        self.login_user(desdemona)
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        do_change_user_role(iago, UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
        self.assertTrue(iago.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        # Test changing a user from admin to owner and revoking admin status
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertTrue(iago.is_realm_owner)
 | 
			
		||||
        req = dict(role=UserProfile.ROLE_REALM_ADMINISTRATOR)
 | 
			
		||||
        events: List[Mapping[str, Any]] = []
 | 
			
		||||
        with tornado_redirected_to_list(events):
 | 
			
		||||
            result = self.client_patch('/json/users/{}'.format(iago.id), req)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        iago = self.example_user("iago")
 | 
			
		||||
        self.assertFalse(iago.is_realm_owner)
 | 
			
		||||
 | 
			
		||||
        person = events[0]['event']['person']
 | 
			
		||||
        self.assertEqual(person['user_id'], iago.id)
 | 
			
		||||
        self.assertEqual(person['role'], UserProfile.ROLE_REALM_ADMINISTRATOR)
 | 
			
		||||
 | 
			
		||||
    def test_admin_user_can_change_profile_data(self) -> None:
 | 
			
		||||
        realm = get_realm('zulip')
 | 
			
		||||
        self.login('iago')
 | 
			
		||||
@@ -981,8 +1120,10 @@ class ActivateTest(ZulipTestCase):
 | 
			
		||||
        self.assertTrue(user.is_active)
 | 
			
		||||
 | 
			
		||||
    def test_api_with_nonexistent_user(self) -> None:
 | 
			
		||||
        admin = self.example_user('desdemona')
 | 
			
		||||
        self.login_user(admin)
 | 
			
		||||
        self.login('desdemona')
 | 
			
		||||
 | 
			
		||||
        iago = self.example_user('iago')
 | 
			
		||||
        do_change_user_role(iago, UserProfile.ROLE_REALM_OWNER)
 | 
			
		||||
 | 
			
		||||
        # Cannot deactivate a user with the bot api
 | 
			
		||||
        result = self.client_delete('/json/bots/{}'.format(self.example_user("hamlet").id))
 | 
			
		||||
@@ -996,11 +1137,11 @@ class ActivateTest(ZulipTestCase):
 | 
			
		||||
        result = self.client_delete('/json/users/{}'.format(self.example_user("webhook_bot").id))
 | 
			
		||||
        self.assert_json_error(result, 'No such user')
 | 
			
		||||
 | 
			
		||||
        result = self.client_delete('/json/users/{}'.format(self.example_user("iago").id))
 | 
			
		||||
        result = self.client_delete('/json/users/{}'.format(iago.id))
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        result = self.client_delete(f'/json/users/{admin.id}')
 | 
			
		||||
        self.assert_json_error(result, 'Cannot deactivate the only organization administrator')
 | 
			
		||||
        result = self.client_delete(f'/json/users/{self.example_user("desdemona").id}')
 | 
			
		||||
        self.assert_json_error(result, 'Cannot deactivate the only organization owner')
 | 
			
		||||
 | 
			
		||||
        # Cannot reactivate a nonexistent user.
 | 
			
		||||
        invalid_user_id = 1000
 | 
			
		||||
 
 | 
			
		||||
@@ -39,26 +39,26 @@ from zerver.models import UserProfile, Stream, Message, \
 | 
			
		||||
    InvalidFakeEmailDomain
 | 
			
		||||
from zproject.backends import check_password_strength
 | 
			
		||||
 | 
			
		||||
def check_last_owner(user_profile: UserProfile) -> bool:
 | 
			
		||||
    owners = set(user_profile.realm.get_human_owner_users())
 | 
			
		||||
    return user_profile.is_realm_owner and not user_profile.is_bot and len(owners) == 1
 | 
			
		||||
 | 
			
		||||
def deactivate_user_backend(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                            user_id: int) -> HttpResponse:
 | 
			
		||||
    target = access_user_by_id(user_profile, user_id)
 | 
			
		||||
    if check_last_admin(target):
 | 
			
		||||
        return json_error(_('Cannot deactivate the only organization administrator'))
 | 
			
		||||
    if check_last_owner(target):
 | 
			
		||||
        return json_error(_('Cannot deactivate the only organization owner'))
 | 
			
		||||
    return _deactivate_user_profile_backend(request, user_profile, target)
 | 
			
		||||
 | 
			
		||||
def deactivate_user_own_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    if UserProfile.objects.filter(realm=user_profile.realm, is_active=True).count() == 1:
 | 
			
		||||
        raise CannotDeactivateLastUserError(is_last_admin=False)
 | 
			
		||||
    if user_profile.is_realm_admin and check_last_admin(user_profile):
 | 
			
		||||
        raise CannotDeactivateLastUserError(is_last_admin=True)
 | 
			
		||||
        raise CannotDeactivateLastUserError(is_last_owner=False)
 | 
			
		||||
    if user_profile.is_realm_owner and check_last_owner(user_profile):
 | 
			
		||||
        raise CannotDeactivateLastUserError(is_last_owner=True)
 | 
			
		||||
 | 
			
		||||
    do_deactivate_user(user_profile, acting_user=user_profile)
 | 
			
		||||
    return json_success()
 | 
			
		||||
 | 
			
		||||
def check_last_admin(user_profile: UserProfile) -> bool:
 | 
			
		||||
    admins = set(user_profile.realm.get_human_admin_users())
 | 
			
		||||
    return user_profile.is_realm_admin and not user_profile.is_bot and len(admins) == 1
 | 
			
		||||
 | 
			
		||||
def deactivate_bot_backend(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                           bot_id: int) -> HttpResponse:
 | 
			
		||||
    target = access_bot_by_id(user_profile, bot_id)
 | 
			
		||||
@@ -89,8 +89,10 @@ def update_user_backend(request: HttpRequest, user_profile: UserProfile, user_id
 | 
			
		||||
    target = access_user_by_id(user_profile, user_id, allow_deactivated=True, allow_bots=True)
 | 
			
		||||
 | 
			
		||||
    if role is not None and target.role != role:
 | 
			
		||||
        if target.role == UserProfile.ROLE_REALM_ADMINISTRATOR and check_last_admin(user_profile):
 | 
			
		||||
            return json_error(_('Cannot remove the only organization administrator'))
 | 
			
		||||
        if target.role == UserProfile.ROLE_REALM_OWNER and check_last_owner(user_profile):
 | 
			
		||||
            return json_error(_('The owner permission cannot be removed from the only organization owner.'))
 | 
			
		||||
        if UserProfile.ROLE_REALM_OWNER in [role, target.role] and not user_profile.is_realm_owner:
 | 
			
		||||
            return json_error(_('Only organization owners can add or remove the owner permission.'))
 | 
			
		||||
        do_change_user_role(target, role)
 | 
			
		||||
 | 
			
		||||
    if (full_name is not None and target.full_name != full_name and
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user