mirror of
https://github.com/zulip/zulip.git
synced 2025-11-03 21:43:21 +00:00
Every time we updated a UserProfile object, we were calling delete_display_recipient_cache(), which churns the cache and does an extra database hop to find subscriptions. This was due to saying `updated_fields` instead of `update_fields`. This made us prone to cache churn for fields like UserProfile.pointer that are fairly volatile. Now we use the helper function changed(). To prevent the opposite problem, we use all the fields that could invalidate the cache.
579 lines
21 KiB
Python
579 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import (Any, Dict, Iterable, List, Mapping,
|
|
Optional, TypeVar, Text, Union)
|
|
|
|
from django.http import HttpResponse
|
|
from django.test import TestCase
|
|
|
|
from zerver.lib.test_helpers import (
|
|
queries_captured, simulated_empty_cache,
|
|
tornado_redirected_to_list, get_subscription,
|
|
most_recent_message, make_client, avatar_disk_path,
|
|
get_test_image_file
|
|
)
|
|
from zerver.lib.test_classes import (
|
|
ZulipTestCase,
|
|
)
|
|
from zerver.lib.test_runner import slow
|
|
|
|
from zerver.models import UserProfile, Recipient, \
|
|
Realm, RealmDomain, UserActivity, \
|
|
get_user, get_realm, get_client, get_stream, get_recipient, \
|
|
Message, get_context_for_message, ScheduledEmail
|
|
|
|
from zerver.lib.avatar import avatar_url
|
|
from zerver.lib.email_mirror import create_missed_message_address
|
|
from zerver.lib.send_email import send_future_email
|
|
from zerver.lib.actions import (
|
|
get_emails_from_user_ids,
|
|
get_recipient_info,
|
|
do_deactivate_user,
|
|
do_reactivate_user,
|
|
do_change_is_admin,
|
|
)
|
|
from zerver.lib.topic_mutes import add_topic_mute
|
|
from zerver.lib.stream_topic import StreamTopicTarget
|
|
|
|
from django.conf import settings
|
|
|
|
import datetime
|
|
import mock
|
|
import os
|
|
import sys
|
|
import time
|
|
import ujson
|
|
|
|
K = TypeVar('K')
V = TypeVar('V')

def find_dict(lst, k, v):
    # type: (Iterable[Dict[K, V]], K, V) -> Dict[K, V]
    """Return the first dict in `lst` whose value stored under `k` equals `v`.

    The iterable is consumed lazily and scanning stops at the first match.
    Raises AssertionError when no element matches; a KeyError propagates
    if an element examined before the match has no key `k`.
    """
    candidates = (entry for entry in lst if entry[k] == v)
    match = next(candidates, None)
    if match is None:
        raise AssertionError('Cannot find element in list where key %s == %s' % (k, v))
    return match
