mirror of
https://github.com/zulip/zulip.git
synced 2025-11-19 05:58:25 +00:00
We had a couple tests in EventsRegisterTest that weren't really testing much, because they were going through codepaths with authentication problems or actions that didn't affect our user. We now assert that the code under test generates events. (imported from commit c2f61180cb420d45fa95e137433e9456394bf0ff)
391 lines
18 KiB
Python
391 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
from django.test import TestCase
|
|
|
|
from zerver.models import (
|
|
get_client, get_realm, get_stream, get_user_profile_by_email,
|
|
Recipient,
|
|
)
|
|
|
|
from zerver.lib.actions import (
|
|
apply_events,
|
|
create_stream_if_needed,
|
|
do_add_alert_words,
|
|
do_add_realm_filter,
|
|
do_add_realm_emoji,
|
|
do_change_full_name,
|
|
do_change_is_admin,
|
|
do_change_stream_description,
|
|
do_remove_alert_words,
|
|
do_remove_realm_emoji,
|
|
do_remove_realm_filter,
|
|
do_remove_subscription,
|
|
do_rename_stream,
|
|
do_set_muted_topics,
|
|
do_set_realm_name,
|
|
do_update_pointer,
|
|
fetch_initial_state_data,
|
|
)
|
|
|
|
from zerver.lib.event_queue import allocate_client_descriptor
|
|
from zerver.lib.test_helpers import AuthedTestCase, POSTRequestMock
|
|
from zerver.tornadoviews import get_events_backend
|
|
|
|
from collections import OrderedDict
|
|
import ujson
|
|
|
|
|
|
class GetEventsTest(AuthedTestCase):
    """Drive the Tornado ``get_events_backend`` view with mocked POST requests."""

    def tornado_call(self, view_func, user_profile, post_data,
                     callback=None):
        """Call a Tornado view function directly and return its result.

        A ``POSTRequestMock`` is built from ``post_data``, ``user_profile``
        and ``callback`` and handed to ``view_func`` together with
        ``user_profile``.
        """
        request = POSTRequestMock(post_data, user_profile, callback)
        return view_func(request, user_profile)

    def _register_queue(self, user_profile, narrow=None):
        """Request a "message" event queue for ``user_profile``; return its id.

        The request carries no ``queue_id``; the id is read from the
        ``queue_id`` field of the successful response.  When ``narrow`` is
        given it is JSON-encoded and sent as the ``narrow`` parameter.
        """
        post_data = {
            "apply_markdown": ujson.dumps(True),
            "event_types": ujson.dumps(["message"]),
            "user_client": "website",
            "dont_block": ujson.dumps(True),
        }
        if narrow is not None:
            post_data["narrow"] = ujson.dumps(narrow)
        result = self.tornado_call(get_events_backend, user_profile, post_data)
        self.assert_json_success(result)
        return ujson.loads(result.content)["queue_id"]

    def _fetch_events(self, user_profile, queue_id, last_event_id=-1):
        """Poll ``queue_id`` once and return the ``events`` list of the response.

        Success is asserted *before* the payload is parsed, so a failed
        request is reported as an assertion failure rather than as a
        ``KeyError`` on the missing ``events`` key.
        """
        result = self.tornado_call(get_events_backend, user_profile,
                                   {"queue_id": queue_id,
                                    "user_client": "website",
                                    "last_event_id": last_event_id,
                                    "dont_block": ujson.dumps(True),
                                    })
        self.assert_json_success(result)
        return ujson.loads(result.content)["events"]

    def test_get_events(self):
        """Sender and recipient queues both receive sent personal messages.

        Events in the sender's queue must echo the ``local_id`` passed to
        ``send_message``; events in the recipient's queue must not carry a
        ``local_message_id`` at all.
        """
        email = "hamlet@zulip.com"
        recipient_email = "othello@zulip.com"
        user_profile = get_user_profile_by_email(email)
        recipient_user_profile = get_user_profile_by_email(recipient_email)
        self.login(email)

        queue_id = self._register_queue(user_profile)
        recipient_queue_id = self._register_queue(recipient_user_profile)

        # Nothing has been sent yet, so the sender's queue starts out empty.
        events = self._fetch_events(user_profile, queue_id)
        self.assert_length(events, 0, True)

        local_id = 10.01
        self.send_message(email, recipient_email, Recipient.PERSONAL, "hello",
                          local_id=local_id, sender_queue_id=queue_id)

        events = self._fetch_events(user_profile, queue_id)
        self.assert_length(events, 1, True)
        self.assertEqual(events[0]["type"], "message")
        self.assertEqual(events[0]["message"]["sender_email"], email)
        self.assertEqual(events[0]["local_message_id"], local_id)
        last_event_id = events[0]["id"]
        local_id += 0.01

        self.send_message(email, recipient_email, Recipient.PERSONAL, "hello",
                          local_id=local_id, sender_queue_id=queue_id)

        # Polling from last_event_id must return only the second message.
        events = self._fetch_events(user_profile, queue_id, last_event_id)
        self.assert_length(events, 1, True)
        self.assertEqual(events[0]["type"], "message")
        self.assertEqual(events[0]["message"]["sender_email"], email)
        self.assertEqual(events[0]["local_message_id"], local_id)

        # Test that the received message in the receiver's event queue
        # exists and does not contain a local id
        recipient_events = self._fetch_events(recipient_user_profile,
                                              recipient_queue_id)
        self.assertEqual(len(recipient_events), 2)
        for event in recipient_events:
            self.assertEqual(event["type"], "message")
            self.assertEqual(event["message"]["sender_email"], email)
            self.assertNotIn("local_message_id", event)

    def test_get_events_narrow(self):
        """A queue narrowed to stream "denmark" only sees the Denmark message."""
        email = "hamlet@zulip.com"
        user_profile = get_user_profile_by_email(email)
        self.login(email)

        queue_id = self._register_queue(user_profile,
                                        narrow=[["stream", "denmark"]])

        events = self._fetch_events(user_profile, queue_id)
        self.assert_length(events, 0, True)

        # One message outside the narrow, one inside it.
        self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello")
        self.send_message(email, "Denmark", Recipient.STREAM, "hello")

        events = self._fetch_events(user_profile, queue_id)
        self.assert_length(events, 1, True)
        self.assertEqual(events[0]["type"], "message")
        self.assertEqual(events[0]["message"]["display_recipient"], "Denmark")
