diff --git a/zerver/lib/users.py b/zerver/lib/users.py index b2c9fc5e57..04ca7a24d3 100644 --- a/zerver/lib/users.py +++ b/zerver/lib/users.py @@ -42,6 +42,7 @@ from zerver.models import ( get_fake_email_domain, get_realm_user_dicts, get_user, + get_user_by_id_in_realm_including_cross_realm, get_user_profile_by_id_in_realm, is_cross_realm_bot_email, ) @@ -284,6 +285,23 @@ def access_user_by_id( return access_user_common(target, user_profile, allow_deactivated, allow_bots, for_admin) +def access_user_by_id_including_cross_realm( + user_profile: UserProfile, + target_user_id: int, + *, + allow_deactivated: bool = False, + allow_bots: bool = False, + for_admin: bool, +) -> UserProfile: + """Variant of access_user_by_id allowing cross-realm bots to be accessed.""" + try: + target = get_user_by_id_in_realm_including_cross_realm(target_user_id, user_profile.realm) + except UserProfile.DoesNotExist: + raise JsonableError(_("No such user")) + + return access_user_common(target, user_profile, allow_deactivated, allow_bots, for_admin) + + def access_user_by_email( user_profile: UserProfile, email: str, diff --git a/zerver/tests/test_users.py b/zerver/tests/test_users.py index 84330c75ac..d8a5a6bb48 100644 --- a/zerver/tests/test_users.py +++ b/zerver/tests/test_users.py @@ -50,6 +50,7 @@ from zerver.lib.user_groups import get_system_user_group_for_user from zerver.lib.users import ( Account, access_user_by_id, + access_user_by_id_including_cross_realm, get_accounts_for_email, get_cross_realm_dicts, user_ids_to_users, @@ -428,37 +429,76 @@ class PermissionTest(ZulipTestCase): def test_access_user_by_id(self) -> None: iago = self.example_user("iago") + internal_realm = get_realm(settings.SYSTEM_BOT_REALM) # Must be a valid user ID in the realm with self.assertRaises(JsonableError): access_user_by_id(iago, 1234, for_admin=False) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm(iago, 1234, for_admin=False) with self.assertRaises(JsonableError): access_user_by_id(iago, self.mit_user("sipbtest").id, for_admin=False) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm( + iago, self.mit_user("sipbtest").id, for_admin=False + ) # Can only access bot users if allow_bots is passed bot = self.example_user("default_bot") access_user_by_id(iago, bot.id, allow_bots=True, for_admin=True) + access_user_by_id_including_cross_realm(iago, bot.id, allow_bots=True, for_admin=True) with self.assertRaises(JsonableError): access_user_by_id(iago, bot.id, for_admin=True) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm(iago, bot.id, for_admin=True) + + # Only the including_cross_realm variant works for system bots. + system_bot = get_system_bot(settings.WELCOME_BOT, internal_realm.id) + with self.assertRaises(JsonableError): + access_user_by_id(iago, system_bot.id, allow_bots=True, for_admin=False) + access_user_by_id_including_cross_realm( + iago, system_bot.id, allow_bots=True, for_admin=False + ) + # And even then, only if `allow_bots` was passed. + with self.assertRaises(JsonableError): + access_user_by_id(iago, system_bot.id, for_admin=False) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm(iago, system_bot.id, for_admin=False) # Can only access deactivated users if allow_deactivated is passed hamlet = self.example_user("hamlet") do_deactivate_user(hamlet, acting_user=None) with self.assertRaises(JsonableError): access_user_by_id(iago, hamlet.id, for_admin=False) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm(iago, hamlet.id, for_admin=False) + with self.assertRaises(JsonableError): access_user_by_id(iago, hamlet.id, for_admin=True) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm(iago, hamlet.id, for_admin=True) access_user_by_id(iago, hamlet.id, allow_deactivated=True, for_admin=True) + access_user_by_id_including_cross_realm( + iago, hamlet.id, allow_deactivated=True, for_admin=True + ) # Non-admin user can't admin another user with self.assertRaises(JsonableError): access_user_by_id( self.example_user("cordelia"), self.example_user("aaron").id, for_admin=True ) + with self.assertRaises(JsonableError): + access_user_by_id_including_cross_realm( + self.example_user("cordelia"), self.example_user("aaron").id, for_admin=True + ) + # But does have read-only access to it. access_user_by_id( self.example_user("cordelia"), self.example_user("aaron").id, for_admin=False ) + access_user_by_id_including_cross_realm( + self.example_user("cordelia"), self.example_user("aaron").id, for_admin=False + ) def check_property_for_role(self, user_profile: UserProfile, role: int) -> bool: if role == UserProfile.ROLE_REALM_ADMINISTRATOR: diff --git a/zerver/views/muted_users.py b/zerver/views/muted_users.py index 42e7b97ffd..f4e621aa1d 100644 --- a/zerver/views/muted_users.py +++ b/zerver/views/muted_users.py @@ -7,7 +7,7 @@ from zerver.actions.muted_users import do_mute_user, do_unmute_user from zerver.lib.exceptions import JsonableError from zerver.lib.muted_users import get_mute_object from zerver.lib.response import json_success -from zerver.lib.users import access_user_by_id +from zerver.lib.users import access_user_by_id_including_cross_realm from zerver.models import UserProfile @@ -23,7 +23,7 @@ def mute_user(request: HttpRequest, user_profile: UserProfile, muted_user_id: in # # But it's quite possibly something nobody will try to do, so we # just reuse the existing shared code path. - muted_user = access_user_by_id( + muted_user = access_user_by_id_including_cross_realm( user_profile, muted_user_id, allow_bots=True, allow_deactivated=True, for_admin=False ) date_muted = timezone_now() @@ -39,7 +39,7 @@ def mute_user(request: HttpRequest, user_profile: UserProfile, muted_user_id: in def unmute_user( request: HttpRequest, user_profile: UserProfile, muted_user_id: int ) -> HttpResponse: - muted_user = access_user_by_id( + muted_user = access_user_by_id_including_cross_realm( user_profile, muted_user_id, allow_bots=True, allow_deactivated=True, for_admin=False ) mute_object = get_mute_object(user_profile, muted_user)