|
|
|
|
class PermissionTest(ZulipTestCase):
    """Tests for administrator permissions on the /json/users endpoints."""

    def test_get_admin_users(self):
        # type: () -> None
        """realm.get_admin_users() follows changes made by do_change_is_admin."""
        user_profile = self.example_user('hamlet')
        do_change_is_admin(user_profile, False)
        admin_users = user_profile.realm.get_admin_users()
        # assertIn/assertNotIn report both operands on failure, unlike
        # assertTrue(x in y), which only reports "False is not true".
        self.assertNotIn(user_profile, admin_users)
        do_change_is_admin(user_profile, True)
        admin_users = user_profile.realm.get_admin_users()
        self.assertIn(user_profile, admin_users)

    def test_updating_non_existent_user(self):
        # type: () -> None
        """Patching an unknown email as an admin yields 'No such user'."""
        self.login(self.example_email("hamlet"))
        admin = self.example_user('hamlet')
        do_change_is_admin(admin, True)

        result = self.client_patch('/json/users/nonexistentuser@zulip.com', {})
        self.assert_json_error(result, 'No such user')

    def test_admin_api(self):
        # type: () -> None
        """Exercise granting and revoking is_admin through /json/users/<email>.

        Covers the is_admin flag in the user listing, the event sent on
        each change, the refusal to demote the only remaining
        administrator, and the rejection of non-admin callers.
        """
        self.login(self.example_email("hamlet"))
        admin = self.example_user('hamlet')
        user = self.example_user('othello')
        realm = admin.realm
        do_change_is_admin(admin, True)

        # Make sure we see is_admin flag in /json/users
        result = self.client_get('/json/users')
        self.assert_json_success(result)
        members = result.json()['members']
        hamlet = find_dict(members, 'email', self.example_email("hamlet"))
        self.assertTrue(hamlet['is_admin'])
        othello = find_dict(members, 'email', self.example_email("othello"))
        self.assertFalse(othello['is_admin'])

        # Giveth
        req = dict(is_admin=ujson.dumps(True))

        events = []  # type: List[Mapping[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/othello@zulip.com', req)
        self.assert_json_success(result)
        admin_users = realm.get_admin_users()
        self.assertIn(user, admin_users)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], self.example_email("othello"))
        self.assertEqual(person['is_admin'], True)

        # Taketh away
        req = dict(is_admin=ujson.dumps(False))
        events = []
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/othello@zulip.com', req)
        self.assert_json_success(result)
        admin_users = realm.get_admin_users()
        self.assertNotIn(user, admin_users)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], self.example_email("othello"))
        self.assertEqual(person['is_admin'], False)

        # Cannot take away from last admin: Iago first demotes Hamlet,
        # then tries to demote himself and is refused.
        self.login(self.example_email("iago"))
        req = dict(is_admin=ujson.dumps(False))
        events = []
        with tornado_redirected_to_list(events):
            result = self.client_patch('/json/users/hamlet@zulip.com', req)
        self.assert_json_success(result)
        admin_users = realm.get_admin_users()
        self.assertNotIn(admin, admin_users)
        person = events[0]['event']['person']
        self.assertEqual(person['email'], self.example_email("hamlet"))
        self.assertEqual(person['is_admin'], False)
        with tornado_redirected_to_list([]):
            result = self.client_patch('/json/users/iago@zulip.com', req)
        self.assert_json_error(result, 'Cannot remove the only organization administrator')

        # Make sure only admins can patch other user's info.
        self.login(self.example_email("othello"))
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
        self.assert_json_error(result, 'Insufficient permission')

    def test_admin_user_can_change_full_name(self):
        # type: () -> None
        """Iago, logged in, can change Hamlet's full name via the users endpoint."""
        new_name = 'new name'
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
        # assertEqual shows the actual status code when this fails.
        self.assertEqual(result.status_code, 200)
        hamlet = self.example_user('hamlet')
        self.assertEqual(hamlet.full_name, new_name)

    def test_non_admin_cannot_change_full_name(self):
        # type: () -> None
        """Hamlet, logged in, is refused when renaming Othello."""
        self.login(self.example_email("hamlet"))
        req = dict(full_name=ujson.dumps('new name'))
        result = self.client_patch('/json/users/othello@zulip.com', req)
        self.assert_json_error(result, 'Insufficient permission')

    def test_admin_cannot_set_long_full_name(self):
        # type: () -> None
        """A name one character over UserProfile.MAX_NAME_LENGTH is rejected."""
        new_name = 'a' * (UserProfile.MAX_NAME_LENGTH + 1)
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
        self.assert_json_error(result, 'Name too long!')

    def test_admin_cannot_set_short_full_name(self):
        # type: () -> None
        """A one-character full name is rejected."""
        new_name = 'a'
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
        self.assert_json_error(result, 'Name too short!')

    def test_admin_cannot_set_full_name_with_invalid_characters(self):
        # type: () -> None
        """A full name containing '*' is rejected."""
        new_name = 'Opheli*'
        self.login(self.example_email("iago"))
        req = dict(full_name=ujson.dumps(new_name))
        result = self.client_patch('/json/users/hamlet@zulip.com', req)
        self.assert_json_error(result, 'Invalid characters in name!')
