mirror of
https://github.com/zulip/zulip.git
synced 2025-11-05 14:35:27 +00:00
We now use the same custom matcher for all of our EventRegisterTest
tests. The "state" that we're tracking has three lists that aren't
really order sensitive, so we turn them into dictionaries keyed
by the primary keys in their structs. Here are the lists:
realm_users
subscriptions
unsubscribed
(imported from commit 53787c56722b69640368c1b5d67d5d4757f84718)
381 lines
17 KiB
Python
381 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
from django.test import TestCase
|
|
|
|
from zerver.models import (
|
|
get_client, get_realm, get_stream, get_user_profile_by_email,
|
|
Recipient,
|
|
)
|
|
|
|
from zerver.lib.actions import (
|
|
apply_events,
|
|
create_stream_if_needed,
|
|
do_add_alert_words,
|
|
do_add_realm_filter,
|
|
do_add_realm_emoji,
|
|
do_change_full_name,
|
|
do_change_is_admin,
|
|
do_change_stream_description,
|
|
do_remove_alert_words,
|
|
do_remove_realm_emoji,
|
|
do_remove_realm_filter,
|
|
do_remove_subscription,
|
|
do_rename_stream,
|
|
do_set_muted_topics,
|
|
do_set_realm_name,
|
|
do_update_pointer,
|
|
fetch_initial_state_data,
|
|
)
|
|
|
|
from zerver.lib.event_queue import allocate_client_descriptor
|
|
from zerver.lib.test_helpers import AuthedTestCase, POSTRequestMock
|
|
from zerver.tornadoviews import get_events_backend
|
|
|
|
from collections import OrderedDict
|
|
import ujson
|
|
|
|
|
|
class GetEventsTest(AuthedTestCase):
|
|
def tornado_call(self, view_func, user_profile, post_data,
|
|
callback=None):
|
|
request = POSTRequestMock(post_data, user_profile, callback)
|
|
return view_func(request, user_profile)
|
|
|
|
def test_get_events(self):
|
|
email = "hamlet@zulip.com"
|
|
recipient_email = "othello@zulip.com"
|
|
user_profile = get_user_profile_by_email(email)
|
|
recipient_user_profile = get_user_profile_by_email(recipient_email)
|
|
self.login(email)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"apply_markdown": ujson.dumps(True),
|
|
"event_types": ujson.dumps(["message"]),
|
|
"user_client": "website",
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
self.assert_json_success(result)
|
|
queue_id = ujson.loads(result.content)["queue_id"]
|
|
|
|
recipient_result = self.tornado_call(get_events_backend, recipient_user_profile,
|
|
{"apply_markdown": ujson.dumps(True),
|
|
"event_types": ujson.dumps(["message"]),
|
|
"user_client": "website",
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
self.assert_json_success(recipient_result)
|
|
recipient_queue_id = ujson.loads(recipient_result.content)["queue_id"]
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 0, True)
|
|
|
|
local_id = 10.01
|
|
self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 1, True)
|
|
self.assertEqual(events[0]["type"], "message")
|
|
self.assertEqual(events[0]["message"]["sender_email"], email)
|
|
self.assertEqual(events[0]["local_message_id"], local_id)
|
|
last_event_id = events[0]["id"]
|
|
local_id += 0.01
|
|
|
|
self.send_message(email, recipient_email, Recipient.PERSONAL, "hello", local_id=local_id, sender_queue_id=queue_id)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": last_event_id,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 1, True)
|
|
self.assertEqual(events[0]["type"], "message")
|
|
self.assertEqual(events[0]["message"]["sender_email"], email)
|
|
self.assertEqual(events[0]["local_message_id"], local_id)
|
|
|
|
# Test that the received message in the receiver's event queue
|
|
# exists and does not contain a local id
|
|
recipient_result = self.tornado_call(get_events_backend, recipient_user_profile,
|
|
{"queue_id": recipient_queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
recipient_events = ujson.loads(recipient_result.content)["events"]
|
|
self.assert_json_success(recipient_result)
|
|
self.assertEqual(len(recipient_events), 2)
|
|
self.assertEqual(recipient_events[0]["type"], "message")
|
|
self.assertEqual(recipient_events[0]["message"]["sender_email"], email)
|
|
self.assertTrue("local_message_id" not in recipient_events[0])
|
|
self.assertEqual(recipient_events[1]["type"], "message")
|
|
self.assertEqual(recipient_events[1]["message"]["sender_email"], email)
|
|
self.assertTrue("local_message_id" not in recipient_events[1])
|
|
|
|
def test_get_events_narrow(self):
|
|
email = "hamlet@zulip.com"
|
|
user_profile = get_user_profile_by_email(email)
|
|
self.login(email)
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"apply_markdown": ujson.dumps(True),
|
|
"event_types": ujson.dumps(["message"]),
|
|
"narrow": ujson.dumps([["stream", "denmark"]]),
|
|
"user_client": "website",
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
self.assert_json_success(result)
|
|
queue_id = ujson.loads(result.content)["queue_id"]
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 0, True)
|
|
|
|
self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello")
|
|
self.send_message(email, "Denmark", Recipient.STREAM, "hello")
|
|
|
|
result = self.tornado_call(get_events_backend, user_profile,
|
|
{"queue_id": queue_id,
|
|
"user_client": "website",
|
|
"last_event_id": -1,
|
|
"dont_block": ujson.dumps(True),
|
|
})
|
|
events = ujson.loads(result.content)["events"]
|
|
self.assert_json_success(result)
|
|
self.assert_length(events, 1, True)
|
|
self.assertEqual(events[0]["type"], "message")
|
|
self.assertEqual(events[0]["message"]["display_recipient"], "Denmark")
|
|
|
|
class EventsRegisterTest(AuthedTestCase):
|
|
maxDiff = None
|
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
|
|
def do_test(self, action, event_types=None):
|
|
client = allocate_client_descriptor(self.user_profile.id, self.user_profile.realm.id,
|
|
event_types,
|
|
get_client("website"), True, False, 600, [])
|
|
# hybrid_state = initial fetch state + re-applying events triggered by our action
|
|
# normal_state = do action then fetch at the end (the "normal" code path)
|
|
hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "")
|
|
action()
|
|
events = client.event_queue.contents()
|
|
self.assertTrue(len(events) > 0)
|
|
apply_events(hybrid_state, events, self.user_profile)
|
|
|
|
normal_state = fetch_initial_state_data(self.user_profile, event_types, "")
|
|
self.match_states(hybrid_state, normal_state)
|
|
|
|
def match_states(self, state1, state2):
|
|
def normalize(state):
|
|
state['realm_users'] = {u['email']: u for u in state['realm_users']}
|
|
state['subscriptions'] = {u['name']: u for u in state['subscriptions']}
|
|
state['unsubscribed'] = {u['name']: u for u in state['unsubscribed']}
|
|
normalize(state1)
|
|
normalize(state2)
|
|
self.assertEqual(state1, state2)
|
|
|
|
def test_send_message_events(self):
|
|
self.do_test(lambda: self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello"))
|
|
|
|
def test_pointer_events(self):
|
|
self.do_test(lambda: do_update_pointer(self.user_profile, 1500))
|
|
|
|
def test_register_events(self):
|
|
self.do_test(lambda: self.register("test1", "test1"))
|
|
|
|
def test_alert_words_events(self):
|
|
self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"]))
|
|
self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"]))
|
|
|
|
def test_muted_topics_events(self):
|
|
self.do_test(lambda: do_set_muted_topics(self.user_profile, [["Denmark", "topic"]]))
|
|
|
|
def test_change_full_name(self):
|
|
self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet'))
|
|
|
|
def test_change_realm_name(self):
|
|
self.do_test(lambda: do_set_realm_name(self.user_profile.realm, 'New Realm Name'))
|
|
|
|
def test_change_is_admin(self):
|
|
# The first False is probably a noop, then we get transitions in both directions.
|
|
for is_admin in [False, True, False]:
|
|
self.do_test(lambda: do_change_is_admin(self.user_profile, is_admin))
|
|
|
|
def test_realm_emoji_events(self):
|
|
self.do_test(lambda: do_add_realm_emoji(get_realm("zulip.com"), "my_emoji",
|
|
"https://realm.com/my_emoji"))
|
|
self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip.com"), "my_emoji"))
|
|
|
|
def test_realm_filter_events(self):
|
|
self.do_test(lambda: do_add_realm_filter(get_realm("zulip.com"), "#[123]",
|
|
"https://realm.com/my_realm_filter/%(id)s"))
|
|
self.do_test(lambda: do_remove_realm_filter(get_realm("zulip.com"), "#[123]"))
|
|
|
|
def test_rename_stream(self):
|
|
realm = get_realm('zulip.com')
|
|
stream, _ = create_stream_if_needed(realm, 'old_name')
|
|
new_name = u'stream with a brand new name'
|
|
self.subscribe_to_stream(self.user_profile.email, stream.name)
|
|
self.do_test(lambda: do_rename_stream(realm, stream.name, new_name))
|
|
|
|
def test_subscribe_events(self):
|
|
self.do_test(lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream"))
|
|
self.do_test(lambda: self.subscribe_to_stream("othello@zulip.com", "test_stream"))
|
|
stream = get_stream("test_stream", self.user_profile.realm)
|
|
self.do_test(lambda: do_remove_subscription(get_user_profile_by_email("othello@zulip.com"), stream))
|
|
self.do_test(lambda: do_remove_subscription(get_user_profile_by_email("hamlet@zulip.com"), stream))
|
|
self.do_test(lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream"))
|
|
self.do_test(lambda: do_change_stream_description(get_realm('zulip.com'), 'test_stream',
|
|
'new description'))
|
|
|
|
from zerver.lib.event_queue import EventQueue
|
|
class EventQueueTest(TestCase):
|
|
def test_one_event(self):
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "pointer",
|
|
"pointer": 1,
|
|
"timestamp": "1"})
|
|
self.assertFalse(queue.empty())
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 0,
|
|
'type': 'pointer',
|
|
"pointer": 1,
|
|
"timestamp": "1"}])
|
|
|
|
def test_event_collapsing(self):
|
|
queue = EventQueue("1")
|
|
for pointer_val in xrange(1, 10):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 8,
|
|
'type': 'pointer',
|
|
"pointer": 9,
|
|
"timestamp": "9"}])
|
|
|
|
queue = EventQueue("2")
|
|
for pointer_val in xrange(1, 10):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
queue.push({"type": "unknown"})
|
|
queue.push({"type": "restart", "server_generation": "1"})
|
|
for pointer_val in xrange(11, 20):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
queue.push({"type": "restart", "server_generation": "2"})
|
|
self.assertEqual(queue.contents(),
|
|
[{"type": "unknown",
|
|
"id": 9,},
|
|
{'id': 19,
|
|
'type': 'pointer',
|
|
"pointer": 19,
|
|
"timestamp": "19"},
|
|
{"id": 20,
|
|
"type": "restart",
|
|
"server_generation": "2"}])
|
|
for pointer_val in xrange(21, 23):
|
|
queue.push({"type": "pointer",
|
|
"pointer": pointer_val,
|
|
"timestamp": str(pointer_val)})
|
|
self.assertEqual(queue.contents(),
|
|
[{"type": "unknown",
|
|
"id": 9,},
|
|
{'id': 19,
|
|
'type': 'pointer',
|
|
"pointer": 19,
|
|
"timestamp": "19"},
|
|
{"id": 20,
|
|
"type": "restart",
|
|
"server_generation": "2"},
|
|
{'id': 22,
|
|
'type': 'pointer',
|
|
"pointer": 22,
|
|
"timestamp": "22"},
|
|
])
|
|
|
|
def test_flag_add_collapsing(self):
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "read",
|
|
"operation": "add",
|
|
"all": False,
|
|
"messages": [1, 2, 3, 4],
|
|
"timestamp": "1"})
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "read",
|
|
"all": False,
|
|
"operation": "add",
|
|
"messages": [5, 6],
|
|
"timestamp": "1"})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 1,
|
|
'type': 'update_message_flags',
|
|
"all": False,
|
|
"flag": "read",
|
|
"operation": "add",
|
|
"messages": [1,2,3,4,5,6],
|
|
"timestamp": "1"}])
|
|
|
|
def test_flag_remove_collapsing(self):
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "collapsed",
|
|
"operation": "remove",
|
|
"all": False,
|
|
"messages": [1, 2, 3, 4],
|
|
"timestamp": "1"})
|
|
queue.push({"type": "update_message_flags",
|
|
"flag": "collapsed",
|
|
"all": False,
|
|
"operation": "remove",
|
|
"messages": [5, 6],
|
|
"timestamp": "1"})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 1,
|
|
'type': 'update_message_flags',
|
|
"all": False,
|
|
"flag": "collapsed",
|
|
"operation": "remove",
|
|
"messages": [1,2,3,4,5,6],
|
|
"timestamp": "1"}])
|
|
|
|
def test_collapse_event(self):
|
|
queue = EventQueue("1")
|
|
queue.push({"type": "pointer",
|
|
"pointer": 1,
|
|
"timestamp": "1"})
|
|
queue.push({"type": "unknown",
|
|
"timestamp": "1"})
|
|
self.assertEqual(queue.contents(),
|
|
[{'id': 0,
|
|
'type': 'pointer',
|
|
"pointer": 1,
|
|
"timestamp": "1"},
|
|
{'id': 1,
|
|
'type': 'unknown',
|
|
"timestamp": "1"}])
|
|
|