mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	This new data model will be more extensible for future work on features like a primary administrator.
		
			
				
	
	
		
			1284 lines
		
	
	
		
			53 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1284 lines
		
	
	
		
			53 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
from email.utils import parseaddr
 | 
						|
 | 
						|
from typing import (Any, Dict, Iterable, List, Mapping,
 | 
						|
                    Optional, TypeVar, Union)
 | 
						|
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    queries_captured, simulated_empty_cache,
 | 
						|
    tornado_redirected_to_list, get_subscription,
 | 
						|
    most_recent_message,
 | 
						|
)
 | 
						|
from zerver.lib.test_classes import (
 | 
						|
    ZulipTestCase,
 | 
						|
)
 | 
						|
 | 
						|
from zerver.models import UserProfile, Recipient, Realm, \
 | 
						|
    RealmDomain, UserHotspot, get_client, \
 | 
						|
    get_user, get_realm, get_stream, get_stream_recipient, \
 | 
						|
    get_source_profile, get_system_bot, \
 | 
						|
    ScheduledEmail, check_valid_user_ids, \
 | 
						|
    get_user_by_id_in_realm_including_cross_realm, CustomProfileField, \
 | 
						|
    InvalidFakeEmailDomain, get_fake_email_domain
 | 
						|
 | 
						|
from zerver.lib.avatar import avatar_url, get_gravatar_url
 | 
						|
from zerver.lib.exceptions import JsonableError
 | 
						|
from zerver.lib.send_email import send_future_email, clear_scheduled_emails, \
 | 
						|
    deliver_email
 | 
						|
from zerver.lib.actions import (
 | 
						|
    get_emails_from_user_ids,
 | 
						|
    get_recipient_info,
 | 
						|
    do_deactivate_user,
 | 
						|
    do_reactivate_user,
 | 
						|
    do_change_is_admin,
 | 
						|
    do_create_user,
 | 
						|
    do_set_realm_property,
 | 
						|
)
 | 
						|
from zerver.lib.create_user import copy_user_settings
 | 
						|
from zerver.lib.events import do_events_register
 | 
						|
from zerver.lib.topic_mutes import add_topic_mute
 | 
						|
from zerver.lib.stream_topic import StreamTopicTarget
 | 
						|
from zerver.lib.users import user_ids_to_users, access_user_by_id, \
 | 
						|
    get_accounts_for_email
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
from django.test import override_settings
 | 
						|
 | 
						|
import datetime
 | 
						|
import mock
 | 
						|
import ujson
 | 
						|
 | 
						|
K = TypeVar('K')
 | 
						|
V = TypeVar('V')
 | 
						|
def find_dict(lst: Iterable[Dict[K, V]], k: K, v: V) -> Dict[K, V]:
 | 
						|
    for dct in lst:
 | 
						|
        if dct[k] == v:
 | 
						|
            return dct
 | 
						|
    raise AssertionError('Cannot find element in list where key %s == %s' % (k, v))
 | 
						|
 | 
						|