|
|
|
|
class AdminCreateUserTest(ZulipTestCase):
    """Tests for creating users by posting to /json/users."""

    def test_create_user_backend(self):
        # type: () -> None

        # This test is meant to cover create_user_backend completely:
        # it walks through the error conditions one by one and then
        # does a basic check of the success path.

        admin = self.example_user('hamlet')
        self.login(admin.email)
        do_change_is_admin(admin, True)

        result = self.client_post("/json/users", dict())
        self.assert_json_error(result, "Missing 'email' argument")

        # Each entry pairs a request payload with the error it must
        # produce; the payloads are posted in the order listed.
        error_cases = [
            (
                {'email': 'romeo@not-zulip.com'},
                "Missing 'password' argument",
            ),
            (
                {'email': 'romeo@not-zulip.com',
                 'password': 'xxxx'},
                "Missing 'full_name' argument",
            ),
            (
                {'email': 'romeo@not-zulip.com',
                 'password': 'xxxx',
                 'full_name': 'Romeo Montague'},
                "Missing 'short_name' argument",
            ),
            (
                {'email': 'broken',
                 'password': 'xxxx',
                 'full_name': 'Romeo Montague',
                 'short_name': 'Romeo'},
                "Bad name or username",
            ),
            (
                {'email': 'romeo@not-zulip.com',
                 'password': 'xxxx',
                 'full_name': 'Romeo Montague',
                 'short_name': 'Romeo'},
                "Email 'romeo@not-zulip.com' not allowed for realm 'zulip'",
            ),
        ]
        for params, expected_error in error_cases:
            result = self.client_post("/json/users", params)
            self.assert_json_error(result, expected_error)

        RealmDomain.objects.create(realm=get_realm('zulip'), domain='zulip.net')

        # HAPPY PATH STARTS HERE
        valid_params = {
            'email': 'romeo@zulip.net',
            'password': 'xxxx',
            'full_name': 'Romeo Montague',
            'short_name': 'Romeo',
        }
        self.assert_json_success(self.client_post("/json/users", valid_params))

        # Romeo is a newly registered user
        romeo = get_user('romeo@zulip.net', get_realm('zulip'))
        self.assertEqual(romeo.full_name, 'Romeo Montague')
        self.assertEqual(romeo.short_name, 'Romeo')

        # One more error condition to test--we can't create
        # the same user twice.
        duplicate = self.client_post("/json/users", valid_params)
        self.assert_json_error(duplicate,
                               "Email 'romeo@zulip.net' already in use")
|
|
|
|
class UserProfileTest(ZulipTestCase):
    """Tests for UserProfile helpers and cache invalidation on save."""

    def test_get_emails_from_user_ids(self):
        # type: () -> None
        """get_emails_from_user_ids maps each given user id to that user's email."""
        hamlet = self.example_user('hamlet')
        othello = self.example_user('othello')
        emails_by_id = get_emails_from_user_ids([hamlet.id, othello.id])
        for profile, name in ((hamlet, "hamlet"), (othello, "othello")):
            self.assertEqual(emails_by_id[profile.id], self.example_email(name))

    def test_cache_invalidation(self):
        # type: () -> None
        """Saving full_name invalidates the display-recipient cache; saving
        long_term_idle does not.
        """
        patch_target = 'zerver.lib.cache.delete_display_recipient_cache'
        hamlet = self.example_user('hamlet')

        with mock.patch(patch_target) as invalidate:
            hamlet.full_name = 'Hamlet Junior'
            hamlet.save(update_fields=["full_name"])
        self.assertTrue(invalidate.called)

        with mock.patch(patch_target) as invalidate:
            hamlet.long_term_idle = True
            hamlet.save(update_fields=["long_term_idle"])
        self.assertFalse(invalidate.called)
