mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			463 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			463 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
from __future__ import print_function
 | 
						|
 | 
						|
from typing import (Any, Dict, Iterable, List, Mapping,
 | 
						|
                    Optional, TypeVar, Text, Union)
 | 
						|
 | 
						|
from django.http import HttpResponse
 | 
						|
from django.test import TestCase
 | 
						|
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    queries_captured, simulated_empty_cache,
 | 
						|
    tornado_redirected_to_list,
 | 
						|
    most_recent_message, make_client, avatar_disk_path,
 | 
						|
    get_test_image_file
 | 
						|
)
 | 
						|
from zerver.lib.test_classes import (
 | 
						|
    ZulipTestCase,
 | 
						|
)
 | 
						|
from zerver.lib.test_runner import slow
 | 
						|
 | 
						|
from zerver.models import UserProfile, Recipient, \
 | 
						|
    Realm, RealmDomain, UserActivity, \
 | 
						|
    get_user, get_realm, get_client, get_stream, \
 | 
						|
    Message, get_context_for_message, ScheduledEmail
 | 
						|
 | 
						|
from zerver.lib.avatar import avatar_url
 | 
						|
from zerver.lib.email_mirror import create_missed_message_address
 | 
						|
from zerver.lib.send_email import send_future_email
 | 
						|
from zerver.lib.actions import (
 | 
						|
    get_emails_from_user_ids,
 | 
						|
    do_deactivate_user,
 | 
						|
    do_reactivate_user,
 | 
						|
    do_change_is_admin,
 | 
						|
)
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
 | 
						|
import datetime
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import ujson
 | 
						|
 | 
						|
K = TypeVar('K')
 | 
						|
V = TypeVar('V')
 | 
						|
def find_dict(lst, k, v):
 | 
						|
    # type: (Iterable[Dict[K, V]], K, V) -> Dict[K, V]
 | 
						|
    for dct in lst:
 | 
						|
        if dct[k] == v:
 | 
						|
            return dct
 | 
						|
    raise AssertionError('Cannot find element in list where key %s == %s' % (k, v))
 | 
						|
 | 
						|
class PermissionTest(ZulipTestCase):
 | 
						|
    def test_get_admin_users(self):
 | 
						|
        # type: () -> None
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        do_change_is_admin(user_profile, False)
 | 
						|
        admin_users = user_profile.realm.get_admin_users()
 | 
						|
        self.assertFalse(user_profile in admin_users)
 | 
						|
        do_change_is_admin(user_profile, True)
 | 
						|
        admin_users = user_profile.realm.get_admin_users()
 | 
						|
        self.assertTrue(user_profile in admin_users)
 | 
						|
 | 
						|
    def test_updating_non_existent_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        admin = self.example_user('hamlet')
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
 | 
						|
        result = self.client_patch('/json/users/nonexistentuser@zulip.com', {})
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
    def test_admin_api(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        admin = self.example_user('hamlet')
 | 
						|
        user = self.example_user('othello')
 | 
						|
        realm = admin.realm
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
 | 
						|
        # Make sure we see is_admin flag in /json/users
 | 
						|
        result = self.client_get('/json/users')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        members = result.json()['members']
 | 
						|
        hamlet = find_dict(members, 'email', self.example_email("hamlet"))
 | 
						|
        self.assertTrue(hamlet['is_admin'])
 | 
						|
        othello = find_dict(members, 'email', self.example_email("othello"))
 | 
						|
        self.assertFalse(othello['is_admin'])
 | 
						|
 | 
						|
        # Giveth
 | 
						|
        req = dict(is_admin=ujson.dumps(True))
 | 
						|
 | 
						|
        events = []  # type: List[Mapping[str, Any]]
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/othello@zulip.com', req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        admin_users = realm.get_admin_users()
 | 
						|
        self.assertTrue(user in admin_users)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], self.example_email("othello"))
 | 
						|
        self.assertEqual(person['is_admin'], True)
 | 
						|
 | 
						|
        # Taketh away
 | 
						|
        req = dict(is_admin=ujson.dumps(False))
 | 
						|
        events = []
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/othello@zulip.com', req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        admin_users = realm.get_admin_users()
 | 
						|
        self.assertFalse(user in admin_users)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], self.example_email("othello"))
 | 
						|
        self.assertEqual(person['is_admin'], False)
 | 
						|
 | 
						|
        # Cannot take away from last admin
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(is_admin=ujson.dumps(False))
 | 
						|
        events = []
 | 
						|
        with tornado_redirected_to_list(events):
 | 
						|
            result = self.client_patch('/json/users/hamlet@zulip.com', req)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        admin_users = realm.get_admin_users()
 | 
						|
        self.assertFalse(admin in admin_users)
 | 
						|
        person = events[0]['event']['person']
 | 
						|
        self.assertEqual(person['email'], self.example_email("hamlet"))
 | 
						|
        self.assertEqual(person['is_admin'], False)
 | 
						|
        with tornado_redirected_to_list([]):
 | 
						|
            result = self.client_patch('/json/users/iago@zulip.com', req)
 | 
						|
        self.assert_json_error(result, 'Cannot remove the only organization administrator')
 | 
						|
 | 
						|
        # Make sure only admins can patch other user's info.
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_admin_user_can_change_full_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        new_name = 'new name'
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
 | 
						|
        self.assertTrue(result.status_code == 200)
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        self.assertEqual(hamlet.full_name, new_name)
 | 
						|
 | 
						|
    def test_non_admin_cannot_change_full_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        req = dict(full_name=ujson.dumps('new name'))
 | 
						|
        result = self.client_patch('/json/users/othello@zulip.com', req)
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_admin_cannot_set_long_full_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        new_name = 'a' * (UserProfile.MAX_NAME_LENGTH + 1)
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
 | 
						|
        self.assert_json_error(result, 'Name too long!')
 | 
						|
 | 
						|
    def test_admin_cannot_set_short_full_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        new_name = 'a'
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
 | 
						|
        self.assert_json_error(result, 'Name too short!')
 | 
						|
 | 
						|
    def test_admin_cannot_set_full_name_with_invalid_characters(self):
 | 
						|
        # type: () -> None
 | 
						|
        new_name = 'Opheli*'
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
 | 
						|
        self.assert_json_error(result, 'Invalid characters in name!')
 | 
						|
 | 
						|