class PermissionTest(ZulipTestCase):
 | 
						|
    def test_do_change_is_admin(self) -> None:
 | 
						|
        """
 | 
						|
        Ensures change_is_admin raises an AssertionError when invalid permissions
 | 
						|
        are provided to it.
 | 
						|
        """
 | 
						|
 | 
						|
        # this should work fine
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        do_change_is_admin(user_profile, True)
 | 
						|
 | 
						|
        # this should work a-ok as well
 | 
						|
        do_change_is_admin(user_profile, True, permission='administer')
 | 
						|
 | 
						|
        # this should "fail" with an AssertionError
 | 
						|
        with self.assertRaises(AssertionError):
 | 
						|
            do_change_is_admin(user_profile, True, permission='totally-not-valid-perm')
 | 
						|
 | 
						|
    def test_get_admin_users(self) -> None:
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        do_change_is_admin(user_profile, False)
 | 
						|
        admin_users = user_profile.realm.get_human_admin_users()
 | 
						|
        self.assertFalse(user_profile in admin_users)
 | 
						|
        admin_users = user_profile.realm.get_admin_users_and_bots()
 | 
						|
        self.assertFalse(user_profile in admin_users)
 | 
						|
 | 
						|
        do_change_is_admin(user_profile, True)
 | 
						|
        admin_users = user_profile.realm.get_human_admin_users()
 | 
						|
        self.assertTrue(user_profile in admin_users)
 | 
						|
        admin_users = user_profile.realm.get_admin_users_and_bots()
 | 
						|
        self.assertTrue(user_profile in admin_users)
 | 
						|
 | 
						|
    def test_updating_non_existent_user(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        admin = self.example_user('hamlet')
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
 | 
						|
        invalid_user_id = 1000
 | 
						|
        result = self.client_patch('/json/users/{}'.format(invalid_user_id), {})
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
    def test_admin_api(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        admin = self.example_user('hamlet')
 | 
						|
        user = self.example_user('othello')
 | 
						|
        realm = admin.realm
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
 | 
						|
        # Make sure we see is_admin flag in /json/users
 | 
						|
        result = self.client_get('/json/users')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        members = result.json()['members']
 | 
						|
        hamlet = find_dict(members, 'email', self.example_email("hamlet"))
 | 
						|
        self.assertTrue(hamlet['is_admin'])
 | 
						|
        othello = find_dict(members, 'email', self.example_email("othello"))
 | 
						|
        self.assertFalse(othello['is_admin'])
 | 
						|
 | 
						|
        # Giveth
 | 
						|
        req = dict(is_admin=ujson.dumps(True))
 | 
						|
 | 
						|
        events = []  # type: List[Mapping[str, Any]]
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(self.example_user("othello").id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        admin_users = realm.get_human_admin_users()
 | 
						|
        self.assertTrue(user in admin_users)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], self.example_email("othello"))
 | 
						|
        self.assertEqual(person['is_admin'], True)
 | 
						|
 | 
						|
        # Taketh away
 | 
						|
        req = dict(is_admin=ujson.dumps(False))
 | 
						|
        events = []
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(self.example_user("othello").id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        admin_users = realm.get_human_admin_users()
 | 
						|
        self.assertFalse(user in admin_users)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], self.example_email("othello"))
 | 
						|
        self.assertEqual(person['is_admin'], False)
 | 
						|
 | 
						|
        # Cannot take away from last admin
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(is_admin=ujson.dumps(False))
 | 
						|
        events = []
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(self.example_user("hamlet").id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        admin_users = realm.get_human_admin_users()
 | 
						|
        self.assertFalse(admin in admin_users)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], self.example_email("hamlet"))
 | 
						|
        self.assertEqual(person['is_admin'], False)
 | 
						|
        with tornado_redirected_to_list([]):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(self.example_user("iago").id), req)
 | 
						|
        self.assert_json_error(result, 'Cannot remove the only organization administrator')
 | 
						|
 | 
						|
        # Make sure only admins can patch other user's info.
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user("hamlet").id), req)
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_admin_api_hide_emails(self) -> None:
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        admin = self.example_user('iago')
 | 
						|
        self.login(user.email)
 | 
						|
 | 
						|
        # First, verify client_gravatar works normally
 | 
						|
        result = self.client_get('/json/users?client_gravatar=true')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        members = result.json()['members']
 | 
						|
        hamlet = find_dict(members, 'user_id', user.id)
 | 
						|
        self.assertEqual(hamlet['email'], user.email)
 | 
						|
        self.assertIsNone(hamlet['avatar_url'])
 | 
						|
        self.assertNotIn('delivery_email', hamlet)
 | 
						|
 | 
						|
        # Also verify the /events code path.  This is a bit hacky, but
 | 
						|
        # we need to verify client_gravatar is not being overridden.
 | 
						|
        with mock.patch('zerver.lib.events.request_event_queue',
 | 
						|
                        return_value=None) as mock_request_event_queue:
 | 
						|
            with self.assertRaises(JsonableError):
 | 
						|
                result = do_events_register(user, get_client("website"),
 | 
						|
                                            client_gravatar=True)
 | 
						|
            self.assertEqual(mock_request_event_queue.call_args_list[0][0][3],
 | 
						|
                             True)
 | 
						|
 | 
						|
        #############################################################
 | 
						|
        # Now, switch email address visibility, check client_gravatar
 | 
						|
        # is automatically disabled for the user.
 | 
						|
        do_set_realm_property(user.realm, "email_address_visibility",
 | 
						|
                              Realm.EMAIL_ADDRESS_VISIBILITY_ADMINS)
 | 
						|
        result = self.client_get('/json/users?client_gravatar=true')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        members = result.json()['members']
 | 
						|
        hamlet = find_dict(members, 'user_id', user.id)
 | 
						|
        self.assertEqual(hamlet['email'], "user%s@zulip.testserver" % (user.id,))
 | 
						|
        self.assertEqual(hamlet['avatar_url'], get_gravatar_url(hamlet['email'], 1))
 | 
						|
        self.assertNotIn('delivery_email', hamlet)
 | 
						|
 | 
						|
        # Also verify the /events code path.  This is a bit hacky, but
 | 
						|
        # basically we want to verify client_gravatar is being
 | 
						|
        # overridden.
 | 
						|
        with mock.patch('zerver.lib.events.request_event_queue',
 | 
						|
                        return_value=None) as mock_request_event_queue:
 | 
						|
            with self.assertRaises(JsonableError):
 | 
						|
                result = do_events_register(user, get_client("website"),
 | 
						|
                                            client_gravatar=True)
 | 
						|
            self.assertEqual(mock_request_event_queue.call_args_list[0][0][3],
 | 
						|
                             False)
 | 
						|
 | 
						|
        # client_gravatar is still turned off for admins.  In theory,
 | 
						|
        # it doesn't need to be, but client-side changes would be
 | 
						|
        # required in apps like the mobile apps.
 | 
						|
        # delivery_email is sent for admins.
 | 
						|
        admin.refresh_from_db()
 | 
						|
        self.login(admin.delivery_email)
 | 
						|
        result = self.client_get('/json/users?client_gravatar=true')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        members = result.json()['members']
 | 
						|
        hamlet = find_dict(members, 'user_id', user.id)
 | 
						|
        self.assertEqual(hamlet['email'], "user%s@zulip.testserver" % (user.id,))
 | 
						|
        self.assertEqual(hamlet['avatar_url'], get_gravatar_url(hamlet['email'], 1))
 | 
						|
        self.assertEqual(hamlet['delivery_email'], self.example_email("hamlet"))
 | 
						|
 | 
						|
    def test_user_cannot_promote_to_admin(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        req = dict(is_admin=ujson.dumps(True))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_admin_user_can_change_full_name(self) -> None:
 | 
						|
        new_name = 'new name'
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        self.assertEqual(hamlet.full_name, new_name)
 | 
						|
        req['is_admin'] = ujson.dumps(False)
 | 
						|
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_non_admin_cannot_change_full_name(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        req = dict(full_name=ujson.dumps('new name'))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user('othello').id), req)
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_admin_cannot_set_long_full_name(self) -> None:
 | 
						|
        new_name = 'a' * (UserProfile.MAX_NAME_LENGTH + 1)
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
 | 
						|
        self.assert_json_error(result, 'Name too long!')
 | 
						|
 | 
						|
    def test_admin_cannot_set_short_full_name(self) -> None:
 | 
						|
        new_name = 'a'
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
 | 
						|
        self.assert_json_error(result, 'Name too short!')
 | 
						|
 | 
						|
    def test_admin_cannot_set_full_name_with_invalid_characters(self) -> None:
 | 
						|
        new_name = 'Opheli*'
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user('hamlet').id), req)
 | 
						|
        self.assert_json_error(result, 'Invalid characters in name!')
 | 
						|
 | 
						|
    def test_access_user_by_id(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
 | 
						|
        # Must be a valid user ID in the realm
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            access_user_by_id(iago, 1234)
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            access_user_by_id(iago, self.mit_user("sipbtest").id)
 | 
						|
 | 
						|
        # Can only access bot users if allow_deactivated is passed
 | 
						|
        bot = self.example_user("default_bot")
 | 
						|
        access_user_by_id(iago, bot.id, allow_bots=True)
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            access_user_by_id(iago, bot.id)
 | 
						|
 | 
						|
        # Can only access deactivated users if allow_deactivated is passed
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        do_deactivate_user(hamlet)
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            access_user_by_id(iago, hamlet.id)
 | 
						|
        access_user_by_id(iago, hamlet.id, allow_deactivated=True)
 | 
						|
 | 
						|
        # Non-admin user can't admin another user
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            access_user_by_id(self.example_user("cordelia"), self.example_user("aaron").id)
 | 
						|
 | 
						|
    def test_change_regular_member_to_guest(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
        self.login(iago.email)
 | 
						|
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        self.assertFalse(hamlet.is_guest)
 | 
						|
 | 
						|
        # Test failure of making user both admin and guest
 | 
						|
        req = dict(is_guest=ujson.dumps(True), is_admin=ujson.dumps(True))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
 | 
						|
        self.assert_json_error(result, 'Guests cannot be organization administrators')
 | 
						|
        self.assertFalse(hamlet.is_guest)
 | 
						|
        self.assertFalse(hamlet.is_realm_admin)
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
 | 
						|
        req = dict(is_guest=ujson.dumps(True))
 | 
						|
        events = []  # type: List[Mapping[str, Any]]
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        self.assertTrue(hamlet.is_guest)
 | 
						|
        self.assertFalse(hamlet.can_access_all_realm_members())
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], hamlet.email)
 | 
						|
        self.assertTrue(person['is_guest'])
 | 
						|
 | 
						|
    def test_change_guest_to_regular_member(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
        self.login(iago.email)
 | 
						|
 | 
						|
        polonius = self.example_user("polonius")
 | 
						|
        self.assertTrue(polonius.is_guest)
 | 
						|
        req = dict(is_guest=ujson.dumps(False))
 | 
						|
        events = []  # type: List[Mapping[str, Any]]
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(polonius.id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        polonius = self.example_user("polonius")
 | 
						|
        self.assertFalse(polonius.is_guest)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], polonius.email)
 | 
						|
        self.assertFalse(person['is_guest'])
 | 
						|
 | 
						|
    def test_change_admin_to_guest(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
        self.login(iago.email)
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        do_change_is_admin(hamlet, True)
 | 
						|
        self.assertFalse(hamlet.is_guest)
 | 
						|
        self.assertTrue(hamlet.is_realm_admin)
 | 
						|
 | 
						|
        # Test failure of making a admin to guest without revoking admin status
 | 
						|
        req = dict(is_guest=ujson.dumps(True))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
 | 
						|
        self.assert_json_error(result, 'Guests cannot be organization administrators')
 | 
						|
 | 
						|
        # Test changing a user from admin to guest and revoking admin status
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        self.assertFalse(hamlet.is_guest)
 | 
						|
        req = dict(is_admin=ujson.dumps(False), is_guest=ujson.dumps(True))
 | 
						|
        events = []  # type: List[Mapping[str, Any]]
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(hamlet.id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        self.assertTrue(hamlet.is_guest)
 | 
						|
        self.assertFalse(hamlet.is_realm_admin)
 | 
						|
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], hamlet.email)
 | 
						|
        self.assertFalse(person['is_admin'])
 | 
						|
 | 
						|
        person = events[1]['event']['person']
 | 
						|
        self.assertEqual(person['email'], hamlet.email)
 | 
						|
        self.assertTrue(person['is_guest'])
 | 
						|
 | 
						|
    def test_change_guest_to_admin(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
        self.login(iago.email)
 | 
						|
        polonius = self.example_user("polonius")
 | 
						|
        self.assertTrue(polonius.is_guest)
 | 
						|
        self.assertFalse(polonius.is_realm_admin)
 | 
						|
 | 
						|
        # Test failure of making a guest to admin without revoking guest status
 | 
						|
        req = dict(is_admin=ujson.dumps(True))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(polonius.id), req)
 | 
						|
        self.assert_json_error(result, 'Guests cannot be organization administrators')
 | 
						|
 | 
						|
        # Test changing a user from guest to admin and revoking guest status
 | 
						|
        polonius = self.example_user("polonius")
 | 
						|
        self.assertFalse(polonius.is_realm_admin)
 | 
						|
        req = dict(is_admin=ujson.dumps(True), is_guest=ujson.dumps(False))
 | 
						|
        events = []  # type: List[Mapping[str, Any]]
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/{}'.format(polonius.id), req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        polonius = self.example_user("polonius")
 | 
						|
        self.assertFalse(polonius.is_guest)
 | 
						|
        self.assertTrue(polonius.is_realm_admin)
 | 
						|
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], polonius.email)
 | 
						|
        self.assertTrue(person['is_admin'])
 | 
						|
 | 
						|
    def test_admin_user_can_change_profile_data(self) -> None:
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        new_profile_data = []
 | 
						|
        cordelia = self.example_user("cordelia")
 | 
						|
 | 
						|
        # Test for all type of data
 | 
						|
        fields = {
 | 
						|
            'Phone number': 'short text data',
 | 
						|
            'Biography': 'long text data',
 | 
						|
            'Favorite food': 'short text data',
 | 
						|
            'Favorite editor': 'vim',
 | 
						|
            'Birthday': '1909-3-5',
 | 
						|
            'Favorite website': 'https://zulipchat.com',
 | 
						|
            'Mentor': [cordelia.id],
 | 
						|
            'GitHub': 'timabbott',
 | 
						|
        }
 | 
						|
 | 
						|
        for field_name in fields:
 | 
						|
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
 | 
						|
            new_profile_data.append({
 | 
						|
                'id': field.id,
 | 
						|
                'value': fields[field_name],
 | 
						|
            })
 | 
						|
 | 
						|
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
 | 
						|
                                   {'profile_data': ujson.dumps(new_profile_data)})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        cordelia = self.example_user("cordelia")
 | 
						|
        for field_dict in cordelia.profile_data:
 | 
						|
            with self.subTest(field_name=field_dict['name']):
 | 
						|
                self.assertEqual(field_dict['value'], fields[field_dict['name']])
 | 
						|
 | 
						|
        # Test admin user cannot set invalid profile data
 | 
						|
        invalid_fields = [
 | 
						|
            ('Favorite editor', 'invalid choice', "'invalid choice' is not a valid choice for 'Favorite editor'."),
 | 
						|
            ('Birthday', '1909-34-55', "Birthday is not a date"),
 | 
						|
            ('Favorite website', 'not url', "Favorite website is not a URL"),
 | 
						|
            ('Mentor', "not list of user ids", "User IDs is not a list"),
 | 
						|
        ]
 | 
						|
 | 
						|
        for field_name, field_value, error_msg in invalid_fields:
 | 
						|
            new_profile_data = []
 | 
						|
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
 | 
						|
            new_profile_data.append({
 | 
						|
                'id': field.id,
 | 
						|
                'value': field_value,
 | 
						|
            })
 | 
						|
 | 
						|
            result = self.client_patch('/json/users/{}'.format(cordelia.id),
 | 
						|
                                       {'profile_data': ujson.dumps(new_profile_data)})
 | 
						|
            self.assert_json_error(result, error_msg)
 | 
						|
 | 
						|
        # non-existant field and no data
 | 
						|
        invalid_profile_data = [{
 | 
						|
            'id': 9001,
 | 
						|
            'value': ''
 | 
						|
        }]
 | 
						|
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
 | 
						|
                                   {'profile_data': ujson.dumps(invalid_profile_data)})
 | 
						|
        self.assert_json_error(result, 'Field id 9001 not found.')
 | 
						|
 | 
						|
        # non-existant field and data
 | 
						|
        invalid_profile_data = [{
 | 
						|
            'id': 9001,
 | 
						|
            'value': 'some data'
 | 
						|
        }]
 | 
						|
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
 | 
						|
                                   {'profile_data': ujson.dumps(invalid_profile_data)})
 | 
						|
        self.assert_json_error(result, 'Field id 9001 not found.')
 | 
						|
 | 
						|
        # Test for clearing/resetting field values.
 | 
						|
        empty_profile_data = []
 | 
						|
        for field_name in fields:
 | 
						|
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
 | 
						|
            value = ''  # type: Union[str, None, List[Any]]
 | 
						|
            if field.field_type == CustomProfileField.USER:
 | 
						|
                value = []
 | 
						|
            empty_profile_data.append({
 | 
						|
                'id': field.id,
 | 
						|
                'value': value,
 | 
						|
            })
 | 
						|
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
 | 
						|
                                   {'profile_data': ujson.dumps(empty_profile_data)})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        for field_dict in cordelia.profile_data:
 | 
						|
            with self.subTest(field_name=field_dict['name']):
 | 
						|
                self.assertEqual(field_dict['value'], None)
 | 
						|
 | 
						|
        # Test adding some of the field values after removing all.
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        new_fields = {
 | 
						|
            'Phone number': None,
 | 
						|
            'Biography': 'A test user',
 | 
						|
            'Favorite food': None,
 | 
						|
            'Favorite editor': None,
 | 
						|
            'Birthday': None,
 | 
						|
            'Favorite website': 'https://zulip.github.io',
 | 
						|
            'Mentor': [hamlet.id],
 | 
						|
            'GitHub': 'timabbott',
 | 
						|
        }
 | 
						|
        new_profile_data = []
 | 
						|
        for field_name in fields:
 | 
						|
            field = CustomProfileField.objects.get(name=field_name, realm=realm)
 | 
						|
            value = None
 | 
						|
            if new_fields[field_name]:
 | 
						|
                value = new_fields[field_name]
 | 
						|
            new_profile_data.append({
 | 
						|
                'id': field.id,
 | 
						|
                'value': value,
 | 
						|
            })
 | 
						|
        result = self.client_patch('/json/users/{}'.format(cordelia.id),
 | 
						|
                                   {'profile_data': ujson.dumps(new_profile_data)})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        for field_dict in cordelia.profile_data:
 | 
						|
            with self.subTest(field_name=field_dict['name']):
 | 
						|
                self.assertEqual(field_dict['value'], new_fields[str(field_dict['name'])])
 | 
						|
 | 
						|
    def test_non_admin_user_cannot_change_profile_data(self) -> None:
 | 
						|
        self.login(self.example_email("cordelia"))
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        realm = get_realm("zulip")
 | 
						|
 | 
						|
        new_profile_data = []
 | 
						|
        field = CustomProfileField.objects.get(name="Biography", realm=realm)
 | 
						|
        new_profile_data.append({
 | 
						|
            'id': field.id,
 | 
						|
            'value': "New hamlet Biography",
 | 
						|
        })
 | 
						|
        result = self.client_patch('/json/users/{}'.format(hamlet.id),
 | 
						|
                                   {'profile_data': ujson.dumps(new_profile_data)})
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user("cordelia").id),
 | 
						|
                                   {'profile_data': ujson.dumps(new_profile_data)})
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
class AdminCreateUserTest(ZulipTestCase):
 | 
						|
    def test_create_user_backend(self) -> None:
 | 
						|
 | 
						|
        # This test should give us complete coverage on
 | 
						|
        # create_user_backend.  It mostly exercises error
 | 
						|
        # conditions, and it also does a basic test of the success
 | 
						|
        # path.
 | 
						|
 | 
						|
        admin = self.example_user('hamlet')
 | 
						|
        admin_email = admin.email
 | 
						|
        realm = admin.realm
 | 
						|
        self.login(admin_email)
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict())
 | 
						|
        self.assert_json_error(result, "Missing 'email' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Missing 'password' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
            password='xxxx',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Missing 'full_name' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Missing 'short_name' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='broken',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
            short_name='Romeo',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Bad name or username")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
            short_name='Romeo',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result,
 | 
						|
                               "Email 'romeo@not-zulip.com' not allowed in this organization")
 | 
						|
 | 
						|
        RealmDomain.objects.create(realm=get_realm('zulip'), domain='zulip.net')
 | 
						|
        valid_params = dict(
 | 
						|
            email='romeo@zulip.net',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
            short_name='Romeo',
 | 
						|
        )
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Romeo is a newly registered user
 | 
						|
        new_user = get_user('romeo@zulip.net', get_realm('zulip'))
 | 
						|
        self.assertEqual(new_user.full_name, 'Romeo Montague')
 | 
						|
        self.assertEqual(new_user.short_name, 'Romeo')
 | 
						|
 | 
						|
        # we can't create the same user twice.
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_error(result,
 | 
						|
                               "Email 'romeo@zulip.net' already in use")
 | 
						|
 | 
						|
        # Don't allow user to sign up with disposable email.
 | 
						|
        realm.emails_restricted_to_domains = False
 | 
						|
        realm.disallow_disposable_email_addresses = True
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        valid_params["email"] = "abc@mailnator.com"
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_error(result, "Disposable email addresses are not allowed in this organization")
 | 
						|
 | 
						|
        # Don't allow creating a user with + in their email address when realm
 | 
						|
        # is restricted to a domain.
 | 
						|
        realm.emails_restricted_to_domains = True
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        valid_params["email"] = "iago+label@zulip.com"
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_error(result, "Email addresses containing + are not allowed.")
 | 
						|
 | 
						|
        # Users can be created with + in their email address when realm
 | 
						|
        # is not restricted to a domain.
 | 
						|
        realm.emails_restricted_to_domains = False
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        valid_params["email"] = "iago+label@zulip.com"
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
class UserProfileTest(ZulipTestCase):
 | 
						|
    def test_get_emails_from_user_ids(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        othello = self.example_user('othello')
 | 
						|
        dct = get_emails_from_user_ids([hamlet.id, othello.id])
 | 
						|
        self.assertEqual(dct[hamlet.id], self.example_email("hamlet"))
 | 
						|
        self.assertEqual(dct[othello.id], self.example_email("othello"))
 | 
						|
 | 
						|
    def test_valid_user_id(self) -> None:
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        othello = self.example_user('othello')
 | 
						|
        bot = self.example_user("default_bot")
 | 
						|
 | 
						|
        # Invalid user ID
 | 
						|
        invalid_uid = 1000  # type: Any
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, invalid_uid),
 | 
						|
                         "User IDs is not a list")
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [invalid_uid]),
 | 
						|
                         "Invalid user ID: %d" % (invalid_uid,))
 | 
						|
 | 
						|
        invalid_uid = "abc"
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [invalid_uid]),
 | 
						|
                         "User IDs[0] is not an integer")
 | 
						|
        invalid_uid = str(othello.id)
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [invalid_uid]),
 | 
						|
                         "User IDs[0] is not an integer")
 | 
						|
 | 
						|
        # User is in different realm
 | 
						|
        self.assertEqual(check_valid_user_ids(get_realm("zephyr").id, [hamlet.id]),
 | 
						|
                         "Invalid user ID: %d" % (hamlet.id,))
 | 
						|
 | 
						|
        # User is not active
 | 
						|
        hamlet.is_active = False
 | 
						|
        hamlet.save()
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [hamlet.id]),
 | 
						|
                         "User with ID %d is deactivated" % (hamlet.id,))
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [hamlet.id], allow_deactivated=True),
 | 
						|
                         None)
 | 
						|
 | 
						|
        # User is a bot
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [bot.id]),
 | 
						|
                         "User with ID %d is a bot" % (bot.id,))
 | 
						|
 | 
						|
        # Succesfully get non-bot, active user belong to your realm
 | 
						|
        self.assertEqual(check_valid_user_ids(realm.id, [othello.id]), None)
 | 
						|
 | 
						|
    def test_cache_invalidation(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        with mock.patch('zerver.lib.cache.delete_display_recipient_cache') as m:
 | 
						|
            hamlet.full_name = 'Hamlet Junior'
 | 
						|
            hamlet.save(update_fields=["full_name"])
 | 
						|
 | 
						|
        self.assertTrue(m.called)
 | 
						|
 | 
						|
        with mock.patch('zerver.lib.cache.delete_display_recipient_cache') as m:
 | 
						|
            hamlet.long_term_idle = True
 | 
						|
            hamlet.save(update_fields=["long_term_idle"])
 | 
						|
 | 
						|
        self.assertFalse(m.called)
 | 
						|
 | 
						|
    def test_user_ids_to_users(self) -> None:
 | 
						|
        real_user_ids = [
 | 
						|
            self.example_user('hamlet').id,
 | 
						|
            self.example_user('cordelia').id,
 | 
						|
        ]
 | 
						|
 | 
						|
        self.assertEqual(user_ids_to_users([], get_realm("zulip")), [])
 | 
						|
        self.assertEqual(set([user_profile.id for user_profile in user_ids_to_users(real_user_ids, get_realm("zulip"))]),
 | 
						|
                         set(real_user_ids))
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            user_ids_to_users([1234], get_realm("zephyr"))
 | 
						|
        with self.assertRaises(JsonableError):
 | 
						|
            user_ids_to_users(real_user_ids, get_realm("zephyr"))
 | 
						|
 | 
						|
    def test_bulk_get_users(self) -> None:
 | 
						|
        from zerver.lib.users import bulk_get_users
 | 
						|
        hamlet = self.example_email("hamlet")
 | 
						|
        cordelia = self.example_email("cordelia")
 | 
						|
        webhook_bot = self.example_email("webhook_bot")
 | 
						|
        result = bulk_get_users([hamlet, cordelia], get_realm("zulip"))
 | 
						|
        self.assertEqual(result[hamlet].email, hamlet)
 | 
						|
        self.assertEqual(result[cordelia].email, cordelia)
 | 
						|
 | 
						|
        result = bulk_get_users([hamlet, cordelia, webhook_bot], None,
 | 
						|
                                base_query=UserProfile.objects.all())
 | 
						|
        self.assertEqual(result[hamlet].email, hamlet)
 | 
						|
        self.assertEqual(result[cordelia].email, cordelia)
 | 
						|
        self.assertEqual(result[webhook_bot].email, webhook_bot)
 | 
						|
 | 
						|
    def test_get_accounts_for_email(self) -> None:
 | 
						|
        def check_account_present_in_accounts(user: UserProfile, accounts: List[Dict[str, Optional[str]]]) -> None:
 | 
						|
            for account in accounts:
 | 
						|
                realm = user.realm
 | 
						|
                if account["avatar"] == avatar_url(user) and account["full_name"] == user.full_name \
 | 
						|
                        and account["realm_name"] == realm.name and account["string_id"] == realm.string_id:
 | 
						|
                    return
 | 
						|
            raise AssertionError("Account not found")
 | 
						|
 | 
						|
        lear_realm = get_realm("lear")
 | 
						|
        cordelia_in_zulip = self.example_user("cordelia")
 | 
						|
        cordelia_in_lear = get_user("cordelia@zulip.com", lear_realm)
 | 
						|
 | 
						|
        email = "cordelia@zulip.com"
 | 
						|
        accounts = get_accounts_for_email(email)
 | 
						|
        self.assert_length(accounts, 2)
 | 
						|
        check_account_present_in_accounts(cordelia_in_zulip, accounts)
 | 
						|
        check_account_present_in_accounts(cordelia_in_lear, accounts)
 | 
						|
 | 
						|
        email = "CORDelia@zulip.com"
 | 
						|
        accounts = get_accounts_for_email(email)
 | 
						|
        self.assert_length(accounts, 2)
 | 
						|
        check_account_present_in_accounts(cordelia_in_zulip, accounts)
 | 
						|
        check_account_present_in_accounts(cordelia_in_lear, accounts)
 | 
						|
 | 
						|
        email = "IAGO@ZULIP.COM"
 | 
						|
        accounts = get_accounts_for_email(email)
 | 
						|
        self.assert_length(accounts, 1)
 | 
						|
        check_account_present_in_accounts(self.example_user("iago"), accounts)
 | 
						|
 | 
						|
    def test_get_source_profile(self) -> None:
 | 
						|
        iago = get_source_profile("iago@zulip.com", "zulip")
 | 
						|
        assert iago is not None
 | 
						|
        self.assertEqual(iago.email, "iago@zulip.com")
 | 
						|
        self.assertEqual(iago.realm, get_realm("zulip"))
 | 
						|
 | 
						|
        iago = get_source_profile("IAGO@ZULIP.com", "zulip")
 | 
						|
        assert iago is not None
 | 
						|
        self.assertEqual(iago.email, "iago@zulip.com")
 | 
						|
 | 
						|
        cordelia = get_source_profile("cordelia@zulip.com", "lear")
 | 
						|
        assert cordelia is not None
 | 
						|
        self.assertEqual(cordelia.email, "cordelia@zulip.com")
 | 
						|
 | 
						|
        self.assertIsNone(get_source_profile("iagod@zulip.com", "zulip"))
 | 
						|
        self.assertIsNone(get_source_profile("iago@zulip.com", "ZULIP"))
 | 
						|
        self.assertIsNone(get_source_profile("iago@zulip.com", "lear"))
 | 
						|
 | 
						|
    def test_copy_user_settings(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
        cordelia = self.example_user("cordelia")
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
 | 
						|
        cordelia.default_language = "de"
 | 
						|
        cordelia.emojiset = "twitter"
 | 
						|
        cordelia.timezone = "America/Phoenix"
 | 
						|
        cordelia.night_mode = True
 | 
						|
        cordelia.enable_offline_email_notifications = False
 | 
						|
        cordelia.enable_stream_push_notifications = True
 | 
						|
        cordelia.enter_sends = False
 | 
						|
        cordelia.save()
 | 
						|
 | 
						|
        UserHotspot.objects.filter(user=cordelia).delete()
 | 
						|
        UserHotspot.objects.filter(user=iago).delete()
 | 
						|
        hotspots_completed = ['intro_reply', 'intro_streams', 'intro_topics']
 | 
						|
        for hotspot in hotspots_completed:
 | 
						|
            UserHotspot.objects.create(user=cordelia, hotspot=hotspot)
 | 
						|
 | 
						|
        copy_user_settings(cordelia, iago)
 | 
						|
 | 
						|
        # We verify that cordelia and iago match, but hamlet has the defaults.
 | 
						|
        self.assertEqual(iago.full_name, "Cordelia Lear")
 | 
						|
        self.assertEqual(cordelia.full_name, "Cordelia Lear")
 | 
						|
        self.assertEqual(hamlet.full_name, "King Hamlet")
 | 
						|
 | 
						|
        self.assertEqual(iago.default_language, "de")
 | 
						|
        self.assertEqual(cordelia.default_language, "de")
 | 
						|
        self.assertEqual(hamlet.default_language, "en")
 | 
						|
 | 
						|
        self.assertEqual(iago.emojiset, "twitter")
 | 
						|
        self.assertEqual(cordelia.emojiset, "twitter")
 | 
						|
        self.assertEqual(hamlet.emojiset, "google-blob")
 | 
						|
 | 
						|
        self.assertEqual(iago.timezone, "America/Phoenix")
 | 
						|
        self.assertEqual(cordelia.timezone, "America/Phoenix")
 | 
						|
        self.assertEqual(hamlet.timezone, "")
 | 
						|
 | 
						|
        self.assertEqual(iago.night_mode, True)
 | 
						|
        self.assertEqual(cordelia.night_mode, True)
 | 
						|
        self.assertEqual(hamlet.night_mode, False)
 | 
						|
 | 
						|
        self.assertEqual(iago.enable_offline_email_notifications, False)
 | 
						|
        self.assertEqual(cordelia.enable_offline_email_notifications, False)
 | 
						|
        self.assertEqual(hamlet.enable_offline_email_notifications, True)
 | 
						|
 | 
						|
        self.assertEqual(iago.enable_stream_push_notifications, True)
 | 
						|
        self.assertEqual(cordelia.enable_stream_push_notifications, True)
 | 
						|
        self.assertEqual(hamlet.enable_stream_push_notifications, False)
 | 
						|
 | 
						|
        self.assertEqual(iago.enter_sends, False)
 | 
						|
        self.assertEqual(cordelia.enter_sends, False)
 | 
						|
        self.assertEqual(hamlet.enter_sends, True)
 | 
						|
 | 
						|
        hotspots = list(UserHotspot.objects.filter(user=iago).values_list('hotspot', flat=True))
 | 
						|
        self.assertEqual(hotspots, hotspots_completed)
 | 
						|
 | 
						|
    def test_get_user_by_id_in_realm_including_cross_realm(self) -> None:
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        othello = self.example_user('othello')
 | 
						|
        bot = get_system_bot(settings.WELCOME_BOT)
 | 
						|
 | 
						|
        # Pass in the ID of a cross-realm bot and a valid realm
 | 
						|
        cross_realm_bot = get_user_by_id_in_realm_including_cross_realm(
 | 
						|
            bot.id, realm)
 | 
						|
        self.assertEqual(cross_realm_bot.email, bot.email)
 | 
						|
        self.assertEqual(cross_realm_bot.id, bot.id)
 | 
						|
 | 
						|
        # Pass in the ID of a cross-realm bot but with a invalid realm,
 | 
						|
        # note that the realm should be irrelevant here
 | 
						|
        cross_realm_bot = get_user_by_id_in_realm_including_cross_realm(
 | 
						|
            bot.id, None)
 | 
						|
        self.assertEqual(cross_realm_bot.email, bot.email)
 | 
						|
        self.assertEqual(cross_realm_bot.id, bot.id)
 | 
						|
 | 
						|
        # Pass in the ID of a non-cross-realm user with a realm
 | 
						|
        user_profile = get_user_by_id_in_realm_including_cross_realm(
 | 
						|
            othello.id, realm)
 | 
						|
        self.assertEqual(user_profile.email, othello.email)
 | 
						|
        self.assertEqual(user_profile.id, othello.id)
 | 
						|
 | 
						|
        # If the realm doesn't match, or if the ID is not that of a
 | 
						|
        # cross-realm bot, UserProfile.DoesNotExist is raised
 | 
						|
        with self.assertRaises(UserProfile.DoesNotExist):
 | 
						|
            get_user_by_id_in_realm_including_cross_realm(
 | 
						|
                hamlet.id, None)
 | 
						|
 | 
						|
class ActivateTest(ZulipTestCase):
 | 
						|
    def test_basics(self) -> None:
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        do_deactivate_user(user)
 | 
						|
        self.assertFalse(user.is_active)
 | 
						|
        do_reactivate_user(user)
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
    def test_api(self) -> None:
 | 
						|
        admin = self.example_user('othello')
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/{}'.format(user.id))
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertFalse(user.is_active)
 | 
						|
 | 
						|
        result = self.client_post('/json/users/{}/reactivate'.format(user.id))
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
    def test_api_with_nonexistent_user(self) -> None:
 | 
						|
        admin = self.example_user('othello')
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
 | 
						|
        # Cannot deactivate a user with the bot api
 | 
						|
        result = self.client_delete('/json/bots/{}'.format(self.example_user("hamlet").id))
 | 
						|
        self.assert_json_error(result, 'No such bot')
 | 
						|
 | 
						|
        # Cannot deactivate a nonexistent user.
 | 
						|
        invalid_user_id = 1000
 | 
						|
        result = self.client_delete('/json/users/{}'.format(invalid_user_id))
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/{}'.format(self.example_user("webhook_bot").id))
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/{}'.format(self.example_user("iago").id))
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/{}'.format(admin.id))
 | 
						|
        self.assert_json_error(result, 'Cannot deactivate the only organization administrator')
 | 
						|
 | 
						|
        # Cannot reactivate a nonexistent user.
 | 
						|
        invalid_user_id = 1000
 | 
						|
        result = self.client_post('/json/users/{}/reactivate'.format(invalid_user_id))
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
    def test_api_with_insufficient_permissions(self) -> None:
 | 
						|
        non_admin = self.example_user('othello')
 | 
						|
        do_change_is_admin(non_admin, False)
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
 | 
						|
        # Cannot deactivate a user with the users api
 | 
						|
        result = self.client_delete('/json/users/{}'.format(self.example_user("hamlet").id))
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
        # Cannot reactivate a user
 | 
						|
        result = self.client_post('/json/users/{}/reactivate'.format(self.example_user("hamlet").id))
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_clear_scheduled_jobs(self) -> None:
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        send_future_email('zerver/emails/followup_day1', user.realm,
 | 
						|
                          to_user_ids=[user.id], delay=datetime.timedelta(hours=1))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 1)
 | 
						|
        do_deactivate_user(user)
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 0)
 | 
						|
 | 
						|
    def test_send_future_email_with_multiple_recipients(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        iago = self.example_user('iago')
 | 
						|
        send_future_email('zerver/emails/followup_day1', iago.realm,
 | 
						|
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.filter(users__in=[hamlet, iago]).distinct().count(), 1)
 | 
						|
        email = ScheduledEmail.objects.all().first()
 | 
						|
        self.assertEqual(email.users.count(), 2)
 | 
						|
 | 
						|
    def test_clear_scheduled_emails_with_multiple_user_ids(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        iago = self.example_user('iago')
 | 
						|
        send_future_email('zerver/emails/followup_day1', iago.realm,
 | 
						|
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 1)
 | 
						|
        clear_scheduled_emails([hamlet.id, iago.id])
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 0)
 | 
						|
 | 
						|
    def test_clear_schedule_emails_with_one_user_id(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        iago = self.example_user('iago')
 | 
						|
        send_future_email('zerver/emails/followup_day1', iago.realm,
 | 
						|
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 1)
 | 
						|
        clear_scheduled_emails([hamlet.id])
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 1)
 | 
						|
        self.assertEqual(ScheduledEmail.objects.filter(users=hamlet).count(), 0)
 | 
						|
        self.assertEqual(ScheduledEmail.objects.filter(users=iago).count(), 1)
 | 
						|
 | 
						|
    def test_deliver_email(self) -> None:
 | 
						|
        iago = self.example_user('iago')
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        send_future_email('zerver/emails/followup_day1', iago.realm,
 | 
						|
                          to_user_ids=[hamlet.id, iago.id], delay=datetime.timedelta(hours=1))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 1)
 | 
						|
        email = ScheduledEmail.objects.all().first()
 | 
						|
        deliver_email(email)
 | 
						|
        from django.core.mail import outbox
 | 
						|
        self.assertEqual(len(outbox), 1)
 | 
						|
        for message in outbox:
 | 
						|
            to_fields = [parseaddr(to_field)[1] for to_field in message.to]
 | 
						|
            self.assertEqual(set([hamlet.delivery_email, iago.delivery_email]), set(to_fields))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 0)
 | 
						|
 | 
						|