|
|
|
|
class ActivateTest(ZulipTestCase):
    """Tests for deactivating and reactivating users."""

    def test_basics(self):
        # type: () -> None
        """do_deactivate_user / do_reactivate_user flip is_active on the object."""
        hamlet = self.example_user('hamlet')
        do_deactivate_user(hamlet)
        self.assertFalse(hamlet.is_active)
        do_reactivate_user(hamlet)
        self.assertTrue(hamlet.is_active)

    def test_api(self):
        # type: () -> None
        """An admin can deactivate and reactivate Hamlet through the JSON API."""
        admin = self.example_user('othello')
        do_change_is_admin(admin, True)
        self.login(self.example_email("othello"))

        self.assertTrue(self.example_user('hamlet').is_active)

        self.assert_json_success(self.client_delete('/json/users/hamlet@zulip.com'))
        # Re-fetch the profile after each request to see the new state.
        self.assertFalse(self.example_user('hamlet').is_active)

        self.assert_json_success(self.client_post('/json/users/hamlet@zulip.com/reactivate'))
        self.assertTrue(self.example_user('hamlet').is_active)

    def test_api_me_user(self):
        # type: () -> None
        """Guard the /users/me URL patterns: an email address that merely
        starts with "me" must still be routed as an ordinary user."""
        self.register(self.nonreg_email('me'), "testpassword")
        self.login(self.example_email("iago"))

        self.assert_json_success(self.client_delete('/json/users/me@zulip.com'))
        self.assertFalse(self.nonreg_user('me').is_active)

        reactivate_url = '/json/users/{email}/reactivate'.format(email=self.nonreg_email('me'))
        self.assert_json_success(self.client_post(reactivate_url))
        self.assertTrue(self.nonreg_user('me').is_active)

    def test_api_with_nonexistent_user(self):
        # type: () -> None
        """Error responses for unknown targets and for the only remaining admin."""
        admin = self.example_user('othello')
        do_change_is_admin(admin, True)
        self.login(self.example_email("othello"))

        # Can not deactivate a user with the bot api
        response = self.client_delete('/json/bots/hamlet@zulip.com')
        self.assert_json_error(response, 'No such bot')

        # Can not deactivate a nonexistent user.
        response = self.client_delete('/json/users/nonexistent@zulip.com')
        self.assert_json_error(response, 'No such user')

        response = self.client_delete('/json/users/iago@zulip.com')
        self.assert_json_success(response)

        response = self.client_delete('/json/users/othello@zulip.com')
        self.assert_json_error(response, 'Cannot deactivate the only organization administrator')

        # Can not reactivate a nonexistent user.
        response = self.client_post('/json/users/nonexistent@zulip.com/reactivate')
        self.assert_json_error(response, 'No such user')

    def test_api_with_insufficient_permissions(self):
        # type: () -> None
        """A non-admin can neither deactivate nor reactivate another user."""
        non_admin = self.example_user('othello')
        do_change_is_admin(non_admin, False)
        self.login(self.example_email("othello"))

        # Can not deactivate a user with the users api
        response = self.client_delete('/json/users/hamlet@zulip.com')
        self.assert_json_error(response, 'Insufficient permission')

        # Can not reactivate a user
        response = self.client_post('/json/users/hamlet@zulip.com/reactivate')
        self.assert_json_error(response, 'Insufficient permission')

    def test_clear_scheduled_jobs(self):
        # type: () -> None
        """Deactivating a user leaves no ScheduledEmail rows behind."""
        hamlet = self.example_user('hamlet')
        send_future_email(
            'zerver/emails/followup_day1',
            to_user_id=hamlet.id,
            delay=datetime.timedelta(hours=1),
        )
        self.assertEqual(ScheduledEmail.objects.count(), 1)
        do_deactivate_user(hamlet)
        self.assertEqual(ScheduledEmail.objects.count(), 0)
