Files
zulip/zerver/test_events.py
Steve Howell 72ee636571 Use stream_id as key for subscription removals.
Have the server send down the stream's id for removal
events, and have the client use that id to look up the
stream in its internal data structures.  This sets the
stage for eventually just sending the stream id (and not
the stream name) down to clients, once all our clients
are ready to use the stream id.

(imported from commit 922516c98fb79ffad8ae7da0396646663ca54fd0)
2014-02-10 13:23:27 -05:00

485 lines
20 KiB
Python

# -*- coding: utf-8 -*-
from django.test import TestCase
from zerver.models import (
get_client, get_realm, get_stream, get_user_profile_by_email,
Recipient,
)
from zerver.lib.actions import (
apply_events,
create_stream_if_needed,
do_add_alert_words,
do_add_realm_filter,
do_add_realm_emoji,
do_change_full_name,
do_change_is_admin,
do_change_stream_description,
do_remove_alert_words,
do_remove_realm_emoji,
do_remove_realm_filter,
do_remove_subscription,
do_rename_stream,
do_set_muted_topics,
do_set_realm_name,
do_update_pointer,
fetch_initial_state_data,
)
from zerver.lib.event_queue import allocate_client_descriptor
from zerver.lib.test_helpers import AuthedTestCase, POSTRequestMock
from zerver.lib.validator import (
check_bool, check_dict, check_int, check_list, check_string,
equals,
)
from zerver.tornadoviews import get_events_backend
from collections import OrderedDict
import ujson
class GetEventsTest(AuthedTestCase):
    """Tests that call ``get_events_backend`` directly with mocked POST requests.

    Each test first registers an event queue (a request without a
    ``queue_id``), then polls that queue by id and checks which message
    events come back.
    """

    def tornado_call(self, view_func, user_profile, post_data,
                     callback=None):
        """Call ``view_func`` with a ``POSTRequestMock`` built from ``post_data``.

        Returns whatever the view returns.
        """
        request = POSTRequestMock(post_data, user_profile, callback)
        return view_func(request, user_profile)

    def _register_queue(self, user_profile, **extra_post_data):
        """Register a "message" event queue for ``user_profile``; return its id.

        ``extra_post_data`` is merged into the request (e.g. ``narrow=...``).
        The response is asserted successful *before* it is parsed, so a
        failed request shows up as an assertion failure, not a ``KeyError``.
        """
        post_data = {
            "apply_markdown": ujson.dumps(True),
            "event_types": ujson.dumps(["message"]),
            "user_client": "website",
            "dont_block": ujson.dumps(True),
        }
        post_data.update(extra_post_data)
        result = self.tornado_call(get_events_backend, user_profile, post_data)
        self.assert_json_success(result)
        return ujson.loads(result.content)["queue_id"]

    def _poll_events(self, user_profile, queue_id, last_event_id=-1):
        """Poll ``queue_id`` as ``user_profile`` and return the "events" list.

        ``last_event_id`` is forwarded unchanged in the request; the tests
        pass -1 on a first poll and a previously seen event id afterwards.
        """
        result = self.tornado_call(get_events_backend, user_profile,
                                   {"queue_id": queue_id,
                                    "user_client": "website",
                                    "last_event_id": last_event_id,
                                    "dont_block": ujson.dumps(True),
                                    })
        # Check the status first so a failure is reported meaningfully.
        self.assert_json_success(result)
        return ujson.loads(result.content)["events"]

    def test_get_events(self):
        """A sender sees its own messages with ``local_message_id``; the recipient does not."""
        email = "hamlet@zulip.com"
        recipient_email = "othello@zulip.com"
        user_profile = get_user_profile_by_email(email)
        recipient_user_profile = get_user_profile_by_email(recipient_email)
        self.login(email)

        queue_id = self._register_queue(user_profile)
        recipient_queue_id = self._register_queue(recipient_user_profile)

        # Nothing has been sent yet, so the fresh queue must be empty.
        events = self._poll_events(user_profile, queue_id)
        self.assert_length(events, 0, True)

        local_id = 10.01
        self.send_message(email, recipient_email, Recipient.PERSONAL, "hello",
                          local_id=local_id, sender_queue_id=queue_id)
        events = self._poll_events(user_profile, queue_id)
        self.assert_length(events, 1, True)
        self.assertEqual(events[0]["type"], "message")
        self.assertEqual(events[0]["message"]["sender_email"], email)
        self.assertEqual(events[0]["local_message_id"], local_id)
        last_event_id = events[0]["id"]

        # Polling from last_event_id must return only the second message.
        local_id += 0.01
        self.send_message(email, recipient_email, Recipient.PERSONAL, "hello",
                          local_id=local_id, sender_queue_id=queue_id)
        events = self._poll_events(user_profile, queue_id, last_event_id)
        self.assert_length(events, 1, True)
        self.assertEqual(events[0]["type"], "message")
        self.assertEqual(events[0]["message"]["sender_email"], email)
        self.assertEqual(events[0]["local_message_id"], local_id)

        # Test that the received message in the receiver's event queue
        # exists and does not contain a local id
        recipient_events = self._poll_events(recipient_user_profile,
                                             recipient_queue_id)
        self.assertEqual(len(recipient_events), 2)
        for recipient_event in recipient_events:
            self.assertEqual(recipient_event["type"], "message")
            self.assertEqual(recipient_event["message"]["sender_email"], email)
            self.assertNotIn("local_message_id", recipient_event)

    def test_get_events_narrow(self):
        """A queue narrowed to stream "denmark" only receives that stream's messages."""
        email = "hamlet@zulip.com"
        user_profile = get_user_profile_by_email(email)
        self.login(email)

        queue_id = self._register_queue(
            user_profile,
            narrow=ujson.dumps([["stream", "denmark"]]))

        events = self._poll_events(user_profile, queue_id)
        self.assert_length(events, 0, True)

        # One message outside the narrow, one inside it.
        self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello")
        self.send_message(email, "Denmark", Recipient.STREAM, "hello")

        events = self._poll_events(user_profile, queue_id)
        self.assert_length(events, 1, True)
        self.assertEqual(events[0]["type"], "message")
        self.assertEqual(events[0]["message"]["display_recipient"], "Denmark")