class RecipientInfoTest(ZulipTestCase):
 | 
						|
    def test_stream_recipient_info(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        cordelia = self.example_user('cordelia')
 | 
						|
        othello = self.example_user('othello')
 | 
						|
 | 
						|
        realm = hamlet.realm
 | 
						|
 | 
						|
        stream_name = 'Test Stream'
 | 
						|
        topic_name = 'test topic'
 | 
						|
 | 
						|
        for user in [hamlet, cordelia, othello]:
 | 
						|
            self.subscribe(user, stream_name)
 | 
						|
 | 
						|
        stream = get_stream(stream_name, realm)
 | 
						|
        recipient = get_stream_recipient(stream.id)
 | 
						|
 | 
						|
        stream_topic = StreamTopicTarget(
 | 
						|
            stream_id=stream.id,
 | 
						|
            topic_name=topic_name,
 | 
						|
        )
 | 
						|
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
        )
 | 
						|
 | 
						|
        all_user_ids = {hamlet.id, cordelia.id, othello.id}
 | 
						|
 | 
						|
        expected_info = dict(
 | 
						|
            active_user_ids=all_user_ids,
 | 
						|
            push_notify_user_ids=set(),
 | 
						|
            stream_push_user_ids=set(),
 | 
						|
            stream_email_user_ids=set(),
 | 
						|
            um_eligible_user_ids=all_user_ids,
 | 
						|
            long_term_idle_user_ids=set(),
 | 
						|
            default_bot_user_ids=set(),
 | 
						|
            service_bot_tuples=[],
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(info, expected_info)
 | 
						|
 | 
						|
        hamlet.enable_stream_push_notifications = True
 | 
						|
        hamlet.save()
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
        )
 | 
						|
        self.assertEqual(info['stream_push_user_ids'], {hamlet.id})
 | 
						|
 | 
						|
        sub = get_subscription(stream_name, hamlet)
 | 
						|
        sub.push_notifications = False
 | 
						|
        sub.save()
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
        )
 | 
						|
        self.assertEqual(info['stream_push_user_ids'], set())
 | 
						|
 | 
						|
        hamlet.enable_stream_push_notifications = False
 | 
						|
        hamlet.save()
 | 
						|
        sub = get_subscription(stream_name, hamlet)
 | 
						|
        sub.push_notifications = True
 | 
						|
        sub.save()
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
        )
 | 
						|
        self.assertEqual(info['stream_push_user_ids'], {hamlet.id})
 | 
						|
 | 
						|
        # Now mute Hamlet to omit him from stream_push_user_ids.
 | 
						|
        add_topic_mute(
 | 
						|
            user_profile=hamlet,
 | 
						|
            stream_id=stream.id,
 | 
						|
            recipient_id=recipient.id,
 | 
						|
            topic_name=topic_name,
 | 
						|
        )
 | 
						|
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(info['stream_push_user_ids'], set())
 | 
						|
 | 
						|
        # Add a service bot.
 | 
						|
        service_bot = do_create_user(
 | 
						|
            email='service-bot@zulip.com',
 | 
						|
            password='',
 | 
						|
            realm=realm,
 | 
						|
            full_name='',
 | 
						|
            short_name='',
 | 
						|
            bot_type=UserProfile.EMBEDDED_BOT,
 | 
						|
        )
 | 
						|
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
            possibly_mentioned_user_ids={service_bot.id}
 | 
						|
        )
 | 
						|
        self.assertEqual(info['service_bot_tuples'], [
 | 
						|
            (service_bot.id, UserProfile.EMBEDDED_BOT),
 | 
						|
        ])
 | 
						|
 | 
						|
        # Add a normal bot.
 | 
						|
        normal_bot = do_create_user(
 | 
						|
            email='normal-bot@zulip.com',
 | 
						|
            password='',
 | 
						|
            realm=realm,
 | 
						|
            full_name='',
 | 
						|
            short_name='',
 | 
						|
            bot_type=UserProfile.DEFAULT_BOT,
 | 
						|
        )
 | 
						|
 | 
						|
        info = get_recipient_info(
 | 
						|
            recipient=recipient,
 | 
						|
            sender_id=hamlet.id,
 | 
						|
            stream_topic=stream_topic,
 | 
						|
            possibly_mentioned_user_ids={service_bot.id, normal_bot.id}
 | 
						|
        )
 | 
						|
        self.assertEqual(info['default_bot_user_ids'], {normal_bot.id})
 | 
						|
 | 
						|
    def test_get_recipient_info_invalid_recipient_type(self) -> None:
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        realm = hamlet.realm
 | 
						|
 | 
						|
        stream = get_stream('Rome', realm)
 | 
						|
        stream_topic = StreamTopicTarget(
 | 
						|
            stream_id=stream.id,
 | 
						|
            topic_name='test topic',
 | 
						|
        )
 | 
						|
 | 
						|
        # Make sure get_recipient_info asserts on invalid recipient types
 | 
						|
        with self.assertRaisesRegex(ValueError, 'Bad recipient type'):
 | 
						|
            invalid_recipient = Recipient(type=999)  # 999 is not a valid type
 | 
						|
            get_recipient_info(
 | 
						|
                recipient=invalid_recipient,
 | 
						|
                sender_id=hamlet.id,
 | 
						|
                stream_topic=stream_topic,
 | 
						|
            )
 | 
						|
 | 
						|