|
|
|
|
class RecipientInfoTest(ZulipTestCase):
    """Tests for get_recipient_info on stream recipients."""

    def test_stream_recipient_info(self):
        # type: () -> None
        """Check the full get_recipient_info result for a three-subscriber
        stream, then check that muting the topic removes Hamlet from
        stream_push_user_ids.
        """
        hamlet = self.example_user('hamlet')
        cordelia = self.example_user('cordelia')
        othello = self.example_user('othello')
        subscribers = [hamlet, cordelia, othello]

        realm = hamlet.realm

        stream_name = 'Test Stream'
        topic_name = 'test topic'

        for subscriber in subscribers:
            self.subscribe(subscriber, stream_name)

        stream = get_stream(stream_name, realm)
        recipient = get_recipient(Recipient.STREAM, stream.id)
        stream_topic = StreamTopicTarget(stream_id=stream.id, topic_name=topic_name)

        def fetch_info():
            # type: () -> Dict[str, Any]
            # Both lookups in this test use Hamlet as the sender.
            return get_recipient_info(
                recipient=recipient,
                sender_id=hamlet.id,
                stream_topic=stream_topic,
            )

        # Turn on stream push notifications for Hamlet's subscription.
        hamlet_sub = get_subscription(stream_name, hamlet)
        hamlet_sub.push_notifications = True
        hamlet_sub.save()

        info = fetch_info()

        all_user_ids = {hamlet.id, cordelia.id, othello.id}
        expected_info = {
            'active_user_ids': all_user_ids,
            'push_notify_user_ids': set(),
            'stream_push_user_ids': {hamlet.id},
            'um_eligible_user_ids': all_user_ids,
            'long_term_idle_user_ids': set(),
            'service_bot_tuples': [],
        }
        self.assertEqual(info, expected_info)

        # Now mute Hamlet to omit him from stream_push_user_ids.
        add_topic_mute(
            user_profile=hamlet,
            stream_id=stream.id,
            recipient_id=recipient.id,
            topic_name=topic_name,
        )

        info = fetch_info()
        self.assertEqual(info['stream_push_user_ids'], set())
|
|
|
|
class BulkUsersTest(ZulipTestCase):
    """Tests for options on the bulk GET /json/users listing."""

    def test_client_gravatar_option(self):
        # type: () -> None
        """Hamlet's avatar_url is None when client_gravatar is True and is a
        gravatar.com URL when it is False.
        """
        self.login(self.example_email('cordelia'))

        hamlet = self.example_user('hamlet')

        def get_hamlet_avatar(client_gravatar):
            # type: (bool) -> Optional[Text]
            data = dict(client_gravatar=ujson.dumps(client_gravatar))
            result = self.client_get('/json/users', data)
            self.assert_json_success(result)
            rows = result.json()['members']
            # Use the module's own lookup helper: if Hamlet's row is missing
            # it raises a descriptive AssertionError, not a bare IndexError
            # from indexing an empty list.
            hamlet_data = find_dict(rows, 'user_id', hamlet.id)
            return hamlet_data['avatar_url']

        self.assertIsNone(get_hamlet_avatar(client_gravatar=True))

        # The main purpose of this test is to make sure we
        # return None for avatar_url when client_gravatar is
        # set to True.  And we do a sanity check for when it's
        # False, but we leave it to other tests to validate
        # the specific URL.
        self.assertIn(
            'gravatar.com',
            get_hamlet_avatar(client_gravatar=False),
        )