class AdminCreateUserTest(ZulipTestCase):
 | 
						|
    def test_create_user_backend(self):
 | 
						|
        # type: () -> None
 | 
						|
 | 
						|
        # This test should give us complete coverage on
 | 
						|
        # create_user_backend.  It mostly exercises error
 | 
						|
        # conditions, and it also does a basic test of the success
 | 
						|
        # path.
 | 
						|
 | 
						|
        admin = self.example_user('hamlet')
 | 
						|
        admin_email = admin.email
 | 
						|
        self.login(admin_email)
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict())
 | 
						|
        self.assert_json_error(result, "Missing 'email' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Missing 'password' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
            password='xxxx',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Missing 'full_name' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Missing 'short_name' argument")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='broken',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
            short_name='Romeo',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result, "Bad name or username")
 | 
						|
 | 
						|
        result = self.client_post("/json/users", dict(
 | 
						|
            email='romeo@not-zulip.com',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
            short_name='Romeo',
 | 
						|
        ))
 | 
						|
        self.assert_json_error(result,
 | 
						|
                               "Email 'romeo@not-zulip.com' not allowed for realm 'zulip'")
 | 
						|
 | 
						|
        RealmDomain.objects.create(realm=get_realm('zulip'), domain='zulip.net')
 | 
						|
 | 
						|
        # HAPPY PATH STARTS HERE
 | 
						|
        valid_params = dict(
 | 
						|
            email='romeo@zulip.net',
 | 
						|
            password='xxxx',
 | 
						|
            full_name='Romeo Montague',
 | 
						|
            short_name='Romeo',
 | 
						|
        )
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Romeo is a newly registered user
 | 
						|
        new_user = get_user('romeo@zulip.net', get_realm('zulip'))
 | 
						|
        self.assertEqual(new_user.full_name, 'Romeo Montague')
 | 
						|
        self.assertEqual(new_user.short_name, 'Romeo')
 | 
						|
 | 
						|
        # One more error condition to test--we can't create
 | 
						|
        # the same user twice.
 | 
						|
        result = self.client_post("/json/users", valid_params)
 | 
						|
        self.assert_json_error(result,
 | 
						|
                               "Email 'romeo@zulip.net' already in use")
 | 
						|
 | 
						|