|
|
|
|
class EventsRegisterTest(AuthedTestCase):
    """Check that applying generated events to a state matches a fresh fetch.

    Each test runs an action through ``do_test``, which compares the state
    obtained by applying the action's events to an earlier snapshot with the
    state fetched after the action.
    """

    # Show complete diffs when two state dictionaries differ.
    maxDiff = None

    def setUp(self):
        """Load a fresh ``user_profile`` for hamlet before every test.

        Fetching it here, instead of in the class body, keeps the query out
        of module import and stops one in-memory object, possibly modified
        by an earlier test's action, from being shared between tests.
        """
        super(EventsRegisterTest, self).setUp()
        self.user_profile = get_user_profile_by_email("hamlet@zulip.com")

    def do_test(self, action, event_types=None, matcher=None):
        """Run ``action`` and verify the events it generates.

        action      -- zero-argument callable performing the change under test.
        event_types -- passed through to ``allocate_client_descriptor`` and
                       ``fetch_initial_state_data``.
        matcher     -- two-argument callable comparing the two states;
                       defaults to ``self.assertEqual``.

        Fails if the action puts no events into the client's queue.
        """
        # NOTE(review): the trailing positional arguments (True, False, 600, [])
        # are defined by allocate_client_descriptor, which is not visible here.
        client = allocate_client_descriptor(self.user_profile.id, self.user_profile.realm.id,
                                            event_types,
                                            get_client("website"), True, False, 600, [])
        # hybrid_state = initial fetch state + re-applying events triggered by our action
        # normal_state = do action then fetch at the end (the "normal" code path)
        hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "")
        action()
        events = client.event_queue.contents()
        self.assertTrue(len(events) > 0,
                        "the action under test did not generate any events")
        apply_events(hybrid_state, events, self.user_profile)

        normal_state = fetch_initial_state_data(self.user_profile, event_types, "")

        if matcher is None:
            matcher = self.assertEqual

        matcher(hybrid_state, normal_state)

    def match_with_reorder(self, a, b, field):
        """Assert ``a == b`` while ignoring the order of the list at ``field``.

        The entries of ``a[field]`` and ``b[field]`` are compared as sets of
        serialized strings; all remaining keys are compared directly.
        Neither ``a`` nor ``b`` is modified.
        """
        def serialized(entries):
            # Sort each entry's items before building the OrderedDict so that
            # equal dicts always serialize to the same string; a plain
            # OrderedDict(x.items()) would just freeze the dict's own
            # (arbitrary) iteration order.
            return set(ujson.dumps(OrderedDict(sorted(entry.items())))
                       for entry in entries)

        def without_field(state):
            return dict((key, value) for key, value in state.items()
                        if key != field)

        self.assertEqual(serialized(a[field]), serialized(b[field]))
        self.assertEqual(without_field(a), without_field(b))

    def test_send_message_events(self):
        self.do_test(lambda: self.send_message("hamlet@zulip.com", "Verona", Recipient.STREAM, "hello"))

    def test_pointer_events(self):
        self.do_test(lambda: do_update_pointer(self.user_profile, 1500))

    def test_register_events(self):
        self.do_test(lambda: self.register("test1", "test1"),
                     matcher=lambda a, b: self.match_with_reorder(a, b, "realm_users"))

    def test_alert_words_events(self):
        self.do_test(lambda: do_add_alert_words(self.user_profile, ["alert_word"]))
        self.do_test(lambda: do_remove_alert_words(self.user_profile, ["alert_word"]))

    def test_muted_topics_events(self):
        self.do_test(lambda: do_set_muted_topics(self.user_profile, [["Denmark", "topic"]]))

    def test_change_full_name(self):
        self.do_test(lambda: do_change_full_name(self.user_profile, 'Sir Hamlet'))

    def test_change_realm_name(self):
        self.do_test(lambda: do_set_realm_name(self.user_profile.realm, 'New Realm Name'))

    def test_change_is_admin(self):
        # The first False is probably a noop, then we get transitions in both directions.
        for is_admin in [False, True, False]:
            # Bind the loop value as a default so the lambda does not depend
            # on being called before the loop variable changes.
            self.do_test(lambda is_admin=is_admin:
                         do_change_is_admin(self.user_profile, is_admin))

    def test_realm_emoji_events(self):
        self.do_test(lambda: do_add_realm_emoji(get_realm("zulip.com"), "my_emoji",
                                                "https://realm.com/my_emoji"))
        self.do_test(lambda: do_remove_realm_emoji(get_realm("zulip.com"), "my_emoji"))

    def test_realm_filter_events(self):
        self.do_test(lambda: do_add_realm_filter(get_realm("zulip.com"), "#[123]",
                                                 "https://realm.com/my_realm_filter/%(id)s"))
        self.do_test(lambda: do_remove_realm_filter(get_realm("zulip.com"), "#[123]"))

    def test_rename_stream(self):
        realm = get_realm('zulip.com')
        stream, _ = create_stream_if_needed(realm, 'old_name')
        new_name = u'stream with a brand new name'
        # Subscribe our user first so that the rename is an action that
        # shows up in this user's event queue.
        self.subscribe_to_stream(self.user_profile.email, stream.name)
        self.do_test(lambda: do_rename_stream(realm, stream.name, new_name))

    def test_subscribe_events(self):
        """Walk one stream through subscribe, unsubscribe and description changes."""
        def match_subscriptions(a, b):
            self.match_with_reorder(a, b, "subscriptions")

        def match_unsubscribed(a, b):
            self.match_with_reorder(a, b, "unsubscribed")

        self.do_test(lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream"),
                     matcher=match_subscriptions)
        self.do_test(lambda: self.subscribe_to_stream("othello@zulip.com", "test_stream"),
                     matcher=match_subscriptions)
        stream = get_stream("test_stream", self.user_profile.realm)
        self.do_test(lambda: do_remove_subscription(get_user_profile_by_email("othello@zulip.com"), stream),
                     matcher=match_subscriptions)
        self.do_test(lambda: do_remove_subscription(get_user_profile_by_email("hamlet@zulip.com"), stream),
                     matcher=match_unsubscribed)
        self.do_test(lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream"),
                     matcher=match_subscriptions)
        self.do_test(lambda: do_change_stream_description(get_realm('zulip.com'), 'test_stream',
                                                          'new description'),
                     matcher=match_subscriptions)
