mirror of
https://github.com/zulip/zulip.git
synced 2025-11-12 18:06:44 +00:00
This commit adds the backend support for a new style of tutorial which allows for highlighting of multiple areas of the page with hotspots that disappear when clicked by the user.
1907 lines
80 KiB
Python
1907 lines
80 KiB
Python
# -*- coding: utf-8 -*-
|
|
# See http://zulip.readthedocs.io/en/latest/events-system.html for
|
|
# high-level documentation on how this system works.
|
|
from __future__ import absolute_import
|
|
from __future__ import print_function
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
from django.conf import settings
|
|
from django.http import HttpRequest, HttpResponse
|
|
from django.test import TestCase
|
|
from django.utils import timezone
|
|
|
|
from zerver.models import (
|
|
get_client, get_realm, get_recipient, get_stream, get_user_profile_by_email,
|
|
Message, RealmAlias, Recipient, UserMessage, UserPresence, UserProfile
|
|
)
|
|
|
|
from zerver.lib.actions import (
|
|
bulk_add_subscriptions,
|
|
bulk_remove_subscriptions,
|
|
do_add_alert_words,
|
|
check_add_realm_emoji,
|
|
check_send_typing_notification,
|
|
do_add_realm_filter,
|
|
do_add_reaction,
|
|
do_remove_reaction,
|
|
do_change_avatar_fields,
|
|
do_change_default_language,
|
|
do_change_default_all_public_streams,
|
|
do_change_default_events_register_stream,
|
|
do_change_default_sending_stream,
|
|
do_change_full_name,
|
|
do_change_bot_owner,
|
|
do_change_is_admin,
|
|
do_change_stream_description,
|
|
do_change_subscription_property,
|
|
do_change_timezone,
|
|
do_create_user,
|
|
do_deactivate_stream,
|
|
do_deactivate_user,
|
|
do_mark_hotspot_as_read,
|
|
do_reactivate_user,
|
|
do_refer_friend,
|
|
do_regenerate_api_key,
|
|
do_remove_alert_words,
|
|
do_remove_realm_emoji,
|
|
do_remove_realm_filter,
|
|
do_rename_stream,
|
|
do_add_default_stream,
|
|
do_remove_default_stream,
|
|
do_set_muted_topics,
|
|
do_set_realm_property,
|
|
do_set_realm_authentication_methods,
|
|
do_set_realm_message_editing,
|
|
do_update_embedded_data,
|
|
do_update_message,
|
|
do_update_message_flags,
|
|
do_update_muted_topic,
|
|
do_update_pointer,
|
|
do_update_user_presence,
|
|
do_change_twenty_four_hour_time,
|
|
do_change_left_side_userlist,
|
|
do_change_emoji_alt_code,
|
|
do_change_enable_stream_desktop_notifications,
|
|
do_change_enable_stream_sounds,
|
|
do_change_enable_desktop_notifications,
|
|
do_change_enable_sounds,
|
|
do_change_enable_offline_email_notifications,
|
|
do_change_enable_offline_push_notifications,
|
|
do_change_enable_online_push_notifications,
|
|
do_change_pm_content_in_desktop_notifications,
|
|
do_change_enable_digest_emails,
|
|
do_add_realm_alias,
|
|
do_change_realm_alias,
|
|
do_remove_realm_alias,
|
|
do_change_icon_source,
|
|
)
|
|
from zerver.lib.events import (
|
|
apply_events,
|
|
fetch_initial_state_data,
|
|
)
|
|
from zerver.lib.message import render_markdown
|
|
from zerver.lib.test_helpers import POSTRequestMock, get_subscription
|
|
from zerver.lib.test_classes import (
|
|
ZulipTestCase,
|
|
)
|
|
from zerver.lib.validator import (
|
|
check_bool, check_dict, check_dict_only, check_float, check_int, check_list, check_string,
|
|
equals, check_none_or, Validator
|
|
)
|
|
|
|
from zerver.views.events_register import _default_all_public_streams, _default_narrow
|
|
|
|
from zerver.tornado.event_queue import allocate_client_descriptor, EventQueue
|
|
from zerver.tornado.views import get_events_backend
|
|
|
|
from collections import OrderedDict
|
|
import mock
|
|
import time
|
|
import ujson
|
|
from six.moves import range
|
|
|
|
class EventsEndpointTest(ZulipTestCase):
|
|
def test_events_register_endpoint(self):
|
|
# type: () -> None
|
|
|
|
# This test is intended to get minimal coverage on the
|
|
# events_register code paths
|
|
email = 'hamlet@zulip.com'
|
|
with mock.patch('zerver.views.events_register.do_events_register', return_value={}):
|
|
result = self.client_post('/json/register', **self.api_auth(email))
|
|
self.assert_json_success(result)
|
|
|
|
with mock.patch('zerver.lib.events.request_event_queue', return_value=None):
|
|
result = self.client_post('/json/register', **self.api_auth(email))
|
|
self.assert_json_error(result, "Could not allocate event queue")
|
|
|
|
with mock.patch('zerver.lib.events.request_event_queue', return_value='15:11'):
|
|
with mock.patch('zerver.lib.events.get_user_events',
|
|
return_value=[]):
|
|
result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])),
|
|
**self.api_auth(email))
|
|
self.assert_json_success(result)
|
|
result_dict = ujson.loads(result.content)
|
|
self.assertEqual(result_dict['last_event_id'], -1)
|
|
self.assertEqual(result_dict['queue_id'], '15:11')
|
|
|
|
with mock.patch('zerver.lib.events.request_event_queue', return_value='15:12'):
|
|
with mock.patch('zerver.lib.events.get_user_events',
|
|
return_value=[{
|
|
'id': 6,
|
|
'type': 'pointer',
|
|
'pointer': 15,
|
|
}]):
|
|
result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])),
|
|
**self.api_auth(email))
|
|
self.assert_json_success(result)
|
|
result_dict = ujson.loads(result.content)
|
|
self.assertEqual(result_dict['last_event_id'], 6)
|
|
self.assertEqual(result_dict['pointer'], 15)
|
|
self.assertEqual(result_dict['queue_id'], '15:12')
|
|
|
|
def test_tornado_endpoint(self):
|
|
# type: () -> None
|
|
|
|
# This test is mostly intended to get minimal coverage on
|
|
# the /notify_tornado endpoint, so we can have 100% URL coverage,
|
|
# but it does exercise a little bit of the codepath.
|
|
post_data = dict(
|
|
data=ujson.dumps(
|
|
dict(
|
|
event=dict(
|
|
type='other'
|
|
),
|
|
users=[get_user_profile_by_email('hamlet@zulip.com').id],
|
|
),
|
|
),
|
|
)
|
|
req = POSTRequestMock(post_data, user_profile=None)
|
|
req.META['REMOTE_ADDR'] = '127.0.0.1'
|
|
result = self.client_post_request('/notify_tornado', req)
|
|
self.assert_json_error(result, 'Access denied', status_code=403)
|
|
|
|
post_data['secret'] = settings.SHARED_SECRET
|
|
req = POSTRequestMock(post_data, user_profile=None)
|
|
req.META['REMOTE_ADDR'] = '127.0.0.1'
|
|
result = self.client_post_request('/notify_tornado', req)
|
|
self.assert_json_success(result)
|
|
|
|
class GetEventsTest(ZulipTestCase):
|
|
def tornado_call(self, view_func, user_profile, post_data):
|
|
# type: (Callable[[HttpRequest, UserProfile], HttpResponse], UserProfile, Dict[str, Any]) -> HttpResponse
|
|
request = POSTRequestMock(post_data, user_profile)
|
|
return view_func(request, user_profile)
|
|
|
|
def test_get_events(self):
|
|
# type: () -> None
|
|
email = "hamlet@zulip.com"
|
|
recipient_email = "othello@zulip.com"
|
|
user_profile = get_user_profile_by_email(email)
|
|
recipient_user_profile = get_user_profile_by_email(recipient_email)
|
|
self.login(email)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"apply_markdown": ujson.dumps(True),
|
|
"event_types": ujson.dumps(["message"]),
|
|
"user_client": "website",
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
self.assert_json_success(result)
|
|
queue_id = ujson.loads(result.content)["queue_id"]
|
|
|
|
recipient_result = self.tornado_call(get_events_backend, recipient_user_profile,
|
|
{"apply_markdown": ujson.dumps(True),
|
|
"event_types": ujson.dumps(["message"]),
|
|
"user_client": "website",
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
self.assert_json_success(recipient_result)
|
|
recipient_queue_id = ujson.loads(recipient_result.content)["queue_id"]
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 0)
|
|
|
|
local_id = 10.01
|
|
self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 1)
|
|
self.assertEqual(events[0]["type"], "message")
|
|
self.assertEqual(events[0]["message"]["sender_email"], email)
|
|
self.assertEqual(events[0]["local_message_id"], local_id)
|
|
self.assertEqual(events[0]["message"]["display_recipient"][0]["is_mirror_dummy"], False)
|
|
self.assertEqual(events[0]["message"]["display_recipient"][1]["is_mirror_dummy"], False)
|
|
|
|
last_event_id = events[0]["id"]
|
|
local_id += 0.01
|
|
|
|
self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": last_event_id,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 1)
|
|
self.assertEqual(events[0]["type"], "message")
|
|
self.assertEqual(events[0]["message"]["sender_email"], email)
|
|
self.assertEqual(events[0]["local_message_id"], local_id)
|
|
|
|
# Test that the received message in the receiver's event queue
|
|
# exists and does not contain a local id
|
|
recipient_result = self.tornado_call(get_events_backend, recipient_user_profile,
|
|
{"queue_id": recipient_queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
recipient_events = ujson.loads(recipient_result.content)["events"]
|
|
self.assert_json_success(recipient_result)
|
|
self.assertEqual(len(recipient_events), 2)
|
|
self.assertEqual(recipient_events[0]["type"], "message")
|
|
self.assertEqual(recipient_events[0]["message"]["sender_email"], email)
|
|
self.assertTrue("local_message_id" not in recipient_events[0])
|
|
self.assertEqual(recipient_events[1]["type"], "message")
|
|
self.assertEqual(recipient_events[1]["message"]["sender_email"], email)
|
|
self.assertTrue("local_message_id" not in recipient_events[1])
|
|
|
|
def test_get_events_narrow(self):
|
|
# type: () -> None
|
|
email = "hamlet@zulip.com"
|
|
user_profile = get_user_profile_by_email(email)
|
|
self.login(email)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"apply_markdown": ujson.dumps(True),
|
|
"event_types": ujson.dumps(["message"]),
|
|
"narrow": ujson.dumps([["stream", "denmark"]]),
|
|
"user_client": "website",
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
self.assert_json_success(result)
|
|
queue_id = ujson.loads(result.content)["queue_id"]
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 0)
|
|
|
|
self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello")
|
|
self.send_message(email, "Denmark", Recipient.STREAM, "hello")
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 1)
|
|
self.assertEqual(events[0]["type"], "message")
|
|
self.assertEqual(events[0]["message"]["display_recipient"], "Denmark")
|
|
|
|
class EventsRegisterTest(ZulipTestCase):
|
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
maxDiff = None # type: Optional[int]
|
|
|
|
def create_bot(self, email):
|
|
# type: (str) -> UserProfile
|
|
return do_create_user(email, '123',
|
|
get_realm('zulip'), 'Test Bot', 'test',
|
|
bot_type=UserProfile.DEFAULT_BOT, bot_owner=self.user_profile)
|
|
|
|
def realm_bot_schema(self, field_name, check):
|
|
# type: (str, Validator) -> Validator
|
|
return check_dict_only([
|
|
('id', check_int),
|
|
('type', equals('realm_bot')),
|
|
('op', equals('update')),
|
|
('bot', check_dict_only([
|
|
('email', check_string),
|
|
('user_id', check_int),
|
|
(field_name, check),
|
|
])),
|
|
])
|
|
|
|
def do_test(self, action, event_types=None, include_subscribers=True, state_change_expected=True,
|
|
num_events=1):
|
|
# type: (Callable[[], Any], Optional[List[str]], bool, bool, int) -> List[Dict[str, Any]]
|
|
client = allocate_client_descriptor(
|
|
dict(user_profile_id = self.user_profile.id,
|
|
user_profile_email = self.user_profile.email,
|
|
realm_id = self.user_profile.realm_id,
|
|
event_types = event_types,
|
|
client_type_name = "website",
|
|
apply_markdown = True,
|
|
all_public_streams = False,
|
|
queue_timeout = 600,
|
|
last_connection_time = time.time(),
|
|
narrow = [])
|
|
)
|
|
# hybrid_state = initial fetch state + re-applying events triggered by our action
|
|
# normal_state = do action then fetch at the end (the "normal" code path)
|
|
hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers)
|
|
action()
|
|
events = client.event_queue.contents()
|
|
self.assertTrue(len(events) == num_events)
|
|
|
|
before = ujson.dumps(hybrid_state)
|
|
apply_events(hybrid_state, events, self.user_profile, include_subscribers=include_subscribers)
|
|
after = ujson.dumps(hybrid_state)
|
|
|
|
if state_change_expected:
|
|
if before == after:
|
|
print(events) # nocoverage
|
|
raise AssertionError('Test does not exercise enough code -- events do not change state.')
|
|
else:
|
|
if before != after:
|
|
raise AssertionError('Test is invalid--state actually does change here.')
|
|
|
|
normal_state = fetch_initial_state_data(self.user_profile, event_types, "", include_subscribers=include_subscribers)
|
|
self.match_states(hybrid_state, normal_state)
|
|
return events
|
|
|
|
def assert_on_error(self, error):
|
|
# type: (Optional[str]) -> None
|
|
if error:
|
|
raise AssertionError(error)
|
|
|
|
def match_states(self, state1, state2):
|
|
# type: (Dict[str, Any], Dict[str, Any]) -> None
|
|
def normalize(state):
|
|
# type: (Dict[str, Any]) -> None
|
|
state['realm_users'] = {u['email']: u for u in state['realm_users']}
|
|
for u in state['subscriptions']:
|
|
if 'subscribers' in u:
|
|
u['subscribers'].sort()
|
|
state['subscriptions'] = {u['name']: u for u in state['subscriptions']}
|
|
state['unsubscribed'] = {u['name']: u for u in state['unsubscribed']}
|
|
if 'realm_bots' in state:
|
|
state['realm_bots'] = {u['email']: u for u in state['realm_bots']}
|
|
normalize(state1)
|
|
normalize(state2)
|
|
self.assertEqual(state1, state2)
|
|
|
|
def test_send_message_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('message')),
|
|
('flags', check_list(None)),
|
|
('message', check_dict([
|
|
('avatar_url', check_string),
|
|
('client', check_string),
|
|
('content', check_string),
|
|
('content_type', equals('text/html')),
|
|
('display_recipient', check_string),
|
|
('gravatar_hash', check_string),
|
|
('id', check_int),
|
|
('recipient_id', check_int),
|
|
('sender_realm_str', check_string),
|
|
('sender_email', check_string),
|
|
('sender_full_name', check_string),
|
|
('sender_id', check_int),
|
|
('sender_short_name', check_string),
|
|
('subject', check_string),
|
|
('subject_links', check_list(None)),
|
|
('timestamp', check_int),
|
|
('type', check_string),
|
|
])),
|
|
])
|
|
events = self.do_test(
|
|
lambda: self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello"),
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
# Verify message editing
|
|
schema_checker = check_dict([
|
|
('type', equals('update_message')),
|
|
('flags', check_list(None)),
|
|
('content', check_string),
|
|
('edit_timestamp', check_int),
|
|
('flags', check_list(None)),
|
|
('message_id', check_int),
|
|
('message_ids', check_list(check_int)),
|
|
('orig_content', check_string),
|
|
('orig_rendered_content', check_string),
|
|
('orig_subject', check_string),
|
|
('propagate_mode', check_string),
|
|
('rendered_content', check_string),
|
|
('sender', check_string),
|
|
('stream_id', check_int),
|
|
('subject', check_string),
|
|
('subject_links', check_list(None)),
|
|
('user_id', check_int),
|
|
# There is also a timestamp field in the event, but we ignore it, as
|
|
# it's kind of an unwanted but harmless side effect of calling log_event.
|
|
])
|
|
|
|
message = Message.objects.order_by('-id')[0]
|
|
topic = 'new_topic'
|
|
propagate_mode = 'change_all'
|
|
content = 'new content'
|
|
rendered_content = render_markdown(message, content)
|
|
events = self.do_test(
|
|
lambda: do_update_message(self.user_profile, message, topic,
|
|
propagate_mode, content, rendered_content),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
# Verify do_update_embedded_data
|
|
schema_checker = check_dict([
|
|
('type', equals('update_message')),
|
|
('flags', check_list(None)),
|
|
('content', check_string),
|
|
('flags', check_list(None)),
|
|
('message_id', check_int),
|
|
('message_ids', check_list(check_int)),
|
|
('rendered_content', check_string),
|
|
('sender', check_string),
|
|
])
|
|
|
|
events = self.do_test(
|
|
lambda: do_update_embedded_data(self.user_profile, message,
|
|
u"embed_content", "<p>embed_content</p>"),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_update_message_flags(self):
|
|
# type: () -> None
|
|
# Test message flag update events
|
|
schema_checker = check_dict([
|
|
('id', check_int),
|
|
('type', equals('update_message_flags')),
|
|
('flag', check_string),
|
|
('messages', check_list(check_int)),
|
|
('operation', equals("add")),
|
|
])
|
|
|
|
message = self.send_message("cordelia@zulip.com", "hamlet@zulip.com", Recipient.PERSONAL, "hello")
|
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
events = self.do_test(
|
|
lambda: do_update_message_flags(user_profile, 'add', 'starred',
|
|
[message], False, None, None),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
schema_checker = check_dict([
|
|
('id', check_int),
|
|
('type', equals('update_message_flags')),
|
|
('flag', check_string),
|
|
('messages', check_list(check_int)),
|
|
('operation', equals("remove")),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_update_message_flags(user_profile, 'remove', 'starred',
|
|
[message], False, None, None),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_send_reaction(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('reaction')),
|
|
('op', equals('add')),
|
|
('message_id', check_int),
|
|
('emoji_name', check_string),
|
|
('user', check_dict([
|
|
('email', check_string),
|
|
('full_name', check_string),
|
|
('user_id', check_int)
|
|
])),
|
|
])
|
|
|
|
message_id = self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello")
|
|
message = Message.objects.get(id=message_id)
|
|
events = self.do_test(
|
|
lambda: do_add_reaction(
|
|
self.user_profile, message, "tada"),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_remove_reaction(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('reaction')),
|
|
('op', equals('remove')),
|
|
('message_id', check_int),
|
|
('emoji_name', check_string),
|
|
('user', check_dict([
|
|
('email', check_string),
|
|
('full_name', check_string),
|
|
('user_id', check_int)
|
|
])),
|
|
])
|
|
|
|
message_id = self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello")
|
|
message = Message.objects.get(id=message_id)
|
|
events = self.do_test(
|
|
lambda: do_remove_reaction(
|
|
self.user_profile, message, "tada"),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_typing_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('typing')),
|
|
('op', equals('start')),
|
|
('sender', check_dict([
|
|
('email', check_string),
|
|
('user_id', check_int)])),
|
|
('recipients', check_list(check_dict([
|
|
('email', check_string),
|
|
('user_id', check_int),
|
|
]))),
|
|
])
|
|
|
|
events = self.do_test(
|
|
lambda: check_send_typing_notification(
|
|
self.user_profile, ["cordelia@zulip.com"], "start"),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_presence_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('pointer')),
|
|
('email', check_string),
|
|
('timestamp', check_float),
|
|
('presence', check_dict([
|
|
# TODO: Add more here once the test below works
|
|
])),
|
|
])
|
|
# BUG: Marked as failing for now because this is a failing
|
|
# test, due to the `aggregated` feature not being supported by
|
|
# our events code.
|
|
|
|
with self.assertRaises(AssertionError):
|
|
events = self.do_test(lambda: do_update_user_presence(
|
|
self.user_profile, get_client("website"), timezone.now(), UserPresence.ACTIVE))
|
|
# Marked as nocoverage since unreachable
|
|
error = schema_checker('events[0]', events[0]) # nocoverage
|
|
self.assert_on_error(error) # nocoverage
|
|
|
|
def test_pointer_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('pointer')),
|
|
('pointer', check_int)
|
|
])
|
|
events = self.do_test(lambda: do_update_pointer(self.user_profile, 1500))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_referral_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('referral')),
|
|
('referrals', check_dict([
|
|
('granted', check_int),
|
|
('used', check_int),
|
|
])),
|
|
])
|
|
events = self.do_test(lambda: do_refer_friend(self.user_profile, "friend@example.com"))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_register_events(self):
|
|
# type: () -> None
|
|
realm_user_add_checker = check_dict([
|
|
('type', equals('realm_user')),
|
|
('op', equals('add')),
|
|
('person', check_dict([
|
|
('email', check_string),
|
|
('full_name', check_string),
|
|
('is_admin', check_bool),
|
|
('is_bot', check_bool),
|
|
])),
|
|
])
|
|
|
|
events = self.do_test(lambda: self.register("test1@zulip.com", "test1"))
|
|
self.assert_length(events, 1)
|
|
error = realm_user_add_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_alert_words_events(self):
|
|
# type: () -> None
|
|
alert_words_checker = check_dict([
|
|
('type', equals('alert_words')),
|
|
('alert_words', check_list(check_string)),
|
|
])
|
|
|
|
events = self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"]))
|
|
error = alert_words_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
events = self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"]))
|
|
error = alert_words_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_default_streams_events(self):
|
|
# type: () -> None
|
|
default_streams_checker = check_dict([
|
|
('type', equals('default_streams')),
|
|
('default_streams', check_list(check_dict([
|
|
('description', check_string),
|
|
('invite_only', check_bool),
|
|
('name', check_string),
|
|
('stream_id', check_int),
|
|
]))),
|
|
])
|
|
|
|
stream = get_stream("Scotland", self.user_profile.realm)
|
|
events = self.do_test(lambda: do_add_default_stream(stream))
|
|
error = default_streams_checker('events[0]', events[0])
|
|
events = self.do_test(lambda: do_remove_default_stream(stream))
|
|
error = default_streams_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_muted_topics_events(self):
|
|
# type: () -> None
|
|
muted_topics_checker = check_dict([
|
|
('type', equals('muted_topics')),
|
|
('muted_topics', check_list(check_list(check_string, 2))),
|
|
])
|
|
events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [[u"Denmark", u"topic"]]))
|
|
error = muted_topics_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
events = self.do_test(lambda: do_update_muted_topic(
|
|
self.user_profile, "Denmark", "topic", "add"))
|
|
error = muted_topics_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
events = self.do_test(lambda: do_update_muted_topic(
|
|
self.user_profile, "Denmark", "topic", "remove"))
|
|
error = muted_topics_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_avatar_fields(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_user')),
|
|
('op', equals('update')),
|
|
('person', check_dict([
|
|
('email', check_string),
|
|
('user_id', check_int),
|
|
('avatar_url', check_string),
|
|
])),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_change_avatar_fields(self.user_profile, UserProfile.AVATAR_FROM_USER),
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_full_name(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_user')),
|
|
('op', equals('update')),
|
|
('person', check_dict([
|
|
('email', check_string),
|
|
('full_name', check_string),
|
|
])),
|
|
])
|
|
events = self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet'))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_name(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('name')),
|
|
('value', check_string),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'name', u'New Realm Name'))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_description(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('description')),
|
|
('value', check_string),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'description', u'New Realm Description'))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_waiting_period_threshold(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('waiting_period_threshold')),
|
|
('value', check_int),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'waiting_period_threshold', 17))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_message_retention_days(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('message_retention_days')),
|
|
('value', check_int),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, "message_retention_days", 30))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_add_emoji_by_admins_only(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('add_emoji_by_admins_only')),
|
|
('value', check_bool),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'add_emoji_by_admins_only', True))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_restricted_to_domain(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('restricted_to_domain')),
|
|
('value', check_bool),
|
|
])
|
|
do_set_realm_property(self.user_profile.realm, 'restricted_to_domain', True)
|
|
for restricted_to_domain in (False, True):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'restricted_to_domain', restricted_to_domain))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_invite_required(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('invite_required')),
|
|
('value', check_bool),
|
|
])
|
|
invite_required = False
|
|
do_set_realm_property(self.user_profile.realm, 'invite_required', invite_required)
|
|
for invite_required in (True, False):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'invite_required', invite_required))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_name_changes_disabled(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('name_changes_disabled')),
|
|
('value', check_bool),
|
|
])
|
|
do_set_realm_property(self.user_profile.realm, 'name_changes_disabled', True)
|
|
for name_changes_disabled in (False, True):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'name_changes_disabled', name_changes_disabled))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_email_changes_disabled(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('email_changes_disabled')),
|
|
('value', check_bool),
|
|
])
|
|
do_set_realm_property(self.user_profile.realm, 'email_changes_disabled', True)
|
|
for email_changes_disabled in (False, True):
|
|
events = self.do_test(lambda: do_set_realm_property(self.user_profile.realm, 'email_changes_disabled', email_changes_disabled))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_authentication_methods(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update_dict')),
|
|
('property', equals('default')),
|
|
('data', check_dict([])),
|
|
])
|
|
|
|
def fake_backends():
|
|
# type: () -> Any
|
|
backends = (
|
|
'zproject.backends.DevAuthBackend',
|
|
'zproject.backends.EmailAuthBackend',
|
|
'zproject.backends.GitHubAuthBackend',
|
|
'zproject.backends.GoogleMobileOauth2Backend',
|
|
'zproject.backends.ZulipLDAPAuthBackend',
|
|
)
|
|
return self.settings(AUTHENTICATION_BACKENDS=backends)
|
|
|
|
# Test transitions; any new backends should be tested with T/T/T/F/T
|
|
for (auth_method_dict) in \
|
|
({'Google': True, 'Email': True, 'GitHub': True, 'LDAP': False, 'Dev': False},
|
|
{'Google': True, 'Email': True, 'GitHub': False, 'LDAP': False, 'Dev': False},
|
|
{'Google': True, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': False},
|
|
{'Google': True, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': False},
|
|
{'Google': False, 'Email': False, 'GitHub': False, 'LDAP': False, 'Dev': True},
|
|
{'Google': False, 'Email': False, 'GitHub': True, 'LDAP': False, 'Dev': True},
|
|
{'Google': False, 'Email': True, 'GitHub': True, 'LDAP': True, 'Dev': False}):
|
|
with fake_backends():
|
|
events = self.do_test(
|
|
lambda: do_set_realm_authentication_methods(
|
|
self.user_profile.realm,
|
|
auth_method_dict))
|
|
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_invite_by_admins_only(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('invite_by_admins_only')),
|
|
('value', check_bool),
|
|
])
|
|
invite_by_admins_only = False
|
|
do_set_realm_property(self.user_profile.realm, 'invite_by_admins_only', invite_by_admins_only)
|
|
for invite_by_admins_only in (True, False):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'invite_by_admins_only', invite_by_admins_only))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_inline_image_preview(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('inline_image_preview')),
|
|
('value', check_bool),
|
|
])
|
|
inline_image_preview = False
|
|
do_set_realm_property(self.user_profile.realm, 'inline_image_preview', inline_image_preview)
|
|
for inline_image_preview in (True, False):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'inline_image_preview', inline_image_preview))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_inline_url_embed_preview(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('inline_url_embed_preview')),
|
|
('value', check_bool),
|
|
])
|
|
inline_url_embed_preview = False
|
|
do_set_realm_property(self.user_profile.realm, 'inline_url_embed_preview', inline_url_embed_preview)
|
|
for inline_url_embed_preview in (True, False):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'inline_url_embed_preview', inline_url_embed_preview))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_default_language(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('default_language')),
|
|
('value', check_string),
|
|
])
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(self.user_profile.realm, 'default_language', u'de'))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_create_stream_by_admins_only(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update')),
|
|
('property', equals('create_stream_by_admins_only')),
|
|
('value', check_bool),
|
|
])
|
|
do_set_realm_property(self.user_profile.realm, 'create_stream_by_admins_only', False)
|
|
|
|
for create_stream_by_admins_only in (True, False):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_property(
|
|
self.user_profile.realm,
|
|
'create_stream_by_admins_only',
|
|
create_stream_by_admins_only))
|
|
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_pin_stream(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('subscription')),
|
|
('op', equals('update')),
|
|
('property', equals('pin_to_top')),
|
|
('stream_id', check_int),
|
|
('value', check_bool),
|
|
])
|
|
stream = get_stream("Denmark", self.user_profile.realm)
|
|
sub = get_subscription(stream.name, self.user_profile)
|
|
do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", False)
|
|
for pinned in (True, False):
|
|
events = self.do_test(lambda: do_change_subscription_property(self.user_profile, sub, stream, "pin_to_top", pinned))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_message_edit_settings(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update_dict')),
|
|
('property', equals('default')),
|
|
('data', check_dict([('allow_message_editing', check_bool),
|
|
('message_content_edit_limit_seconds', check_int)])),
|
|
])
|
|
# Test every transition among the four possibilities {T,F} x {0, non-0}
|
|
for (allow_message_editing, message_content_edit_limit_seconds) in \
|
|
((True, 0), (False, 0), (True, 0), (False, 1234), (True, 0), (True, 1234), (True, 0),
|
|
(False, 0), (False, 1234), (False, 0), (True, 1234), (False, 0),
|
|
(True, 1234), (True, 600), (False, 600), (False, 1234), (True, 600)):
|
|
events = self.do_test(
|
|
lambda: do_set_realm_message_editing(self.user_profile.realm,
|
|
allow_message_editing,
|
|
message_content_edit_limit_seconds))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_is_admin(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_user')),
|
|
('op', equals('update')),
|
|
('person', check_dict([
|
|
('email', check_string),
|
|
('is_admin', check_bool),
|
|
])),
|
|
])
|
|
do_change_is_admin(self.user_profile, False)
|
|
for is_admin in [True, False]:
|
|
events = self.do_test(lambda: do_change_is_admin(self.user_profile, is_admin))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_twenty_four_hour_time(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_display_settings')),
|
|
('setting_name', equals('twenty_four_hour_time')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_twenty_four_hour_time(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_twenty_four_hour_time(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_left_side_userlist(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_display_settings')),
|
|
('setting_name', equals('left_side_userlist')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_left_side_userlist(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_left_side_userlist(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_emoji_alt_code(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_display_settings')),
|
|
('setting_name', equals('emoji_alt_code')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_emoji_alt_code(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_emoji_alt_code(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_default_language(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_display_settings')),
|
|
('setting_name', equals('default_language')),
|
|
('user', check_string),
|
|
('setting', check_string),
|
|
])
|
|
for setting_value in ['de', 'es', 'en']:
|
|
events = self.do_test(lambda: do_change_default_language(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_timezone(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_display_settings')),
|
|
('setting_name', equals('timezone')),
|
|
('user', check_string),
|
|
('setting', check_string),
|
|
])
|
|
for setting_value in ['US/Mountain', 'US/Samoa', 'Pacific/Galapagos', '']:
|
|
events = self.do_test(lambda: do_change_timezone(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_stream_desktop_notifications(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_stream_desktop_notifications')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_stream_desktop_notifications(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_stream_desktop_notifications(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_stream_sounds(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_stream_sounds')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_stream_sounds(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_stream_sounds(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_desktop_notifications(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_desktop_notifications')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_desktop_notifications(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_desktop_notifications(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_sounds(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_sounds')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_sounds(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_sounds(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_offline_email_notifications(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_offline_email_notifications')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_offline_email_notifications(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_offline_email_notifications(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_offline_push_notifications(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_offline_push_notifications')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_offline_push_notifications(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_offline_push_notifications(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_online_push_notifications(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_online_push_notifications')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
|
|
do_change_enable_online_push_notifications(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_online_push_notifications(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_pm_content_in_desktop_notifications(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('pm_content_in_desktop_notifications')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_pm_content_in_desktop_notifications(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(
|
|
lambda: do_change_pm_content_in_desktop_notifications(self.user_profile,
|
|
setting_value),
|
|
state_change_expected=False,
|
|
)
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_enable_digest_emails(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('update_global_notifications')),
|
|
('notification_name', equals('enable_digest_emails')),
|
|
('user', check_string),
|
|
('setting', check_bool),
|
|
])
|
|
do_change_enable_digest_emails(self.user_profile, False)
|
|
for setting_value in [True, False]:
|
|
events = self.do_test(lambda: do_change_enable_digest_emails(self.user_profile, setting_value))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_realm_emoji_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_emoji')),
|
|
('op', equals('update')),
|
|
('realm_emoji', check_dict([])),
|
|
])
|
|
events = self.do_test(lambda: check_add_realm_emoji(get_realm("zulip"), "my_emoji",
|
|
"https://realm.com/my_emoji"))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
events = self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip"), "my_emoji"))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_realm_filter_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_filters')),
|
|
('realm_filters', check_list(None)), # TODO: validate tuples in the list
|
|
])
|
|
events = self.do_test(lambda: do_add_realm_filter(get_realm("zulip"), "#(?P<id>[123])",
|
|
"https://realm.com/my_realm_filter/%(id)s"))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
self.do_test(lambda: do_remove_realm_filter(get_realm("zulip"), "#(?P<id>[123])"))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_realm_alias_events(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_domains')),
|
|
('op', equals('add')),
|
|
('alias', check_dict([
|
|
('domain', check_string),
|
|
('allow_subdomains', check_bool),
|
|
])),
|
|
])
|
|
realm = get_realm('zulip')
|
|
events = self.do_test(lambda: do_add_realm_alias(realm, 'zulip.org', False))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_domains')),
|
|
('op', equals('change')),
|
|
('alias', check_dict([
|
|
('domain', equals('zulip.org')),
|
|
('allow_subdomains', equals(True)),
|
|
])),
|
|
])
|
|
alias = RealmAlias.objects.get(realm=realm, domain='zulip.org')
|
|
events = self.do_test(lambda: do_change_realm_alias(alias, True))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
schema_checker = check_dict([
|
|
('type', equals('realm_domains')),
|
|
('op', equals('remove')),
|
|
('domain', equals('zulip.org')),
|
|
])
|
|
events = self.do_test(lambda: do_remove_realm_alias(alias))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_create_bot(self):
|
|
# type: () -> None
|
|
bot_created_checker = check_dict([
|
|
('type', equals('realm_bot')),
|
|
('op', equals('add')),
|
|
('bot', check_dict([
|
|
('email', check_string),
|
|
('user_id', check_int),
|
|
('full_name', check_string),
|
|
('is_active', check_bool),
|
|
('api_key', check_string),
|
|
('default_sending_stream', check_none_or(check_string)),
|
|
('default_events_register_stream', check_none_or(check_string)),
|
|
('default_all_public_streams', check_bool),
|
|
('avatar_url', check_string),
|
|
])),
|
|
])
|
|
action = lambda: self.create_bot('test-bot@zulip.com')
|
|
events = self.do_test(action, num_events=2)
|
|
error = bot_created_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_bot_full_name(self):
|
|
# type: () -> None
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
action = lambda: do_change_full_name(bot, 'New Bot Name')
|
|
events = self.do_test(action, num_events=2)
|
|
error = self.realm_bot_schema('full_name', check_string)('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
def test_regenerate_bot_api_key(self):
|
|
# type: () -> None
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
action = lambda: do_regenerate_api_key(bot)
|
|
events = self.do_test(action)
|
|
error = self.realm_bot_schema('api_key', check_string)('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_bot_avatar_source(self):
|
|
# type: () -> None
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
action = lambda: do_change_avatar_fields(bot, bot.AVATAR_FROM_USER)
|
|
events = self.do_test(action, num_events=2)
|
|
error = self.realm_bot_schema('avatar_url', check_string)('events[0]', events[0])
|
|
self.assertEqual(events[1]['type'], 'realm_user')
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_realm_icon_source(self):
|
|
# type: () -> None
|
|
realm = get_realm('zulip')
|
|
action = lambda: do_change_icon_source(realm, realm.ICON_FROM_GRAVATAR)
|
|
events = self.do_test(action, state_change_expected=False)
|
|
schema_checker = check_dict([
|
|
('type', equals('realm')),
|
|
('op', equals('update_dict')),
|
|
('property', equals('icon')),
|
|
('data',
|
|
check_dict([('icon_url', check_string),
|
|
('icon_source', check_string)])),
|
|
])
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_bot_default_all_public_streams(self):
|
|
# type: () -> None
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
action = lambda: do_change_default_all_public_streams(bot, True)
|
|
events = self.do_test(action)
|
|
error = self.realm_bot_schema('default_all_public_streams', check_bool)('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_bot_default_sending_stream(self):
|
|
# type: () -> None
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
stream = get_stream("Rome", bot.realm)
|
|
|
|
action = lambda: do_change_default_sending_stream(bot, stream)
|
|
events = self.do_test(action)
|
|
error = self.realm_bot_schema('default_sending_stream', check_string)('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
action = lambda: do_change_default_sending_stream(bot, None)
|
|
events = self.do_test(action)
|
|
error = self.realm_bot_schema('default_sending_stream', equals(None))('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_bot_default_events_register_stream(self):
|
|
# type: () -> None
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
stream = get_stream("Rome", bot.realm)
|
|
|
|
action = lambda: do_change_default_events_register_stream(bot, stream)
|
|
events = self.do_test(action)
|
|
error = self.realm_bot_schema('default_events_register_stream', check_string)('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
action = lambda: do_change_default_events_register_stream(bot, None)
|
|
events = self.do_test(action)
|
|
error = self.realm_bot_schema('default_events_register_stream', equals(None))('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_change_bot_owner(self):
|
|
# type: () -> None
|
|
change_bot_owner_checker = check_dict([
|
|
('type', equals('realm_bot')),
|
|
('op', equals('update')),
|
|
('bot', check_dict([
|
|
('email', check_string),
|
|
('user_id', check_int),
|
|
('owner_id', check_int),
|
|
])),
|
|
])
|
|
self.user_profile = get_user_profile_by_email('iago@zulip.com')
|
|
owner = get_user_profile_by_email('hamlet@zulip.com')
|
|
bot = self.create_bot('test-bot@zulip.com')
|
|
action = lambda: do_change_bot_owner(bot, owner)
|
|
events = self.do_test(action)
|
|
error = change_bot_owner_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_do_deactivate_user(self):
|
|
# type: () -> None
|
|
bot_deactivate_checker = check_dict([
|
|
('type', equals('realm_bot')),
|
|
('op', equals('remove')),
|
|
('bot', check_dict([
|
|
('email', check_string),
|
|
('full_name', check_string),
|
|
])),
|
|
])
|
|
bot = self.create_bot('foo-bot@zulip.com')
|
|
action = lambda: do_deactivate_user(bot)
|
|
events = self.do_test(action, num_events=2)
|
|
error = bot_deactivate_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
def test_do_reactivate_user(self):
|
|
# type: () -> None
|
|
bot_reactivate_checker = check_dict([
|
|
('type', equals('realm_bot')),
|
|
('op', equals('add')),
|
|
('bot', check_dict([
|
|
('email', check_string),
|
|
('user_id', check_int),
|
|
('full_name', check_string),
|
|
('is_active', check_bool),
|
|
('api_key', check_string),
|
|
('default_sending_stream', check_none_or(check_string)),
|
|
('default_events_register_stream', check_none_or(check_string)),
|
|
('default_all_public_streams', check_bool),
|
|
('avatar_url', check_string),
|
|
('owner', check_none_or(check_string)),
|
|
])),
|
|
])
|
|
bot = self.create_bot('foo-bot@zulip.com')
|
|
do_deactivate_user(bot)
|
|
action = lambda: do_reactivate_user(bot)
|
|
events = self.do_test(action, num_events=2)
|
|
error = bot_reactivate_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
def test_do_mark_hotspot_as_read(self):
|
|
# type: () -> None
|
|
schema_checker = check_dict([
|
|
('type', equals('hotspots')),
|
|
('hotspots', check_list(check_string)),
|
|
])
|
|
events = self.do_test(lambda: do_mark_hotspot_as_read(self.user_profile, 'welcome'))
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_rename_stream(self):
|
|
# type: () -> None
|
|
stream = self.make_stream('old_name')
|
|
new_name = u'stream with a brand new name'
|
|
self.subscribe_to_stream(self.user_profile.email, stream.name)
|
|
|
|
action = lambda: do_rename_stream(stream, new_name)
|
|
events = self.do_test(action, num_events=2)
|
|
|
|
schema_checker = check_dict([
|
|
('type', equals('stream')),
|
|
('op', equals('update')),
|
|
('property', equals('email_address')),
|
|
('value', check_string),
|
|
('stream_id', check_int),
|
|
('name', equals('old_name')),
|
|
])
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
schema_checker = check_dict([
|
|
('type', equals('stream')),
|
|
('op', equals('update')),
|
|
('property', equals('name')),
|
|
('value', equals(new_name)),
|
|
('name', equals('old_name')),
|
|
])
|
|
error = schema_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
def test_deactivate_stream_neversubscribed(self):
|
|
# type: () -> None
|
|
stream = self.make_stream('old_name')
|
|
|
|
action = lambda: do_deactivate_stream(stream)
|
|
events = self.do_test(action)
|
|
|
|
schema_checker = check_dict([
|
|
('type', equals('stream')),
|
|
('op', equals('delete')),
|
|
('streams', check_list(check_dict([]))),
|
|
])
|
|
error = schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
def test_subscribe_other_user_never_subscribed(self):
|
|
# type: () -> None
|
|
action = lambda: self.subscribe_to_stream("othello@zulip.com", u"test_stream")
|
|
events = self.do_test(action, num_events=2)
|
|
peer_add_schema_checker = check_dict([
|
|
('type', equals('subscription')),
|
|
('op', equals('peer_add')),
|
|
('user_id', check_int),
|
|
('subscriptions', check_list(check_string)),
|
|
])
|
|
error = peer_add_schema_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
def test_subscribe_events(self):
|
|
# type: () -> None
|
|
self.do_test_subscribe_events(include_subscribers=True)
|
|
|
|
def test_subscribe_events_no_include_subscribers(self):
|
|
# type: () -> None
|
|
self.do_test_subscribe_events(include_subscribers=False)
|
|
|
|
def do_test_subscribe_events(self, include_subscribers):
|
|
# type: (bool) -> None
|
|
subscription_fields = [
|
|
('color', check_string),
|
|
('description', check_string),
|
|
('email_address', check_string),
|
|
('invite_only', check_bool),
|
|
('in_home_view', check_bool),
|
|
('name', check_string),
|
|
('desktop_notifications', check_bool),
|
|
('audible_notifications', check_bool),
|
|
('stream_id', check_int),
|
|
]
|
|
if include_subscribers:
|
|
subscription_fields.append(('subscribers', check_list(check_int))) # type: ignore
|
|
subscription_schema_checker = check_list(
|
|
check_dict(subscription_fields),
|
|
)
|
|
stream_create_schema_checker = check_dict([
|
|
('type', equals('stream')),
|
|
('op', equals('create')),
|
|
('streams', check_list(check_dict([
|
|
('name', check_string),
|
|
('stream_id', check_int),
|
|
('invite_only', check_bool),
|
|
('description', check_string),
|
|
]))),
|
|
])
|
|
add_schema_checker = check_dict([
|
|
('type', equals('subscription')),
|
|
('op', equals('add')),
|
|
('subscriptions', subscription_schema_checker),
|
|
])
|
|
remove_schema_checker = check_dict([
|
|
('type', equals('subscription')),
|
|
('op', equals('remove')),
|
|
('subscriptions', check_list(
|
|
check_dict([
|
|
('name', equals('test_stream')),
|
|
('stream_id', check_int),
|
|
]),
|
|
)),
|
|
])
|
|
peer_add_schema_checker = check_dict([
|
|
('type', equals('subscription')),
|
|
('op', equals('peer_add')),
|
|
('user_id', check_int),
|
|
('subscriptions', check_list(check_string)),
|
|
])
|
|
peer_remove_schema_checker = check_dict([
|
|
('type', equals('subscription')),
|
|
('op', equals('peer_remove')),
|
|
('user_id', check_int),
|
|
('subscriptions', check_list(check_string)),
|
|
])
|
|
stream_update_schema_checker = check_dict([
|
|
('type', equals('stream')),
|
|
('op', equals('update')),
|
|
('property', equals('description')),
|
|
('value', check_string),
|
|
('stream_id', check_int),
|
|
('name', check_string),
|
|
])
|
|
|
|
# Subscribe to a totally new stream, so it's just Hamlet on it
|
|
action = lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream") # type: Callable
|
|
events = self.do_test(action, event_types=["subscription", "realm_user"],
|
|
include_subscribers=include_subscribers)
|
|
error = add_schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
# Add another user to that totally new stream
|
|
action = lambda: self.subscribe_to_stream("othello@zulip.com", "test_stream")
|
|
events = self.do_test(action,
|
|
include_subscribers=include_subscribers,
|
|
state_change_expected=include_subscribers,
|
|
)
|
|
error = peer_add_schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
stream = get_stream("test_stream", self.user_profile.realm)
|
|
|
|
# Now remove the first user, to test the normal unsubscribe flow
|
|
action = lambda: bulk_remove_subscriptions(
|
|
[get_user_profile_by_email("othello@zulip.com")],
|
|
[stream])
|
|
events = self.do_test(action,
|
|
include_subscribers=include_subscribers,
|
|
state_change_expected=include_subscribers,
|
|
)
|
|
error = peer_remove_schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
# Now remove the second user, to test the 'vacate' event flow
|
|
action = lambda: bulk_remove_subscriptions(
|
|
[get_user_profile_by_email("hamlet@zulip.com")],
|
|
[stream])
|
|
events = self.do_test(action,
|
|
include_subscribers=include_subscribers,
|
|
num_events=2)
|
|
error = remove_schema_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
# Now resubscribe a user, to make sure that works on a vacated stream
|
|
action = lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream")
|
|
events = self.do_test(action,
|
|
include_subscribers=include_subscribers,
|
|
num_events=2)
|
|
error = add_schema_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
action = lambda: do_change_stream_description(stream, u'new description')
|
|
events = self.do_test(action,
|
|
include_subscribers=include_subscribers)
|
|
error = stream_update_schema_checker('events[0]', events[0])
|
|
self.assert_on_error(error)
|
|
|
|
# Subscribe to a totally new invite-only stream, so it's just Hamlet on it
|
|
stream = self.make_stream("private", get_realm("zulip"), invite_only=True)
|
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
action = lambda: bulk_add_subscriptions([stream], [user_profile])
|
|
events = self.do_test(action, include_subscribers=include_subscribers,
|
|
num_events=2)
|
|
error = stream_create_schema_checker('events[0]', events[0])
|
|
error = add_schema_checker('events[1]', events[1])
|
|
self.assert_on_error(error)
|
|
|
|
class FetchInitialStateDataTest(ZulipTestCase):
|
|
# Non-admin users don't have access to all bots
|
|
def test_realm_bots_non_admin(self):
|
|
# type: () -> None
|
|
email = 'cordelia@zulip.com'
|
|
user_profile = get_user_profile_by_email(email)
|
|
self.assertFalse(user_profile.is_realm_admin)
|
|
result = fetch_initial_state_data(user_profile, None, "")
|
|
self.assert_length(result['realm_bots'], 0)
|
|
|
|
# additionally the API key for a random bot is not present in the data
|
|
api_key = get_user_profile_by_email('notification-bot@zulip.com').api_key
|
|
self.assertNotIn(api_key, str(result))
|
|
|
|
# Admin users have access to all bots in the realm_bots field
|
|
def test_realm_bots_admin(self):
|
|
# type: () -> None
|
|
email = 'hamlet@zulip.com'
|
|
user_profile = get_user_profile_by_email(email)
|
|
do_change_is_admin(user_profile, True)
|
|
self.assertTrue(user_profile.is_realm_admin)
|
|
result = fetch_initial_state_data(user_profile, None, "")
|
|
self.assertTrue(len(result['realm_bots']) > 5)
|
|
|
|
def test_max_message_id_with_no_history(self):
|
|
# type: () -> None
|
|
email = 'aaron@zulip.com'
|
|
user_profile = get_user_profile_by_email(email)
|
|
# Delete all historical messages for this user
|
|
UserMessage.objects.filter(user_profile=user_profile).delete()
|
|
result = fetch_initial_state_data(user_profile, None, "")
|
|
self.assertEqual(result['max_message_id'], -1)
|
|
|
|
class EventQueueTest(TestCase):
|
|
def test_one_event(self):
|
|
# type: () -> None
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "pointer",
|
|
"pointer": 1,
|
|
"timestamp": "1"})
|
|
self.assertFalse(queue.empty())
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 0,
|
|
'type': 'pointer',
|
|
"pointer": 1,
|
|
"timestamp": "1"}])
|
|
|
|
def test_event_collapsing(self):
|
|
# type: () -> None
|
|
queue = EventQueue("1")
|
|
for pointer_val in range(1, 10):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 8,
|
|
'type': 'pointer',
|
|
"pointer": 9,
|
|
"timestamp": "9"}])
|
|
|
|
queue = EventQueue("2")
|
|
for pointer_val in range(1, 10):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
queue.push({"type": "unknown"})
|
|
queue.push({"type": "restart", "server_generation": "1"})
|
|
for pointer_val in range(11, 20):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
queue.push({"type": "restart", "server_generation": "2"})
|
|
self.assertEqual(queue.contents(),
|
|
[{"type": "unknown",
|
|
"id": 9},
|
|
{'id': 19,
|
|
'type': 'pointer',
|
|
"pointer": 19,
|
|
"timestamp": "19"},
|
|
{"id": 20,
|
|
"type": "restart",
|
|
"server_generation": "2"}])
|
|
for pointer_val in range(21, 23):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
self.assertEqual(queue.contents(),
|
|
[{"type": "unknown",
|
|
"id": 9},
|
|
{'id': 19,
|
|
'type': 'pointer',
|
|
"pointer": 19,
|
|
"timestamp": "19"},
|
|
{"id": 20,
|
|
"type": "restart",
|
|
"server_generation": "2"},
|
|
{'id': 22,
|
|
'type': 'pointer',
|
|
"pointer": 22,
|
|
"timestamp": "22"},
|
|
])
|
|
|
|
def test_flag_add_collapsing(self):
|
|
# type: () -> None
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "read",
|
|
"operation": "add",
|
|
"all": False,
|
|
"messages": [1, 2, 3, 4],
|
|
"timestamp": "1"})
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "read",
|
|
"all": False,
|
|
"operation": "add",
|
|
"messages": [5, 6],
|
|
"timestamp": "1"})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 1,
|
|
'type': 'update_message_flags',
|
|
"all": False,
|
|
"flag": "read",
|
|
"operation": "add",
|
|
"messages": [1, 2, 3, 4, 5, 6],
|
|
"timestamp": "1"}])
|
|
|
|
def test_flag_remove_collapsing(self):
|
|
# type: () -> None
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "collapsed",
|
|
"operation": "remove",
|
|
"all": False,
|
|
"messages": [1, 2, 3, 4],
|
|
"timestamp": "1"})
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "collapsed",
|
|
"all": False,
|
|
"operation": "remove",
|
|
"messages": [5, 6],
|
|
"timestamp": "1"})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 1,
|
|
'type': 'update_message_flags',
|
|
"all": False,
|
|
"flag": "collapsed",
|
|
"operation": "remove",
|
|
"messages": [1, 2, 3, 4, 5, 6],
|
|
"timestamp": "1"}])
|
|
|
|
def test_collapse_event(self):
|
|
# type: () -> None
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "pointer",
|
|
"pointer": 1,
|
|
"timestamp": "1"})
|
|
queue.push({"type": "unknown",
|
|
"timestamp": "1"})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 0,
|
|
'type': 'pointer',
|
|
"pointer": 1,
|
|
"timestamp": "1"},
|
|
{'id': 1,
|
|
'type': 'unknown',
|
|
"timestamp": "1"}])
|
|
|
|
class TestEventsRegisterAllPublicStreamsDefaults(TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.email = 'hamlet@zulip.com'
|
|
self.user_profile = get_user_profile_by_email(self.email)
|
|
|
|
def test_use_passed_all_public_true_default_false(self):
|
|
# type: () -> None
|
|
self.user_profile.default_all_public_streams = False
|
|
self.user_profile.save()
|
|
result = _default_all_public_streams(self.user_profile, True)
|
|
self.assertTrue(result)
|
|
|
|
def test_use_passed_all_public_true_default(self):
|
|
# type: () -> None
|
|
self.user_profile.default_all_public_streams = True
|
|
self.user_profile.save()
|
|
result = _default_all_public_streams(self.user_profile, True)
|
|
self.assertTrue(result)
|
|
|
|
def test_use_passed_all_public_false_default_false(self):
|
|
# type: () -> None
|
|
self.user_profile.default_all_public_streams = False
|
|
self.user_profile.save()
|
|
result = _default_all_public_streams(self.user_profile, False)
|
|
self.assertFalse(result)
|
|
|
|
def test_use_passed_all_public_false_default_true(self):
|
|
# type: () -> None
|
|
self.user_profile.default_all_public_streams = True
|
|
self.user_profile.save()
|
|
result = _default_all_public_streams(self.user_profile, False)
|
|
self.assertFalse(result)
|
|
|
|
def test_use_true_default_for_none(self):
|
|
# type: () -> None
|
|
self.user_profile.default_all_public_streams = True
|
|
self.user_profile.save()
|
|
result = _default_all_public_streams(self.user_profile, None)
|
|
self.assertTrue(result)
|
|
|
|
def test_use_false_default_for_none(self):
|
|
# type: () -> None
|
|
self.user_profile.default_all_public_streams = False
|
|
self.user_profile.save()
|
|
result = _default_all_public_streams(self.user_profile, None)
|
|
self.assertFalse(result)
|
|
|
|
class TestEventsRegisterNarrowDefaults(TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.email = 'hamlet@zulip.com'
|
|
self.user_profile = get_user_profile_by_email(self.email)
|
|
self.stream = get_stream('Verona', self.user_profile.realm)
|
|
|
|
def test_use_passed_narrow_no_default(self):
|
|
# type: () -> None
|
|
self.user_profile.default_events_register_stream_id = None
|
|
self.user_profile.save()
|
|
result = _default_narrow(self.user_profile, [[u'stream', u'my_stream']])
|
|
self.assertEqual(result, [[u'stream', u'my_stream']])
|
|
|
|
def test_use_passed_narrow_with_default(self):
|
|
# type: () -> None
|
|
self.user_profile.default_events_register_stream_id = self.stream.id
|
|
self.user_profile.save()
|
|
result = _default_narrow(self.user_profile, [[u'stream', u'my_stream']])
|
|
self.assertEqual(result, [[u'stream', u'my_stream']])
|
|
|
|
def test_use_default_if_narrow_is_empty(self):
|
|
# type: () -> None
|
|
self.user_profile.default_events_register_stream_id = self.stream.id
|
|
self.user_profile.save()
|
|
result = _default_narrow(self.user_profile, [])
|
|
self.assertEqual(result, [[u'stream', u'Verona']])
|
|
|
|
def test_use_narrow_if_default_is_none(self):
|
|
# type: () -> None
|
|
self.user_profile.default_events_register_stream_id = None
|
|
self.user_profile.save()
|
|
result = _default_narrow(self.user_profile, [])
|
|
self.assertEqual(result, [])
|