class UserProfileTest(ZulipTestCase):
 | 
						|
    def test_get_emails_from_user_ids(self):
 | 
						|
        # type: () -> None
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        othello = self.example_user('othello')
 | 
						|
        dct = get_emails_from_user_ids([hamlet.id, othello.id])
 | 
						|
        self.assertEqual(dct[hamlet.id], self.example_email("hamlet"))
 | 
						|
        self.assertEqual(dct[othello.id], self.example_email("othello"))
 | 
						|
 | 
						|
class ActivateTest(ZulipTestCase):
 | 
						|
    def test_basics(self):
 | 
						|
        # type: () -> None
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        do_deactivate_user(user)
 | 
						|
        self.assertFalse(user.is_active)
 | 
						|
        do_reactivate_user(user)
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
    def test_api(self):
 | 
						|
        # type: () -> None
 | 
						|
        admin = self.example_user('othello')
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/hamlet@zulip.com')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertFalse(user.is_active)
 | 
						|
 | 
						|
        result = self.client_post('/json/users/hamlet@zulip.com/reactivate')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
    def test_api_me_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        """This test helps ensure that our URL patterns for /users/me URLs
 | 
						|
        handle email addresses starting with "me" correctly."""
 | 
						|
        self.register(self.nonreg_email('me'), "testpassword")
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/me@zulip.com')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.nonreg_user('me')
 | 
						|
        self.assertFalse(user.is_active)
 | 
						|
 | 
						|
        result = self.client_post('/json/users/{email}/reactivate'.format(email=self.nonreg_email('me')))
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.nonreg_user('me')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
 | 
						|
    def test_api_with_nonexistent_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        admin = self.example_user('othello')
 | 
						|
        do_change_is_admin(admin, True)
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
 | 
						|
        # Can not deactivate a user with the bot api
 | 
						|
        result = self.client_delete('/json/bots/hamlet@zulip.com')
 | 
						|
        self.assert_json_error(result, 'No such bot')
 | 
						|
 | 
						|
        # Can not deactivate a nonexistent user.
 | 
						|
        result = self.client_delete('/json/users/nonexistent@zulip.com')
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/iago@zulip.com')
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        result = self.client_delete('/json/users/othello@zulip.com')
 | 
						|
        self.assert_json_error(result, 'Cannot deactivate the only organization administrator')
 | 
						|
 | 
						|
        # Can not reactivate a nonexistent user.
 | 
						|
        result = self.client_post('/json/users/nonexistent@zulip.com/reactivate')
 | 
						|
        self.assert_json_error(result, 'No such user')
 | 
						|
 | 
						|
    def test_api_with_insufficient_permissions(self):
 | 
						|
        # type: () -> None
 | 
						|
        non_admin = self.example_user('othello')
 | 
						|
        do_change_is_admin(non_admin, False)
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
 | 
						|
        # Can not deactivate a user with the users api
 | 
						|
        result = self.client_delete('/json/users/hamlet@zulip.com')
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
        # Can not reactivate a user
 | 
						|
        result = self.client_post('/json/users/hamlet@zulip.com/reactivate')
 | 
						|
        self.assert_json_error(result, 'Insufficient permission')
 | 
						|
 | 
						|
    def test_clear_scheduled_jobs(self):
 | 
						|
        # type: () -> None
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        send_future_email('zerver/emails/followup_day1', to_user_id=user.id, delay=datetime.timedelta(hours=1))
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 1)
 | 
						|
        do_deactivate_user(user)
 | 
						|
        self.assertEqual(ScheduledEmail.objects.count(), 0)
 | 
						|
 | 
						|