class EventsRegisterTest(AuthedTestCase):
    """Checks that applying events to an initial state matches a fresh fetch.

    For each action under test, ``do_test`` compares two states:

    * hybrid: state fetched before the action, then updated with
      ``apply_events`` using the events the action produced;
    * normal: state fetched after the action.
    """
    maxDiff = None

    def setUp(self):
        """Load the user under test for each test.

        This is done here, not as a class attribute, so that no database
        query runs at import time and no test sees a model instance that an
        earlier test mutated.
        """
        super(EventsRegisterTest, self).setUp()
        self.user_profile = get_user_profile_by_email("hamlet@zulip.com")

    def do_test(self, action, event_types=None):
        """Run ``action`` and verify the hybrid state equals the normal state.

        ``action`` is a zero-argument callable. ``event_types`` is passed to
        both the client descriptor and ``fetch_initial_state_data``.
        Returns the events found in the client's queue after the action.
        """
        client = allocate_client_descriptor(self.user_profile.id, self.user_profile.realm.id,
                                            event_types,
                                            get_client("website"), True, False, 600, [])
        # hybrid_state = initial fetch state + re-applying events triggered by our action
        # normal_state = do action then fetch at the end (the "normal" code path)
        hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "")
        action()
        events = client.event_queue.contents()
        # An action that generates no events would make the comparison vacuous.
        self.assertTrue(len(events) > 0)
        apply_events(hybrid_state, events, self.user_profile)

        normal_state = fetch_initial_state_data(self.user_profile, event_types, "")
        self.match_states(hybrid_state, normal_state)
        return events

    def assert_on_error(self, error):
        """Raise ``AssertionError(error)`` if a schema checker returned a truthy error."""
        if error:
            raise AssertionError(error)

    def match_states(self, state1, state2):
        """Assert two state dicts are equal, ignoring list order of users and streams.

        Both dicts are modified in place: the list-valued entries are
        re-keyed into dicts before comparing.
        """
        def normalize(state):
            state['realm_users'] = {u['email']: u for u in state['realm_users']}
            state['subscriptions'] = {u['name']: u for u in state['subscriptions']}
            state['unsubscribed'] = {u['name']: u for u in state['unsubscribed']}

        normalize(state1)
        normalize(state2)
        self.assertEqual(state1, state2)

    def test_send_message_events(self):
        self.do_test(lambda: self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello"))

    def test_pointer_events(self):
        self.do_test(lambda: do_update_pointer(self.user_profile, 1500))

    def test_register_events(self):
        self.do_test(lambda: self.register("test1", "test1"))

    def test_alert_words_events(self):
        self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"]))
        self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"]))

    def test_muted_topics_events(self):
        self.do_test(lambda: do_set_muted_topics(self.user_profile, [["Denmark", "topic"]]))

    def test_change_full_name(self):
        self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet'))

    def test_change_realm_name(self):
        self.do_test(lambda: do_set_realm_name(self.user_profile.realm, 'New Realm Name'))

    def test_change_is_admin(self):
        # The first False is probably a noop, then we get transitions in both directions.
        for is_admin in [False, True, False]:
            # Bind the loop variable as a default so the lambda does not
            # depend on late binding of ``is_admin``.
            self.do_test(lambda is_admin=is_admin: do_change_is_admin(self.user_profile, is_admin))

    def test_realm_emoji_events(self):
        self.do_test(lambda: do_add_realm_emoji(get_realm("zulip.com"), "my_emoji",
                                                "https://realm.com/my_emoji"))
        self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip.com"), "my_emoji"))

    def test_realm_filter_events(self):
        self.do_test(lambda: do_add_realm_filter(get_realm("zulip.com"), "#[123]",
                                                 "https://realm.com/my_realm_filter/%(id)s"))
        self.do_test(lambda: do_remove_realm_filter(get_realm("zulip.com"), "#[123]"))

    def test_rename_stream(self):
        """Renaming a stream yields an email_address update, then a name update."""
        realm = get_realm('zulip.com')
        stream, _ = create_stream_if_needed(realm, 'old_name')
        new_name = u'stream with a brand new name'
        self.subscribe_to_stream(self.user_profile.email, stream.name)

        action = lambda: do_rename_stream(realm, stream.name, new_name)
        events = self.do_test(action)

        schema_checker = check_dict([
            ('type', equals('stream')),
            ('op', equals('update')),
            ('property', equals('email_address')),
            ('value', check_string),
            ('name', equals('old_name')),
        ])
        error = schema_checker('events[0]', events[0])
        self.assert_on_error(error)

        schema_checker = check_dict([
            ('type', equals('stream')),
            ('op', equals('update')),
            ('property', equals('name')),
            ('value', equals(new_name)),
            ('name', equals('old_name')),
        ])
        error = schema_checker('events[1]', events[1])
        self.assert_on_error(error)

    def test_subscribe_events(self):
        """Check the first event's schema for each subscribe/unsubscribe/update step."""
        subscription_schema_checker = check_list(
            check_dict([
                ('color', check_string),
                ('description', check_string),
                ('email_address', check_string),
                ('invite_only', check_bool),
                ('in_home_view', check_bool),
                ('name', check_string),
                ('notifications', check_bool),
                ('stream_id', check_int),
                ('subscribers', check_list(check_int)),
            ])
        )
        add_schema_checker = check_dict([
            ('type', equals('subscription')),
            ('op', equals('add')),
            ('subscriptions', subscription_schema_checker),
        ])
        # Removal events carry both the stream name and its id.
        remove_schema_checker = check_dict([
            ('type', equals('subscription')),
            ('op', equals('remove')),
            ('subscriptions', check_list(
                check_dict([
                    ('name', equals('test_stream')),
                    ('stream_id', check_int),
                ]),
            )),
        ])
        peer_add_schema_checker = check_dict([
            ('type', equals('subscription')),
            ('op', equals('peer_add')),
            ('user_email', check_string),
            ('subscriptions', check_list(check_string)),
        ])
        peer_remove_schema_checker = check_dict([
            ('type', equals('subscription')),
            ('op', equals('peer_remove')),
            ('user_email', check_string),
            ('subscriptions', check_list(check_string)),
        ])
        stream_update_schema_checker = check_dict([
            ('type', equals('stream')),
            ('op', equals('update')),
            ('property', equals('description')),
            ('value', check_string),
            ('name', check_string),
        ])

        # Hamlet (the user under test) subscribes.
        action = lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream")
        events = self.do_test(action, event_types=["subscription", "realm_user"])
        error = add_schema_checker('events[0]', events[0])
        self.assert_on_error(error)

        # Another user subscribes to the same stream.
        action = lambda: self.subscribe_to_stream("othello@zulip.com", "test_stream")
        events = self.do_test(action)
        error = peer_add_schema_checker('events[0]', events[0])
        self.assert_on_error(error)

        stream = get_stream("test_stream", self.user_profile.realm)

        # The other user unsubscribes.
        action = lambda: do_remove_subscription(get_user_profile_by_email("othello@zulip.com"), stream)
        events = self.do_test(action)
        error = peer_remove_schema_checker('events[0]', events[0])
        self.assert_on_error(error)

        # Hamlet unsubscribes.
        action = lambda: do_remove_subscription(get_user_profile_by_email("hamlet@zulip.com"), stream)
        events = self.do_test(action)
        error = remove_schema_checker('events[0]', events[0])
        self.assert_on_error(error)

        # Hamlet resubscribes.
        action = lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream")
        events = self.do_test(action)
        error = add_schema_checker('events[0]', events[0])
        self.assert_on_error(error)

        # The stream description changes.
        action = lambda: do_change_stream_description(get_realm('zulip.com'), 'test_stream', 'new description')
        events = self.do_test(action)
        error = stream_update_schema_checker('events[0]', events[0])
        self.assert_on_error(error)
from zerver.lib.event_queue import EventQueue
class EventQueueTest(TestCase):
    """Unit tests for ``EventQueue.push`` / ``contents`` and event collapsing."""

    @staticmethod
    def _pointer_event(value):
        """Build a fresh pointer event whose timestamp is ``str(value)``."""
        return {"type": "pointer",
                "pointer": value,
                "timestamp": str(value)}

    @staticmethod
    def _flag_event(flag, operation, messages):
        """Build a fresh ``update_message_flags`` event for ``messages``."""
        return {"type": "update_message_flags",
                "flag": flag,
                "operation": operation,
                "all": False,
                "messages": messages,
                "timestamp": "1"}

    def _check_flag_collapsing(self, flag, operation):
        """Push two flag events; assert ``contents()`` is one merged event with id 1."""
        queue = EventQueue("1")
        queue.push(self._flag_event(flag, operation, [1, 2, 3, 4]))
        queue.push(self._flag_event(flag, operation, [5, 6]))

        merged = {'id': 1,
                  'type': 'update_message_flags',
                  "all": False,
                  "flag": flag,
                  "operation": operation,
                  "messages": [1, 2, 3, 4, 5, 6],
                  "timestamp": "1"}
        self.assertEqual(queue.contents(), [merged])

    def test_one_event(self):
        """A single pushed event is returned with id 0."""
        queue = EventQueue("1")
        queue.push(self._pointer_event(1))
        self.assertFalse(queue.empty())

        expected = [{'id': 0,
                     'type': 'pointer',
                     "pointer": 1,
                     "timestamp": "1"}]
        self.assertEqual(queue.contents(), expected)

    def test_event_collapsing(self):
        """Runs of pointer events, and repeated restart events, collapse to the latest."""
        # Nine consecutive pointer events leave only the last one.
        queue = EventQueue("1")
        for pointer_val in xrange(1, 10):
            queue.push(self._pointer_event(pointer_val))
        self.assertEqual(queue.contents(),
                         [{'id': 8,
                           'type': 'pointer',
                           "pointer": 9,
                           "timestamp": "9"}])

        # Pointer runs interleaved with an unknown event and two restarts.
        queue = EventQueue("2")
        for pointer_val in xrange(1, 10):
            queue.push(self._pointer_event(pointer_val))
        queue.push({"type": "unknown"})
        queue.push({"type": "restart", "server_generation": "1"})
        for pointer_val in xrange(11, 20):
            queue.push(self._pointer_event(pointer_val))
        queue.push({"type": "restart", "server_generation": "2"})

        after_second_restart = [
            {"type": "unknown",
             "id": 9},
            {'id': 19,
             'type': 'pointer',
             "pointer": 19,
             "timestamp": "19"},
            {"id": 20,
             "type": "restart",
             "server_generation": "2"},
        ]
        self.assertEqual(queue.contents(), after_second_restart)

        # Pointer events pushed after the restart appear as one new trailing entry.
        for pointer_val in xrange(21, 23):
            queue.push(self._pointer_event(pointer_val))
        trailing_pointer = {'id': 22,
                            'type': 'pointer',
                            "pointer": 22,
                            "timestamp": "22"}
        self.assertEqual(queue.contents(),
                         after_second_restart + [trailing_pointer])

    def test_flag_add_collapsing(self):
        """Two "read"/"add" flag events merge their message lists."""
        self._check_flag_collapsing("read", "add")

    def test_flag_remove_collapsing(self):
        """Two "collapsed"/"remove" flag events merge their message lists."""
        self._check_flag_collapsing("collapsed", "remove")

    def test_collapse_event(self):
        """A pointer event followed by an unknown event stays as two entries."""
        queue = EventQueue("1")
        queue.push(self._pointer_event(1))
        queue.push({"type": "unknown",
                    "timestamp": "1"})

        expected = [
            {'id': 0,
             'type': 'pointer',
             "pointer": 1,
             "timestamp": "1"},
            {'id': 1,
             'type': 'unknown',
             "timestamp": "1"},
        ]
        self.assertEqual(queue.contents(), expected)