class BulkUsersTest(ZulipTestCase):
 | 
						|
    def test_client_gravatar_option(self) -> None:
 | 
						|
        self.login(self.example_email('cordelia'))
 | 
						|
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
 | 
						|
        def get_hamlet_avatar(client_gravatar: bool) -> Optional[str]:
 | 
						|
            data = dict(client_gravatar=ujson.dumps(client_gravatar))
 | 
						|
            result = self.client_get('/json/users', data)
 | 
						|
            self.assert_json_success(result)
 | 
						|
            rows = result.json()['members']
 | 
						|
            hamlet_data = [
 | 
						|
                row for row in rows
 | 
						|
                if row['user_id'] == hamlet.id
 | 
						|
            ][0]
 | 
						|
            return hamlet_data['avatar_url']
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            get_hamlet_avatar(client_gravatar=True),
 | 
						|
            None
 | 
						|
        )
 | 
						|
 | 
						|
        '''
 | 
						|
        The main purpose of this test is to make sure we
 | 
						|
        return None for avatar_url when client_gravatar is
 | 
						|
        set to True.  And we do a sanity check for when it's
 | 
						|
        False, but we leave it to other tests to validate
 | 
						|
        the specific URL.
 | 
						|
        '''
 | 
						|
        self.assertIn(
 | 
						|
            'gravatar.com',
 | 
						|
            get_hamlet_avatar(client_gravatar=False),
 | 
						|
        )
 | 
						|
 | 
						|
