# -*- coding: utf-8 -*- # See http://zulip.readthedocs.io/en/latest/events-system.html for # high-level documentation on how this system works. from __future__ import absolute_import from __future__ import print_function from typing import Any, Callable, Dict, List, Optional from django.conf import settings from django.http import HttpRequest, HttpResponse from django.test import TestCase from django.utils import timezone from zerver.models import ( get_client, get_realm, get_recipient, get_stream, get_user_profile_by_email, Message, RealmAlias, Recipient, UserMessage, UserPresence, UserProfile ) from zerver.lib.actions import ( bulk_add_subscriptions, bulk_remove_subscriptions, do_add_alert_words, check_add_realm_emoji, check_send_typing_notification, do_add_realm_filter, do_add_reaction, do_remove_reaction, do_change_avatar_fields, do_change_default_language, do_change_default_all_public_streams, do_change_default_events_register_stream, do_change_default_sending_stream, do_change_full_name, do_change_bot_owner, do_change_is_admin, do_change_stream_description, do_change_subscription_property, do_change_timezone, do_create_user, do_deactivate_stream, do_deactivate_user, do_reactivate_user, do_refer_friend, do_regenerate_api_key, do_remove_alert_words, do_remove_realm_emoji, do_remove_realm_filter, do_rename_stream, do_add_default_stream, do_remove_default_stream, do_set_muted_topics, do_set_realm_property, do_set_realm_authentication_methods, do_set_realm_message_editing, do_update_embedded_data, do_update_message, do_update_message_flags, do_update_muted_topic, do_update_pointer, do_update_user_presence, do_change_twenty_four_hour_time, do_change_left_side_userlist, do_change_emoji_alt_code, do_change_enable_stream_desktop_notifications, do_change_enable_stream_sounds, do_change_enable_desktop_notifications, do_change_enable_sounds, do_change_enable_offline_email_notifications, do_change_enable_offline_push_notifications, do_change_enable_online_push_notifications, do_change_pm_content_in_desktop_notifications, do_change_enable_digest_emails, do_add_realm_alias, do_change_realm_alias, do_remove_realm_alias, do_change_icon_source, ) from zerver.lib.events import ( apply_events, fetch_initial_state_data, ) from zerver.lib.message import render_markdown from zerver.lib.test_helpers import POSTRequestMock, get_subscription from zerver.lib.test_classes import ( ZulipTestCase, ) from zerver.lib.validator import ( check_bool, check_dict, check_dict_only, check_float, check_int, check_list, check_string, equals, check_none_or, Validator ) from zerver.views.events_register import _default_all_public_streams, _default_narrow from zerver.tornado.event_queue import allocate_client_descriptor, EventQueue from zerver.tornado.views import get_events_backend from collections import OrderedDict import mock import time import ujson from six.moves import range class EventsEndpointTest(ZulipTestCase): def test_events_register_endpoint(self): # type: () -> None # This test is intended to get minimal coverage on the # events_register code paths email = 'hamlet@zulip.com' with mock.patch('zerver.views.events_register.do_events_register', return_value={}): result = self.client_post('/json/register', **self.api_auth(email)) self.assert_json_success(result) with mock.patch('zerver.lib.events.request_event_queue', return_value=None): result = self.client_post('/json/register', **self.api_auth(email)) self.assert_json_error(result, "Could not allocate event queue") with mock.patch('zerver.lib.events.request_event_queue', return_value='15:11'): with mock.patch('zerver.lib.events.get_user_events', return_value=[]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], -1) self.assertEqual(result_dict['queue_id'], '15:11') with mock.patch('zerver.lib.events.request_event_queue', return_value='15:12'): with mock.patch('zerver.lib.events.get_user_events', return_value=[{ 'id': 6, 'type': 'pointer', 'pointer': 15, }]): result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])), **self.api_auth(email)) self.assert_json_success(result) result_dict = ujson.loads(result.content) self.assertEqual(result_dict['last_event_id'], 6) self.assertEqual(result_dict['pointer'], 15) self.assertEqual(result_dict['queue_id'], '15:12') def test_tornado_endpoint(self): # type: () -> None # This test is mostly intended to get minimal coverage on # the /notify_tornado endpoint, so we can have 100% URL coverage, # but it does exercise a little bit of the codepath. post_data = dict( data=ujson.dumps( dict( event=dict( type='other' ), users=[get_user_profile_by_email('hamlet@zulip.com').id], ), ), ) req = POSTRequestMock(post_data, user_profile=None) req.META['REMOTE_ADDR'] = '127.0.0.1' result = self.client_post_request('/notify_tornado', req) self.assert_json_error(result, 'Access denied', status_code=403) post_data['secret'] = settings.SHARED_SECRET req = POSTRequestMock(post_data, user_profile=None) req.META['REMOTE_ADDR'] = '127.0.0.1' result = self.client_post_request('/notify_tornado', req) self.assert_json_success(result) class GetEventsTest(ZulipTestCase): def tornado_call(self, view_func, user_profile, post_data): # type: (Callable[[HttpRequest, UserProfile], HttpResponse], UserProfile, Dict[str, Any]) -> HttpResponse request = POSTRequestMock(post_data, user_profile) return view_func(request, user_profile) def test_get_events(self): # type: () -> None email = "hamlet@zulip.com" recipient_email = "othello@zulip.com" user_profile = get_user_profile_by_email(email) recipient_user_profile = get_user_profile_by_email(recipient_email) self.login(email) result = self.tornado_call(get_events_backend, user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(result) queue_id = ujson.loads(result.content)["queue_id"] recipient_result = self.tornado_call(get_events_backend, recipient_user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(recipient_result) recipient_queue_id = ujson.loads(recipient_result.content)["queue_id"] result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 0) local_id = 10.01 self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id) result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) self.assertEqual(events[0]["local_message_id"], local_id) self.assertEqual(events[0]["message"]["display_recipient"][0]["is_mirror_dummy"], False) self.assertEqual(events[0]["message"]["display_recipient"][1]["is_mirror_dummy"], False) last_event_id = events[0]["id"] local_id += 0.01 self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id) result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": last_event_id, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) self.assertEqual(events[0]["local_message_id"], local_id) # Test that the received message in the receiver's event queue # exists and does not contain a local id recipient_result = self.tornado_call(get_events_backend, recipient_user_profile, {"queue_id": recipient_queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) recipient_events = ujson.loads(recipient_result.content)["events"] self.assert_json_success(recipient_result) self.assertEqual(len(recipient_events), 2) self.assertEqual(recipient_events[0]["type"], "message") self.assertEqual(recipient_events[0]["message"]["sender_email"], email) self.assertTrue("local_message_id" not in recipient_events[0]) self.assertEqual(recipient_events[1]["type"], "message") self.assertEqual(recipient_events[1]["message"]["sender_email"], email) self.assertTrue("local_message_id" not in recipient_events[1]) def test_get_events_narrow(self): # type: () -> None email = "hamlet@zulip.com" user_profile = get_user_profile_by_email(email) self.login(email) result = self.tornado_call(get_events_backend, user_profile, {"apply_markdown": ujson.dumps(True), "event_types": ujson.dumps(["message"]), "narrow": ujson.dumps([["stream", "denmark"]]), "user_client": "website", "dont_block": ujson.dumps(True), }) self.assert_json_success(result) queue_id = ujson.loads(result.content)["queue_id"] result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 0) self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello") self.send_message(email, "Denmark", Recipient.STREAM, "hello") result = self.tornado_call(get_events_backend, user_profile, {"queue_id": queue_id, "user_client": "website", "last_event_id": -1, "dont_block": ujson.dumps(True), }) events = ujson.loads(result.content)["events"] self.assert_json_success(result) self.assert_length(events, 1) self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["display_recipient"], "Denmark") class EventsRegisterTest(ZulipTestCase): user_profile = get_user_profile_by_email("hamlet@zulip.com") maxDiff = None # type: Optional[int] def create_bot(self, email): # type: (str) -> UserProfile return do_create_user(email, '123', get_realm('zulip'), 'Test Bot', 'test', bot_type=UserProfile.DEFAULT_BOT, bot_owner=self.user_profile) def realm_bot_schema(self, field_name, check): # type: (str, Validator) -> Validator return check_dict_only([ ('id', check_int), ('type', equals('realm_bot')), ('op', equals('update')), ('bot', check_dict_only([ ('email', check_string), ('user_id', check_int), (field_name, check), ])), ]) def do_test(self, action, event_types=None, include_subscribers=True, state_change_expected=True, num_events=1): # type: (Callable[[], Any], Optional[List[str]], bool, bool, int) -> List[Dict[str, Any]] client = allocate_client_descriptor( dict(user_profile_id = self.user_profile.id, user_profile_email = self.user_profile.email, realm_id = self.user_profile.realm_id, event_types = event_types, client_type_name = "website", apply_markdown = True, all_public_streams = False, queue_timeout = 600, last_connection_time = time.time(), narrow = []) ) # hybrid_state = initial fetch state + re-applying events triggered by our action # normal_state = do action then fetch at the end (the "normal" code path) hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers) action() events = client.event_queue.contents() self.assertTrue(len(events) == num_events) before = ujson.dumps(hybrid_state) apply_events(hybrid_state, events, self.user_profile, include_subscribers=include_subscribers) after = ujson.dumps(hybrid_state) if state_change_expected: if before == after: print(events) # nocoverage raise AssertionError('Test does not exercise enough code -- events do not change state.') else: if before != after: raise AssertionError('Test is invalid--state actually does change here.') normal_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers) self.match_states(hybrid_state, normal_state) return events def assert_on_error(self, error): # type: (Optional[str]) -> None if error: raise AssertionError(error) def match_states(self, state1, state2): # type: (Dict[str, Any], Dict[str, Any]) -> None def normalize(state): # type: (Dict[str, Any]) -> None state['realm_users'] = {u['email']: u for u in state['realm_users']} for u in state['subscriptions']: if 'subscribers' in u: u['subscribers'].sort() state['subscriptions'] = {u['name']: u for u in state['subscriptions']} state['unsubscribed'] = {u['name']: u for u in state['unsubscribed']} if 'realm_bots' in state: state['realm_bots'] = {u['email']: u for u in state['realm_bots']} normalize(state1) normalize(state2) self.assertEqual(state1, state2) def test_send_message_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('message')), ('flags', check_list(None)), ('message', check_dict([ ('avatar_url', check_string), ('client', check_string), ('content', check_string), ('content_type', equals('text/html')), ('display_recipient', check_string), ('gravatar_hash', check_string), ('id', check_int), ('recipient_id', check_int), ('sender_realm_str', check_string), ('sender_email', check_string), ('sender_full_name', check_string), ('sender_id', check_int), ('sender_short_name', check_string), ('subject', check_string), ('subject_links', check_list(None)), ('timestamp', check_int), ('type', check_string), ])), ]) events = self.do_test( lambda: self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello"), ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) # Verify message editing schema_checker = check_dict([ ('type', equals('update_message')), ('flags', check_list(None)), ('content', check_string), ('edit_timestamp', check_int), ('flags', check_list(None)), ('message_id', check_int), ('message_ids', check_list(check_int)), ('orig_content', check_string), ('orig_rendered_content', check_string), ('orig_subject', check_string), ('propagate_mode', check_string), ('rendered_content', check_string), ('sender', check_string), ('stream_id', check_int), ('subject', check_string), ('subject_links', check_list(None)), ('user_id', check_int), # There is also a timestamp field in the event, but we ignore it, as # it's kind of an unwanted but harmless side effect of calling log_event. ]) message = Message.objects.order_by('-id')[0] topic = 'new_topic' propagate_mode = 'change_all' content = 'new content' rendered_content = render_markdown(message, content) events = self.do_test( lambda: do_update_message(self.user_profile, message, topic, propagate_mode, content, rendered_content), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) # Verify do_update_embedded_data schema_checker = check_dict([ ('type', equals('update_message')), ('flags', check_list(None)), ('content', check_string), ('flags', check_list(None)), ('message_id', check_int), ('message_ids', check_list(check_int)), ('rendered_content', check_string), ('sender', check_string), ]) events = self.do_test( lambda: do_update_embedded_data(self.user_profile, message, u"embed_content", "
embed_content
"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_update_message_flags(self): # type: () -> None # Test message flag update events schema_checker = check_dict([ ('id', check_int), ('type', equals('update_message_flags')), ('flag', check_string), ('messages', check_list(check_int)), ('operation', equals("add")), ]) message = self.send_message("cordelia@zulip.com", "hamlet@zulip.com", Recipient.PERSONAL, "hello") user_profile = get_user_profile_by_email("hamlet@zulip.com") events = self.do_test( lambda: do_update_message_flags(user_profile, 'add', 'starred', [message], False, None, None), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) schema_checker = check_dict([ ('id', check_int), ('type', equals('update_message_flags')), ('flag', check_string), ('messages', check_list(check_int)), ('operation', equals("remove")), ]) events = self.do_test( lambda: do_update_message_flags(user_profile, 'remove', 'starred', [message], False, None, None), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_send_reaction(self): # type: () -> None schema_checker = check_dict([ ('type', equals('reaction')), ('op', equals('add')), ('message_id', check_int), ('emoji_name', check_string), ('user', check_dict([ ('email', check_string), ('full_name', check_string), ('user_id', check_int) ])), ]) message_id = self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello") message = Message.objects.get(id=message_id) events = self.do_test( lambda: do_add_reaction( self.user_profile, message, "tada"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_remove_reaction(self): # type: () -> None schema_checker = check_dict([ ('type', equals('reaction')), ('op', equals('remove')), ('message_id', check_int), ('emoji_name', check_string), ('user', check_dict([ ('email', check_string), ('full_name', check_string), ('user_id', check_int) ])), ]) message_id = self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello") message = Message.objects.get(id=message_id) events = self.do_test( lambda: do_remove_reaction( self.user_profile, message, "tada"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_typing_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('typing')), ('op', equals('start')), ('sender', check_dict([ ('email', check_string), ('user_id', check_int)])), ('recipients', check_list(check_dict([ ('email', check_string), ('user_id', check_int), ]))), ]) events = self.do_test( lambda: check_send_typing_notification( self.user_profile, ["cordelia@zulip.com"], "start"), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_presence_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('pointer')), ('email', check_string), ('timestamp', check_float), ('presence', check_dict([ # TODO: Add more here once the test below works ])), ]) # BUG: Marked as failing for now because this is a failing # test, due to the `aggregated` feature not being supported by # our events code. with self.assertRaises(AssertionError): events = self.do_test(lambda: do_update_user_presence( self.user_profile, get_client("website"), timezone.now(), UserPresence.ACTIVE)) # Marked as nocoverage since unreachable error = schema_checker('events[0]', events[0]) # nocoverage self.assert_on_error(error) # nocoverage def test_pointer_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('pointer')), ('pointer', check_int) ]) events = self.do_test(lambda: do_update_pointer(self.user_profile, 1500)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_referral_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('referral')), ('referrals', check_dict([ ('granted', check_int), ('used', check_int), ])), ]) events = self.do_test(lambda: do_refer_friend(self.user_profile, "friend@example.com")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_register_events(self): # type: () -> None realm_user_add_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('add')), ('person', check_dict([ ('email', check_string), ('full_name', check_string), ('is_admin', check_bool), ('is_bot', check_bool), ])), ]) events = self.do_test(lambda: self.register("test1@zulip.com", "test1")) self.assert_length(events, 1) error = realm_user_add_checker('events[0]', events[0]) self.assert_on_error(error) def test_alert_words_events(self): # type: () -> None alert_words_checker = check_dict([ ('type', equals('alert_words')), ('alert_words', check_list(check_string)), ]) events = self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"])) error = alert_words_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"])) error = alert_words_checker('events[0]', events[0]) self.assert_on_error(error) def test_default_streams_events(self): # type: () -> None default_streams_checker = check_dict([ ('type', equals('default_streams')), ('default_streams', check_list(check_dict([ ('description', check_string), ('invite_only', check_bool), ('name', check_string), ('stream_id', check_int), ]))), ]) stream = get_stream("Scotland", self.user_profile.realm) events = self.do_test(lambda: do_add_default_stream(stream)) error = default_streams_checker('events[0]', events[0]) events = self.do_test(lambda: do_remove_default_stream(stream)) error = default_streams_checker('events[0]', events[0]) self.assert_on_error(error) def test_muted_topics_events(self): # type: () -> None muted_topics_checker = check_dict([ ('type', equals('muted_topics')), ('muted_topics', check_list(check_list(check_string, 2))), ]) events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [[u"Denmark", u"topic"]])) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_update_muted_topic( self.user_profile, "Denmark", "topic", "add")) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_update_muted_topic( self.user_profile, "Denmark", "topic", "remove")) error = muted_topics_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_avatar_fields(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('user_id', check_int), ('avatar_url', check_string), ])), ]) events = self.do_test( lambda: do_change_avatar_fields(self.user_profile, UserProfile.AVATAR_FROM_USER), ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_full_name(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('full_name', check_string), ])), ]) events = self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet')) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_name(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('name')), ('value', check_string), ]) events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'name', u'New Realm Name')) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_description(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('description')), ('value', check_string), ]) events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'description', u'New Realm Description')) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_waiting_period_threshold(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('waiting_period_threshold')), ('value', check_int), ]) events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'waiting_period_threshold', 17)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_message_retention_days(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('message_retention_days')), ('value', check_int), ]) events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, "message_retention_days", 30)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_add_emoji_by_admins_only(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('add_emoji_by_admins_only')), ('value', check_bool), ]) events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'add_emoji_by_admins_only', True)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_restricted_to_domain(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('restricted_to_domain')), ('value', check_bool), ]) do_set_realm_property(self.user_profile.realm, 'restricted_to_domain', True) for restricted_to_domain in (False, True): events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'restricted_to_domain', restricted_to_domain)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_invite_required(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('invite_required')), ('value', check_bool), ]) invite_required = False do_set_realm_property(self.user_profile.realm, 'invite_required', invite_required) for invite_required in (True, False): events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'invite_required', invite_required)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_name_changes_disabled(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('name_changes_disabled')), ('value', check_bool), ]) do_set_realm_property(self.user_profile.realm, 'name_changes_disabled', True) for name_changes_disabled in (False, True): events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'name_changes_disabled', name_changes_disabled)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_email_changes_disabled(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('email_changes_disabled')), ('value', check_bool), ]) do_set_realm_property(self.user_profile.realm, 'email_changes_disabled', True) for email_changes_disabled in (False, True): events = self.do_test(lambda: do_set_realm_property(self.user_profile.realm, 'email_changes_disabled', email_changes_disabled)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_authentication_methods(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update_dict')), ('property', equals('default')), ('data', check_dict([])), ]) def fake_backends(): # type: () -> Any backends = ( 'zproject.backends.DevAuthBackend', 'zproject.backends.EmailAuthBackend', 'zproject.backends.GitHubAuthBackend', 'zproject.backends.GoogleMobileOauth2Backend', 'zproject.backends.ZulipLDAPAuthBackend', ) return self.settings(AUTHENTICATION_BACKENDS=backends) # Test transitions; any new backends should be tested with T/T/T/F/T for (auth_method_dict) in \ ({'Google': True, 'Email': True, 'GitHub': True, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': True, 'GitHub': False, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': False}, {'Google': True, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': False}, {'Google': False, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': True}, {'Google': False, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': True}, {'Google': False, 'Email': True, 'GitHub': True, 'LDAP': True, 'Dev': False}): with fake_backends(): events = self.do_test( lambda: do_set_realm_authentication_methods( self.user_profile.realm, auth_method_dict)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_invite_by_admins_only(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('invite_by_admins_only')), ('value', check_bool), ]) invite_by_admins_only = False do_set_realm_property(self.user_profile.realm, 'invite_by_admins_only', invite_by_admins_only) for invite_by_admins_only in (True, False): events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'invite_by_admins_only', invite_by_admins_only)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_inline_image_preview(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('inline_image_preview')), ('value', check_bool), ]) inline_image_preview = False do_set_realm_property(self.user_profile.realm, 'inline_image_preview', inline_image_preview) for inline_image_preview in (True, False): events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'inline_image_preview', inline_image_preview)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_inline_url_embed_preview(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('inline_url_embed_preview')), ('value', check_bool), ]) inline_url_embed_preview = False do_set_realm_property(self.user_profile.realm, 'inline_url_embed_preview', inline_url_embed_preview) for inline_url_embed_preview in (True, False): events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'inline_url_embed_preview', inline_url_embed_preview)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_default_language(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('default_language')), ('value', check_string), ]) events = self.do_test( lambda: do_set_realm_property(self.user_profile.realm, 'default_language', u'de')) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_create_stream_by_admins_only(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update')), ('property', equals('create_stream_by_admins_only')), ('value', check_bool), ]) do_set_realm_property(self.user_profile.realm, 'create_stream_by_admins_only', False) for create_stream_by_admins_only in (True, False): events = self.do_test( lambda: do_set_realm_property( self.user_profile.realm, 'create_stream_by_admins_only', create_stream_by_admins_only)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_pin_stream(self): # type: () -> None schema_checker = check_dict([ ('type', equals('subscription')), ('op', equals('update')), ('property', equals('pin_to_top')), ('stream_id', check_int), ('value', check_bool), ]) stream = get_stream("Denmark", self.user_profile.realm) sub = get_subscription(stream.name, self.user_profile) do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", False) for pinned in (True, False): events = self.do_test(lambda: do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", pinned)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_realm_message_edit_settings(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm')), ('op', equals('update_dict')), ('property', equals('default')), ('data', check_dict([('allow_message_editing', check_bool), ('message_content_edit_limit_seconds', check_int)])), ]) # Test every transition among the four possibilities {T,F} x {0, non-0} for (allow_message_editing, message_content_edit_limit_seconds) in \ ((True, 0), (False, 0), (True, 0), (False, 1234), (True, 0), (True, 1234), (True, 0), (False, 0), (False, 1234), (False, 0), (True, 1234), (False, 0), (True, 1234), (True, 600), (False, 600), (False, 1234), (True, 600)): events = self.do_test( lambda: do_set_realm_message_editing(self.user_profile.realm, allow_message_editing, message_content_edit_limit_seconds)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_is_admin(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_user')), ('op', equals('update')), ('person', check_dict([ ('email', check_string), ('is_admin', check_bool), ])), ]) do_change_is_admin(self.user_profile, False) for is_admin in [True, False]: events = self.do_test(lambda: do_change_is_admin(self.user_profile, is_admin)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_twenty_four_hour_time(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_display_settings')), ('setting_name', equals('twenty_four_hour_time')), ('user', check_string), ('setting', check_bool), ]) do_change_twenty_four_hour_time(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_twenty_four_hour_time(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_left_side_userlist(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_display_settings')), ('setting_name', equals('left_side_userlist')), ('user', check_string), ('setting', check_bool), ]) do_change_left_side_userlist(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_left_side_userlist(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_emoji_alt_code(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_display_settings')), ('setting_name', equals('emoji_alt_code')), ('user', check_string), ('setting', check_bool), ]) do_change_emoji_alt_code(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_emoji_alt_code(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_default_language(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_display_settings')), ('setting_name', equals('default_language')), ('user', check_string), ('setting', check_string), ]) for setting_value in ['de', 'es', 'en']: events = self.do_test(lambda: do_change_default_language(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_timezone(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_display_settings')), ('setting_name', equals('timezone')), ('user', check_string), ('setting', check_string), ]) for setting_value in ['US/Mountain', 'US/Samoa', 'Pacific/Galapagos', '']: events = self.do_test(lambda: do_change_timezone(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_stream_desktop_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_stream_desktop_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_stream_desktop_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_stream_desktop_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_stream_sounds(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_stream_sounds')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_stream_sounds(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_stream_sounds(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_desktop_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_desktop_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_desktop_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_desktop_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_sounds(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_sounds')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_sounds(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_sounds(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_offline_email_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_offline_email_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_offline_email_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_offline_email_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_offline_push_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_offline_push_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_offline_push_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_offline_push_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_online_push_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_online_push_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_online_push_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_online_push_notifications(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_pm_content_in_desktop_notifications(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('pm_content_in_desktop_notifications')), ('user', check_string), ('setting', check_bool), ]) do_change_pm_content_in_desktop_notifications(self.user_profile, False) for setting_value in [True, False]: events = self.do_test( lambda: do_change_pm_content_in_desktop_notifications(self.user_profile, setting_value), state_change_expected=False, ) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_change_enable_digest_emails(self): # type: () -> None schema_checker = check_dict([ ('type', equals('update_global_notifications')), ('notification_name', equals('enable_digest_emails')), ('user', check_string), ('setting', check_bool), ]) do_change_enable_digest_emails(self.user_profile, False) for setting_value in [True, False]: events = self.do_test(lambda: do_change_enable_digest_emails(self.user_profile, setting_value)) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_realm_emoji_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_emoji')), ('op', equals('update')), ('realm_emoji', check_dict([])), ]) events = self.do_test(lambda: check_add_realm_emoji(get_realm("zulip"), "my_emoji", "https://realm.com/my_emoji")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) events = self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip"), "my_emoji")) error = schema_checker('events[0]', events[0]) self.assert_on_error(error) def test_realm_filter_events(self): # type: () -> None schema_checker = check_dict([ ('type', equals('realm_filters')), ('realm_filters', check_list(None)), # TODO: validate tuples in the list ]) events = self.do_test(lambda: do_add_realm_filter(get_realm("zulip"), "#(?P