|
|
|
|
from zerver.lib.event_queue import EventQueue
|
|
class EventQueueTest(TestCase):
    """Checks of what ``EventQueue.contents()`` reports after ``push()`` calls."""

    @staticmethod
    def _pointer_event(value):
        """Return a fresh pointer event whose timestamp is ``str(value)``."""
        return {"type": "pointer",
                "pointer": value,
                "timestamp": str(value)}

    def _push_pointers(self, queue, first, stop):
        """Push one pointer event per value in ``xrange(first, stop)``."""
        for value in xrange(first, stop):
            queue.push(self._pointer_event(value))

    def _check_flag_collapsing(self, flag, operation):
        """Push two flag updates and expect a single merged event.

        The first update lists messages 1-4, the second 5-6; the queue is
        expected to hold one event with id 1 listing all six messages.
        """
        queue = EventQueue("1")
        for message_ids in ([1, 2, 3, 4], [5, 6]):
            queue.push({"type": "update_message_flags",
                        "flag": flag,
                        "operation": operation,
                        "all": False,
                        "messages": message_ids,
                        "timestamp": "1"})
        expected = {"id": 1,
                    "type": "update_message_flags",
                    "all": False,
                    "flag": flag,
                    "operation": operation,
                    "messages": [1, 2, 3, 4, 5, 6],
                    "timestamp": "1"}
        self.assertEqual(queue.contents(), [expected])

    def test_one_event(self):
        """A single pushed event makes the queue non-empty and gets id 0."""
        queue = EventQueue("1")
        queue.push(self._pointer_event(1))
        self.assertFalse(queue.empty())
        expected = dict(self._pointer_event(1), id=0)
        self.assertEqual(queue.contents(), [expected])

    def test_event_collapsing(self):
        """Repeated pointer and restart events collapse to the latest one."""
        # Nine consecutive pointer events: only the last (id 8) is expected.
        queue = EventQueue("1")
        self._push_pointers(queue, 1, 10)
        self.assertEqual(queue.contents(),
                         [dict(self._pointer_event(9), id=8)])

        # Pointer events mixed with an unknown event and two restarts.
        queue = EventQueue("2")
        self._push_pointers(queue, 1, 10)
        queue.push({"type": "unknown"})
        queue.push({"type": "restart", "server_generation": "1"})
        self._push_pointers(queue, 11, 20)
        queue.push({"type": "restart", "server_generation": "2"})

        expected = [
            {"type": "unknown", "id": 9},
            dict(self._pointer_event(19), id=19),
            {"id": 20, "type": "restart", "server_generation": "2"},
        ]
        self.assertEqual(queue.contents(), expected)

        # Two more pointer events after the restart: a new pointer entry is
        # expected at the end, holding the later of the two (id 22).
        self._push_pointers(queue, 21, 23)
        expected.append(dict(self._pointer_event(22), id=22))
        self.assertEqual(queue.contents(), expected)

    def test_flag_add_collapsing(self):
        self._check_flag_collapsing("read", "add")

    def test_flag_remove_collapsing(self):
        self._check_flag_collapsing("collapsed", "remove")

    def test_collapse_event(self):
        """A pointer event followed by an unknown event: both are kept."""
        queue = EventQueue("1")
        queue.push(self._pointer_event(1))
        queue.push({"type": "unknown",
                    "timestamp": "1"})
        expected = [
            dict(self._pointer_event(1), id=0),
            {"id": 1, "type": "unknown", "timestamp": "1"},
        ]
        self.assertEqual(queue.contents(), expected)
|
|
|