|
|
|
|
class GetProfileTest(ZulipTestCase):
    """Tests for fetching the current user's profile and pointer."""

    def common_update_pointer(self, email, pointer):
        # type: (Text, int) -> None
        """Log in as `email` and post `pointer` to /json/users/me/pointer,
        asserting that the request succeeds.
        """
        self.login(email)
        result = self.client_post("/json/users/me/pointer", {"pointer": pointer})
        self.assert_json_success(result)

    def common_get_profile(self, user_id):
        # type: (str) -> Dict[Text, Any]
        """Fetch /api/v1/users/me for an example user and sanity-check it.

        `user_id` is an example-user name such as "hamlet" (it is passed
        to self.example_user), not a numeric id.  A message is sent to
        Verona first; the response's max_message_id must then equal the
        id returned by most_recent_message for that user.  Returns the
        decoded JSON payload.
        """
        # Assumes all users are example users in realm 'zulip'
        user_profile = self.example_user(user_id)
        self.send_message(user_profile.email, "Verona", Recipient.STREAM, "hello")

        result = self.client_get("/api/v1/users/me", **self.api_auth(user_profile.email))

        max_id = most_recent_message(user_profile).id

        self.assert_json_success(result)
        profile = result.json()

        self.assertIn("client_id", profile)
        self.assertIn("max_message_id", profile)
        self.assertIn("pointer", profile)

        self.assertEqual(profile["max_message_id"], max_id)
        return profile

    def test_get_pointer(self):
        # type: () -> None
        """GET /json/users/me/pointer succeeds and includes a pointer."""
        email = self.example_email("hamlet")
        self.login(email)
        result = self.client_get("/json/users/me/pointer")
        self.assert_json_success(result)
        self.assertIn("pointer", result.json())

    def test_cache_behavior(self):
        # type: () -> None
        """Tests whether fetching a user object the normal way, with
        `get_user`, makes 1 cache query and 1 database query.
        """
        realm = get_realm("zulip")
        email = self.example_email("hamlet")
        with queries_captured() as queries:
            with simulated_empty_cache() as cache_queries:
                user_profile = get_user(email, realm)

        self.assert_length(queries, 1)
        self.assert_length(cache_queries, 1)
        self.assertEqual(user_profile.email, email)

    def test_get_user_profile(self):
        # type: () -> None
        """/json/users/me returns the logged-in user's names, email and flags."""
        self.login(self.example_email("hamlet"))
        result = ujson.loads(self.client_get('/json/users/me').content)
        self.assertEqual(result['short_name'], 'hamlet')
        self.assertEqual(result['email'], self.example_email("hamlet"))
        self.assertEqual(result['full_name'], 'King Hamlet')
        self.assertIn("user_id", result)
        self.assertFalse(result['is_bot'])
        self.assertFalse(result['is_admin'])
        self.login(self.example_email("iago"))
        result = ujson.loads(self.client_get('/json/users/me').content)
        self.assertEqual(result['short_name'], 'iago')
        self.assertEqual(result['email'], self.example_email("iago"))
        self.assertEqual(result['full_name'], 'Iago')
        self.assertFalse(result['is_bot'])
        self.assertTrue(result['is_admin'])

    def test_api_get_empty_profile(self):
        # type: () -> None
        """
        Ensure GET /users/me returns a max message id and returns successfully
        """
        profile = self.common_get_profile("othello")
        self.assertEqual(profile["pointer"], -1)

    def test_profile_with_pointer(self):
        # type: () -> None
        """
        Ensure GET /users/me returns a proper pointer id after the pointer is updated
        """

        id1 = self.send_message(self.example_email("othello"), "Verona", Recipient.STREAM)
        id2 = self.send_message(self.example_email("othello"), "Verona", Recipient.STREAM)

        # Called for its built-in assertions (and the message it sends);
        # the returned payload is not needed here.
        self.common_get_profile("hamlet")

        self.common_update_pointer(self.example_email("hamlet"), id2)
        profile = self.common_get_profile("hamlet")
        self.assertEqual(profile["pointer"], id2)

        self.common_update_pointer(self.example_email("hamlet"), id1)
        profile = self.common_get_profile("hamlet")
        self.assertEqual(profile["pointer"], id2)  # pointer does not move backwards

        result = self.client_post("/json/users/me/pointer", {"pointer": 99999999})
        self.assert_json_error(result, "Invalid message ID")

    def test_get_all_profiles_avatar_urls(self):
        # type: () -> None
        """Hamlet's avatar_url in the /api/v1/users listing matches avatar_url()."""
        user_profile = self.example_user('hamlet')
        result = self.client_get("/api/v1/users", **self.api_auth(self.example_email("hamlet")))
        self.assert_json_success(result)

        # find_dict raises if Hamlet is absent, so the test cannot pass
        # vacuously the way an assertion nested in `for ... if` could.
        hamlet_row = find_dict(result.json()['members'], 'email', self.example_email("hamlet"))
        self.assertEqual(
            hamlet_row['avatar_url'],
            avatar_url(user_profile),
        )
|