class GetProfileTest(ZulipTestCase):
 | 
						|
 | 
						|
    def common_update_pointer(self, email: str, pointer: int) -> None:
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/users/me/pointer", {"pointer": pointer})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def common_get_profile(self, user_id: str) -> Dict[str, Any]:
 | 
						|
        # Assumes all users are example users in realm 'zulip'
 | 
						|
        user_profile = self.example_user(user_id)
 | 
						|
        self.send_stream_message(user_profile.email, "Verona", "hello")
 | 
						|
 | 
						|
        result = self.api_get(user_profile.email, "/api/v1/users/me")
 | 
						|
 | 
						|
        max_id = most_recent_message(user_profile).id
 | 
						|
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
 | 
						|
        self.assertIn("client_id", json)
 | 
						|
        self.assertIn("max_message_id", json)
 | 
						|
        self.assertIn("pointer", json)
 | 
						|
 | 
						|
        self.assertEqual(json["max_message_id"], max_id)
 | 
						|
        return json
 | 
						|
 | 
						|
    def test_get_pointer(self) -> None:
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_get("/json/users/me/pointer")
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertIn("pointer", result.json())
 | 
						|
 | 
						|
    def test_cache_behavior(self) -> None:
 | 
						|
        """Tests whether fetching a user object the normal way, with
 | 
						|
        `get_user`, makes 1 cache query and 1 database query.
 | 
						|
        """
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        with queries_captured() as queries:
 | 
						|
            with simulated_empty_cache() as cache_queries:
 | 
						|
                user_profile = get_user(email, realm)
 | 
						|
 | 
						|
        self.assert_length(queries, 1)
 | 
						|
        self.assert_length(cache_queries, 1)
 | 
						|
        self.assertEqual(user_profile.email, email)
 | 
						|
 | 
						|
    def test_get_user_profile(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        result = ujson.loads(self.client_get('/json/users/me').content)
 | 
						|
        self.assertEqual(result['short_name'], 'hamlet')
 | 
						|
        self.assertEqual(result['email'], self.example_email("hamlet"))
 | 
						|
        self.assertEqual(result['full_name'], 'King Hamlet')
 | 
						|
        self.assertIn("user_id", result)
 | 
						|
        self.assertFalse(result['is_bot'])
 | 
						|
        self.assertFalse(result['is_admin'])
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        result = ujson.loads(self.client_get('/json/users/me').content)
 | 
						|
        self.assertEqual(result['short_name'], 'iago')
 | 
						|
        self.assertEqual(result['email'], self.example_email("iago"))
 | 
						|
        self.assertEqual(result['full_name'], 'Iago')
 | 
						|
        self.assertFalse(result['is_bot'])
 | 
						|
        self.assertTrue(result['is_admin'])
 | 
						|
 | 
						|
    def test_api_get_empty_profile(self) -> None:
 | 
						|
        """
 | 
						|
        Ensure GET /users/me returns a max message id and returns successfully
 | 
						|
        """
 | 
						|
        json = self.common_get_profile("othello")
 | 
						|
        self.assertEqual(json["pointer"], -1)
 | 
						|
 | 
						|
    def test_profile_with_pointer(self) -> None:
 | 
						|
        """
 | 
						|
        Ensure GET /users/me returns a proper pointer id after the pointer is updated
 | 
						|
        """
 | 
						|
 | 
						|
        id1 = self.send_stream_message(self.example_email("othello"), "Verona")
 | 
						|
        id2 = self.send_stream_message(self.example_email("othello"), "Verona")
 | 
						|
 | 
						|
        json = self.common_get_profile("hamlet")
 | 
						|
 | 
						|
        self.common_update_pointer(self.example_email("hamlet"), id2)
 | 
						|
        json = self.common_get_profile("hamlet")
 | 
						|
        self.assertEqual(json["pointer"], id2)
 | 
						|
 | 
						|
        self.common_update_pointer(self.example_email("hamlet"), id1)
 | 
						|
        json = self.common_get_profile("hamlet")
 | 
						|
        self.assertEqual(json["pointer"], id2)  # pointer does not move backwards
 | 
						|
 | 
						|
        result = self.client_post("/json/users/me/pointer", {"pointer": 99999999})
 | 
						|
        self.assert_json_error(result, "Invalid message ID")
 | 
						|
 | 
						|
    def test_get_all_profiles_avatar_urls(self) -> None:
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        result = self.api_get(self.example_email("hamlet"), "/api/v1/users")
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        for user in result.json()['members']:
 | 
						|
            if user['email'] == self.example_email("hamlet"):
 | 
						|
                self.assertEqual(
 | 
						|
                    user['avatar_url'],
 | 
						|
                    avatar_url(user_profile),
 | 
						|
                )
 | 
						|
 | 
						|
class FakeEmailDomainTest(ZulipTestCase):
 | 
						|
    @override_settings(FAKE_EMAIL_DOMAIN="invaliddomain")
 | 
						|
    def test_invalid_fake_email_domain(self) -> None:
 | 
						|
        with self.assertRaises(InvalidFakeEmailDomain):
 | 
						|
            get_fake_email_domain()
 | 
						|
 | 
						|
    @override_settings(FAKE_EMAIL_DOMAIN="127.0.0.1")
 | 
						|
    def test_invalid_fake_email_domain_ip(self) -> None:
 | 
						|
        with self.assertRaises(InvalidFakeEmailDomain):
 | 
						|
            get_fake_email_domain()
 |