class GetProfileTest(ZulipTestCase):
 | 
						|
 | 
						|
    def common_update_pointer(self, email, pointer):
 | 
						|
        # type: (Text, int) -> None
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/users/me/pointer", {"pointer": pointer})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def common_get_profile(self, user_id):
 | 
						|
        # type: (str) -> Dict[Text, Any]
 | 
						|
        # Assumes all users are example users in realm 'zulip'
 | 
						|
        user_profile = self.example_user(user_id)
 | 
						|
        self.send_message(user_profile.email, "Verona", Recipient.STREAM, "hello")
 | 
						|
 | 
						|
        result = self.client_get("/api/v1/users/me", **self.api_auth(user_profile.email))
 | 
						|
 | 
						|
        max_id = most_recent_message(user_profile).id
 | 
						|
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
 | 
						|
        self.assertIn("client_id", json)
 | 
						|
        self.assertIn("max_message_id", json)
 | 
						|
        self.assertIn("pointer", json)
 | 
						|
 | 
						|
        self.assertEqual(json["max_message_id"], max_id)
 | 
						|
        return json
 | 
						|
 | 
						|
    def test_get_pointer(self):
 | 
						|
        # type: () -> None
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_get("/json/users/me/pointer")
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertIn("pointer", result.json())
 | 
						|
 | 
						|
    def test_cache_behavior(self):
 | 
						|
        # type: () -> None
 | 
						|
        """Tests whether fetching a user object the normal way, with
 | 
						|
        `get_user`, makes 1 cache query and 1 database query.
 | 
						|
        """
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        with queries_captured() as queries:
 | 
						|
            with simulated_empty_cache() as cache_queries:
 | 
						|
                user_profile = get_user(email, realm)
 | 
						|
 | 
						|
        self.assert_length(queries, 1)
 | 
						|
        self.assert_length(cache_queries, 1)
 | 
						|
        self.assertEqual(user_profile.email, email)
 | 
						|
 | 
						|
    def test_get_user_profile(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        result = ujson.loads(self.client_get('/json/users/me').content)
 | 
						|
        self.assertEqual(result['short_name'], 'hamlet')
 | 
						|
        self.assertEqual(result['email'], self.example_email("hamlet"))
 | 
						|
        self.assertEqual(result['full_name'], 'King Hamlet')
 | 
						|
        self.assertIn("user_id", result)
 | 
						|
        self.assertFalse(result['is_bot'])
 | 
						|
        self.assertFalse(result['is_admin'])
 | 
						|
        self.login(self.example_email("iago"))
 | 
						|
        result = ujson.loads(self.client_get('/json/users/me').content)
 | 
						|
        self.assertEqual(result['short_name'], 'iago')
 | 
						|
        self.assertEqual(result['email'], self.example_email("iago"))
 | 
						|
        self.assertEqual(result['full_name'], 'Iago')
 | 
						|
        self.assertFalse(result['is_bot'])
 | 
						|
        self.assertTrue(result['is_admin'])
 | 
						|
 | 
						|
    def test_api_get_empty_profile(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Ensure GET /users/me returns a max message id and returns successfully
 | 
						|
        """
 | 
						|
        json = self.common_get_profile("othello")
 | 
						|
        self.assertEqual(json["pointer"], -1)
 | 
						|
 | 
						|
    def test_profile_with_pointer(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Ensure GET /users/me returns a proper pointer id after the pointer is updated
 | 
						|
        """
 | 
						|
 | 
						|
        id1 = self.send_message(self.example_email("othello"), "Verona", Recipient.STREAM)
 | 
						|
        id2 = self.send_message(self.example_email("othello"), "Verona", Recipient.STREAM)
 | 
						|
 | 
						|
        json = self.common_get_profile("hamlet")
 | 
						|
 | 
						|
        self.common_update_pointer(self.example_email("hamlet"), id2)
 | 
						|
        json = self.common_get_profile("hamlet")
 | 
						|
        self.assertEqual(json["pointer"], id2)
 | 
						|
 | 
						|
        self.common_update_pointer(self.example_email("hamlet"), id1)
 | 
						|
        json = self.common_get_profile("hamlet")
 | 
						|
        self.assertEqual(json["pointer"], id2)  # pointer does not move backwards
 | 
						|
 | 
						|
        result = self.client_post("/json/users/me/pointer", {"pointer": 99999999})
 | 
						|
        self.assert_json_error(result, "Invalid message ID")
 | 
						|
 | 
						|
    def test_get_all_profiles_avatar_urls(self):
 | 
						|
        # type: () -> None
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        result = self.client_get("/api/v1/users", **self.api_auth(self.example_email("hamlet")))
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        for user in result.json()['members']:
 | 
						|
            if user['email'] == self.example_email("hamlet"):
 | 
						|
                self.assertEqual(
 | 
						|
                    user['avatar_url'],
 | 
						|
                    avatar_url(user_profile),
 | 
						|
                )
 |