Files
zulip/zerver/tests/test_messages.py

924 lines
38 KiB
Python

import datetime
from typing import Any, Dict, List
from unittest import mock
import ujson
from django.http import HttpResponse
from django.utils.timezone import now as timezone_now
from analytics.lib.counts import COUNT_STATS
from analytics.models import RealmCount
from zerver.decorator import JsonableError
from zerver.lib.actions import (
check_message,
do_change_stream_invite_only,
do_claim_attachments,
do_create_user,
do_update_message,
gather_subscriptions_helper,
get_active_presence_idle_user_ids,
get_client,
get_last_message_id,
send_rate_limited_pm_notification_to_bot_owner,
)
from zerver.lib.addressee import Addressee
from zerver.lib.markdown import MentionData
from zerver.lib.message import (
bulk_access_messages,
get_first_visible_message_id,
maybe_update_first_visible_message_id,
render_markdown,
update_first_visible_message_id,
)
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import (
make_client,
message_stream_count,
most_recent_message,
most_recent_usermessage,
)
from zerver.lib.topic import DB_TOPIC_NAME
from zerver.lib.upload import create_attachment
from zerver.lib.url_encoding import near_message_url
from zerver.models import (
Attachment,
Message,
Recipient,
Subscription,
UserMessage,
UserPresence,
UserProfile,
bulk_get_huddle_user_ids,
get_huddle_user_ids,
get_realm,
)
class MiscMessageTest(ZulipTestCase):
def test_get_last_message_id(self) -> None:
self.assertEqual(
get_last_message_id(),
Message.objects.latest('id').id,
)
Message.objects.all().delete()
self.assertEqual(get_last_message_id(), -1)
class PersonalMessagesTest(ZulipTestCase):
def test_near_pm_message_url(self) -> None:
realm = get_realm('zulip')
message = dict(
type='personal',
id=555,
display_recipient=[
dict(id=77),
dict(id=80),
],
)
url = near_message_url(
realm=realm,
message=message,
)
self.assertEqual(url, 'http://zulip.testserver/#narrow/pm-with/77,80-pm/near/555')
def test_is_private_flag_not_leaked(self) -> None:
"""
Make sure `is_private` flag is not leaked to the API.
"""
self.login('hamlet')
self.send_personal_message(self.example_user("hamlet"),
self.example_user("cordelia"),
"test")
for msg in self.get_messages():
self.assertNotIn('is_private', msg['flags'])
def test_auto_subbed_to_personals(self) -> None:
"""
Newly created users are auto-subbed to the ability to receive
personals.
"""
test_email = self.nonreg_email('test')
self.register(test_email, "test")
user_profile = self.nonreg_user('test')
old_messages_count = message_stream_count(user_profile)
self.send_personal_message(user_profile, user_profile)
new_messages_count = message_stream_count(user_profile)
self.assertEqual(new_messages_count, old_messages_count + 1)
recipient = Recipient.objects.get(type_id=user_profile.id,
type=Recipient.PERSONAL)
message = most_recent_message(user_profile)
self.assertEqual(message.recipient, recipient)
with mock.patch('zerver.models.get_display_recipient', return_value='recip'):
self.assertEqual(
str(message),
'<Message: recip / / '
'<UserProfile: {} {}>>'.format(user_profile.email, user_profile.realm))
user_message = most_recent_usermessage(user_profile)
self.assertEqual(
str(user_message),
f'<UserMessage: recip / {user_profile.email} ([])>',
)
class MessageAccessTests(ZulipTestCase):
def test_update_invalid_flags(self) -> None:
message = self.send_personal_message(
self.example_user("cordelia"),
self.example_user("hamlet"),
"hello",
)
self.login('hamlet')
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "invalid"})
self.assert_json_error(result, "Invalid flag: 'invalid'")
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "is_private"})
self.assert_json_error(result, "Invalid flag: 'is_private'")
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "active_mobile_push_notification"})
self.assert_json_error(result, "Invalid flag: 'active_mobile_push_notification'")
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "mentioned"})
self.assert_json_error(result, "Flag not editable: 'mentioned'")
def change_star(self, messages: List[int], add: bool=True, **kwargs: Any) -> HttpResponse:
return self.client_post("/json/messages/flags",
{"messages": ujson.dumps(messages),
"op": "add" if add else "remove",
"flag": "starred"},
**kwargs)
def test_change_star(self) -> None:
"""
You can set a message as starred/un-starred through
POST /json/messages/flags.
"""
self.login('hamlet')
message_ids = [self.send_personal_message(self.example_user("hamlet"),
self.example_user("hamlet"),
"test")]
# Star a message.
result = self.change_star(message_ids)
self.assert_json_success(result)
for msg in self.get_messages():
if msg['id'] in message_ids:
self.assertEqual(msg['flags'], ['starred'])
else:
self.assertEqual(msg['flags'], ['read'])
result = self.change_star(message_ids, False)
self.assert_json_success(result)
# Remove the stars.
for msg in self.get_messages():
if msg['id'] in message_ids:
self.assertEqual(msg['flags'], [])
def test_change_star_public_stream_historical(self) -> None:
"""
You can set a message as starred/un-starred through
POST /json/messages/flags.
"""
stream_name = "new_stream"
self.subscribe(self.example_user("hamlet"), stream_name)
self.login('hamlet')
message_ids = [
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
]
# Send a second message so we can verify it isn't modified
other_message_ids = [
self.send_stream_message(self.example_user("hamlet"), stream_name, "test_unused"),
]
received_message_ids = [
self.send_personal_message(
self.example_user("hamlet"),
self.example_user("cordelia"),
"test_received",
),
]
# Now login as another user who wasn't on that stream
self.login('cordelia')
# Send a message to yourself to make sure we have at least one with the read flag
sent_message_ids = [
self.send_personal_message(
self.example_user("cordelia"),
self.example_user("cordelia"),
"test_read_message",
),
]
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps(sent_message_ids),
"op": "add",
"flag": "read"})
# We can't change flags other than "starred" on historical messages:
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps(message_ids),
"op": "add",
"flag": "read"})
self.assert_json_error(result, 'Invalid message(s)')
# Trying to change a list of more than one historical message fails
result = self.change_star(message_ids * 2)
self.assert_json_error(result, 'Invalid message(s)')
# Confirm that one can change the historical flag now
result = self.change_star(message_ids)
self.assert_json_success(result)
for msg in self.get_messages():
if msg['id'] in message_ids:
self.assertEqual(set(msg['flags']), {'starred', 'historical', 'read'})
elif msg['id'] in received_message_ids:
self.assertEqual(msg['flags'], [])
else:
self.assertEqual(msg['flags'], ['read'])
self.assertNotIn(msg['id'], other_message_ids)
result = self.change_star(message_ids, False)
self.assert_json_success(result)
# But it still doesn't work if you're in another realm
user = self.mit_user('sipbtest')
self.login_user(user)
result = self.change_star(message_ids, subdomain="zephyr")
self.assert_json_error(result, 'Invalid message(s)')
def test_change_star_private_message_security(self) -> None:
"""
You can set a message as starred/un-starred through
POST /json/messages/flags.
"""
self.login('hamlet')
message_ids = [
self.send_personal_message(
self.example_user("hamlet"),
self.example_user("hamlet"),
"test",
),
]
# Starring private messages you didn't receive fails.
self.login('cordelia')
result = self.change_star(message_ids)
self.assert_json_error(result, 'Invalid message(s)')
def test_change_star_private_stream_security(self) -> None:
stream_name = "private_stream"
self.make_stream(stream_name, invite_only=True)
self.subscribe(self.example_user("hamlet"), stream_name)
self.login('hamlet')
message_ids = [
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
]
# Starring private stream messages you received works
result = self.change_star(message_ids)
self.assert_json_success(result)
# Starring private stream messages you didn't receive fails.
self.login('cordelia')
result = self.change_star(message_ids)
self.assert_json_error(result, 'Invalid message(s)')
stream_name = "private_stream_2"
self.make_stream(stream_name, invite_only=True,
history_public_to_subscribers=True)
self.subscribe(self.example_user("hamlet"), stream_name)
self.login('hamlet')
message_ids = [
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
]
# With stream.history_public_to_subscribers = True, you still
# can't see it if you didn't receive the message and are
# not subscribed.
self.login('cordelia')
result = self.change_star(message_ids)
self.assert_json_error(result, 'Invalid message(s)')
# But if you subscribe, then you can star the message
self.subscribe(self.example_user("cordelia"), stream_name)
result = self.change_star(message_ids)
self.assert_json_success(result)
def test_new_message(self) -> None:
"""
New messages aren't starred.
"""
sender = self.example_user('hamlet')
self.login_user(sender)
content = "Test message for star"
self.send_stream_message(sender, "Verona",
content=content)
sent_message = UserMessage.objects.filter(
user_profile=self.example_user('hamlet'),
).order_by("id").reverse()[0]
self.assertEqual(sent_message.message.content, content)
self.assertFalse(sent_message.flags.starred)
def test_change_star_public_stream_security_for_guest_user(self) -> None:
# Guest user can't access(star) unsubscribed public stream messages
normal_user = self.example_user("hamlet")
stream_name = "public_stream"
self.make_stream(stream_name)
self.subscribe(normal_user, stream_name)
self.login_user(normal_user)
message_id = [
self.send_stream_message(normal_user, stream_name, "test 1"),
]
guest_user = self.example_user('polonius')
self.login_user(guest_user)
result = self.change_star(message_id)
self.assert_json_error(result, 'Invalid message(s)')
# Subscribed guest users can access public stream messages sent before they join
self.subscribe(guest_user, stream_name)
result = self.change_star(message_id)
self.assert_json_success(result)
# And messages sent after they join
self.login_user(normal_user)
message_id = [
self.send_stream_message(normal_user, stream_name, "test 2"),
]
self.login_user(guest_user)
result = self.change_star(message_id)
self.assert_json_success(result)
def test_change_star_private_stream_security_for_guest_user(self) -> None:
# Guest users can't access(star) unsubscribed private stream messages
normal_user = self.example_user("hamlet")
stream_name = "private_stream"
stream = self.make_stream(stream_name, invite_only=True)
self.subscribe(normal_user, stream_name)
self.login_user(normal_user)
message_id = [
self.send_stream_message(normal_user, stream_name, "test 1"),
]
guest_user = self.example_user('polonius')
self.login_user(guest_user)
result = self.change_star(message_id)
self.assert_json_error(result, 'Invalid message(s)')
# Guest user can't access messages of subscribed private streams if
# history is not public to subscribers
self.subscribe(guest_user, stream_name)
result = self.change_star(message_id)
self.assert_json_error(result, 'Invalid message(s)')
# Guest user can access messages of subscribed private streams if
# history is public to subscribers
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
result = self.change_star(message_id)
self.assert_json_success(result)
# With history not public to subscribers, they can still see new messages
do_change_stream_invite_only(stream, True, history_public_to_subscribers=False)
self.login_user(normal_user)
message_id = [
self.send_stream_message(normal_user, stream_name, "test 2"),
]
self.login_user(guest_user)
result = self.change_star(message_id)
self.assert_json_success(result)
def test_bulk_access_messages_private_stream(self) -> None:
user = self.example_user("hamlet")
self.login_user(user)
stream_name = "private_stream"
stream = self.make_stream(stream_name, invite_only=True,
history_public_to_subscribers=False)
self.subscribe(user, stream_name)
# Send a message before subscribing a new user to stream
message_one_id = self.send_stream_message(user,
stream_name, "Message one")
later_subscribed_user = self.example_user("cordelia")
# Subscribe a user to private-protected history stream
self.subscribe(later_subscribed_user, stream_name)
# Send a message after subscribing a new user to stream
message_two_id = self.send_stream_message(user,
stream_name, "Message two")
message_ids = [message_one_id, message_two_id]
messages = [Message.objects.select_related().get(id=message_id)
for message_id in message_ids]
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
# Message sent before subscribing wouldn't be accessible by later
# subscribed user as stream has protected history
self.assertEqual(len(filtered_messages), 1)
self.assertEqual(filtered_messages[0].id, message_two_id)
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
# Message sent before subscribing are accessible by 8user as stream
# don't have protected history
self.assertEqual(len(filtered_messages), 2)
# Testing messages accessiblity for an unsubscribed user
unsubscribed_user = self.example_user("ZOE")
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
self.assertEqual(len(filtered_messages), 0)
def test_bulk_access_messages_public_stream(self) -> None:
user = self.example_user("hamlet")
self.login_user(user)
# Testing messages accessiblity including a public stream message
stream_name = "public_stream"
self.subscribe(user, stream_name)
message_one_id = self.send_stream_message(user,
stream_name, "Message one")
later_subscribed_user = self.example_user("cordelia")
self.subscribe(later_subscribed_user, stream_name)
# Send a message after subscribing a new user to stream
message_two_id = self.send_stream_message(user,
stream_name, "Message two")
message_ids = [message_one_id, message_two_id]
messages = [Message.objects.select_related().get(id=message_id)
for message_id in message_ids]
# All public stream messages are always accessible
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
self.assertEqual(len(filtered_messages), 2)
unsubscribed_user = self.example_user("ZOE")
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
self.assertEqual(len(filtered_messages), 2)
class MessageHasKeywordsTest(ZulipTestCase):
'''Test for keywords like has_link, has_image, has_attachment.'''
def setup_dummy_attachments(self, user_profile: UserProfile) -> List[str]:
sample_size = 10
realm_id = user_profile.realm_id
dummy_files = [
('zulip.txt', f'{realm_id}/31/4CBjtTLYZhk66pZrF8hnYGwc/zulip.txt', sample_size),
('temp_file.py', f'{realm_id}/31/4CBjtTLYZhk66pZrF8hnYGwc/temp_file.py', sample_size),
('abc.py', f'{realm_id}/31/4CBjtTLYZhk66pZrF8hnYGwc/abc.py', sample_size),
]
for file_name, path_id, size in dummy_files:
create_attachment(file_name, path_id, user_profile, size)
# return path ids
return [x[1] for x in dummy_files]
def test_claim_attachment(self) -> None:
user_profile = self.example_user('hamlet')
dummy_path_ids = self.setup_dummy_attachments(user_profile)
dummy_urls = [f"http://zulip.testserver/user_uploads/{x}" for x in dummy_path_ids]
# Send message referring the attachment
self.subscribe(user_profile, "Denmark")
def assert_attachment_claimed(path_id: str, claimed: bool) -> None:
attachment = Attachment.objects.get(path_id=path_id)
self.assertEqual(attachment.is_claimed(), claimed)
# This message should claim attachments 1 only because attachment 2
# is not being parsed as a link by Markdown.
body = ("Some files here ...[zulip.txt]({})" +
"{}.... Some more...." +
"{}").format(dummy_urls[0], dummy_urls[1], dummy_urls[1])
self.send_stream_message(user_profile, "Denmark", body, "test")
assert_attachment_claimed(dummy_path_ids[0], True)
assert_attachment_claimed(dummy_path_ids[1], False)
# This message tries to claim the third attachment but fails because
# Markdown would not set has_attachments = True here.
body = f"Link in code: `{dummy_urls[2]}`"
self.send_stream_message(user_profile, "Denmark", body, "test")
assert_attachment_claimed(dummy_path_ids[2], False)
# Another scenario where we wouldn't parse the link.
body = f"Link to not parse: .{dummy_urls[2]}.`"
self.send_stream_message(user_profile, "Denmark", body, "test")
assert_attachment_claimed(dummy_path_ids[2], False)
# Finally, claim attachment 3.
body = f"Link: {dummy_urls[2]}"
self.send_stream_message(user_profile, "Denmark", body, "test")
assert_attachment_claimed(dummy_path_ids[2], True)
assert_attachment_claimed(dummy_path_ids[1], False)
def test_finds_all_links(self) -> None:
msg_ids = []
msg_contents = ["foo.org", "[bar](baz.gov)", "http://quux.ca"]
for msg_content in msg_contents:
msg_ids.append(self.send_stream_message(self.example_user('hamlet'),
'Denmark', content=msg_content))
msgs = [Message.objects.get(id=id) for id in msg_ids]
self.assertTrue(all([msg.has_link for msg in msgs]))
def test_finds_only_links(self) -> None:
msg_ids = []
msg_contents = ["`example.org`", '``example.org```', '$$https://example.org$$', "foo"]
for msg_content in msg_contents:
msg_ids.append(self.send_stream_message(self.example_user('hamlet'),
'Denmark', content=msg_content))
msgs = [Message.objects.get(id=id) for id in msg_ids]
self.assertFalse(all([msg.has_link for msg in msgs]))
def update_message(self, msg: Message, content: str) -> None:
hamlet = self.example_user('hamlet')
realm_id = hamlet.realm.id
rendered_content = render_markdown(msg, content)
mention_data = MentionData(realm_id, content)
do_update_message(hamlet, msg, None, None, "change_one", False, False, content,
rendered_content, set(), set(), mention_data=mention_data)
def test_finds_link_after_edit(self) -> None:
hamlet = self.example_user('hamlet')
msg_id = self.send_stream_message(hamlet, 'Denmark', content='a')
msg = Message.objects.get(id=msg_id)
self.assertFalse(msg.has_link)
self.update_message(msg, 'a http://foo.com')
self.assertTrue(msg.has_link)
self.update_message(msg, 'a')
self.assertFalse(msg.has_link)
# Check in blockquotes work
self.update_message(msg, '> http://bar.com')
self.assertTrue(msg.has_link)
self.update_message(msg, 'a `http://foo.com`')
self.assertFalse(msg.has_link)
def test_has_image(self) -> None:
msg_ids = []
msg_contents = ["Link: foo.org",
"Image: https://www.google.com/images/srpr/logo4w.png",
"Image: https://www.google.com/images/srpr/logo4w.pdf",
"[Google Link](https://www.google.com/images/srpr/logo4w.png)"]
for msg_content in msg_contents:
msg_ids.append(self.send_stream_message(self.example_user('hamlet'),
'Denmark', content=msg_content))
msgs = [Message.objects.get(id=id) for id in msg_ids]
self.assertEqual([False, True, False, True], [msg.has_image for msg in msgs])
self.update_message(msgs[0], 'https://www.google.com/images/srpr/logo4w.png')
self.assertTrue(msgs[0].has_image)
self.update_message(msgs[0], 'No Image Again')
self.assertFalse(msgs[0].has_image)
def test_has_attachment(self) -> None:
hamlet = self.example_user('hamlet')
dummy_path_ids = self.setup_dummy_attachments(hamlet)
dummy_urls = [f"http://zulip.testserver/user_uploads/{x}" for x in dummy_path_ids]
self.subscribe(hamlet, "Denmark")
body = ("Files ...[zulip.txt]({}) {} {}").format(dummy_urls[0], dummy_urls[1], dummy_urls[2])
msg_id = self.send_stream_message(hamlet, "Denmark", body, "test")
msg = Message.objects.get(id=msg_id)
self.assertTrue(msg.has_attachment)
self.update_message(msg, 'No Attachments')
self.assertFalse(msg.has_attachment)
self.update_message(msg, body)
self.assertTrue(msg.has_attachment)
self.update_message(msg, f'Link in code: `{dummy_urls[1]}`')
self.assertFalse(msg.has_attachment)
# Test blockquotes
self.update_message(msg, f'> {dummy_urls[1]}')
self.assertTrue(msg.has_attachment)
# Additional test to check has_attachment is being set is due to the correct attachment.
self.update_message(msg, f'Outside: {dummy_urls[0]}. In code: `{dummy_urls[1]}`.')
self.assertTrue(msg.has_attachment)
self.assertTrue(msg.attachment_set.filter(path_id=dummy_path_ids[0]))
self.assertEqual(msg.attachment_set.count(), 1)
self.update_message(msg, f'Outside: {dummy_urls[1]}. In code: `{dummy_urls[0]}`.')
self.assertTrue(msg.has_attachment)
self.assertTrue(msg.attachment_set.filter(path_id=dummy_path_ids[1]))
self.assertEqual(msg.attachment_set.count(), 1)
self.update_message(msg, f'Both in code: `{dummy_urls[1]} {dummy_urls[0]}`.')
self.assertFalse(msg.has_attachment)
self.assertEqual(msg.attachment_set.count(), 0)
def test_potential_attachment_path_ids(self) -> None:
hamlet = self.example_user('hamlet')
self.subscribe(hamlet, "Denmark")
dummy_path_ids = self.setup_dummy_attachments(hamlet)
body = "Hello"
msg_id = self.send_stream_message(hamlet, "Denmark", body, "test")
msg = Message.objects.get(id=msg_id)
with mock.patch("zerver.lib.actions.do_claim_attachments",
wraps=do_claim_attachments) as m:
self.update_message(msg, f'[link](http://{hamlet.realm.host}/user_uploads/{dummy_path_ids[0]})')
self.assertTrue(m.called)
m.reset_mock()
self.update_message(msg, f'[link](/user_uploads/{dummy_path_ids[1]})')
self.assertTrue(m.called)
m.reset_mock()
self.update_message(msg, f'[new text link](/user_uploads/{dummy_path_ids[1]})')
self.assertFalse(m.called)
m.reset_mock()
# It's not clear this is correct behavior
self.update_message(msg, f'[link](user_uploads/{dummy_path_ids[2]})')
self.assertFalse(m.called)
m.reset_mock()
self.update_message(msg, f'[link](https://github.com/user_uploads/{dummy_path_ids[0]})')
self.assertFalse(m.called)
m.reset_mock()
class MissedMessageTest(ZulipTestCase):
def test_presence_idle_user_ids(self) -> None:
UserPresence.objects.all().delete()
sender = self.example_user('cordelia')
realm = sender.realm
hamlet = self.example_user('hamlet')
othello = self.example_user('othello')
recipient_ids = {hamlet.id, othello.id}
message_type = 'stream'
user_flags: Dict[int, List[str]] = {}
def assert_missing(user_ids: List[int]) -> None:
presence_idle_user_ids = get_active_presence_idle_user_ids(
realm=realm,
sender_id=sender.id,
message_type=message_type,
active_user_ids=recipient_ids,
user_flags=user_flags,
)
self.assertEqual(sorted(user_ids), sorted(presence_idle_user_ids))
def set_presence(user: UserProfile, client_name: str, ago: int) -> None:
when = timezone_now() - datetime.timedelta(seconds=ago)
UserPresence.objects.create(
user_profile_id=user.id,
realm_id=user.realm_id,
client=get_client(client_name),
timestamp=when,
)
message_type = 'private'
assert_missing([hamlet.id, othello.id])
message_type = 'stream'
user_flags[hamlet.id] = ['mentioned']
assert_missing([hamlet.id])
set_presence(hamlet, 'iPhone', ago=5000)
assert_missing([hamlet.id])
set_presence(hamlet, 'webapp', ago=15)
assert_missing([])
message_type = 'private'
assert_missing([othello.id])
class LogDictTest(ZulipTestCase):
def test_to_log_dict(self) -> None:
user = self.example_user('hamlet')
stream_name = 'Denmark'
topic_name = 'Copenhagen'
content = 'find me some good coffee shops'
message_id = self.send_stream_message(user, stream_name,
topic_name=topic_name,
content=content)
message = Message.objects.get(id=message_id)
dct = message.to_log_dict()
self.assertTrue('timestamp' in dct)
self.assertEqual(dct['content'], 'find me some good coffee shops')
self.assertEqual(dct['id'], message.id)
self.assertEqual(dct['recipient'], 'Denmark')
self.assertEqual(dct['sender_realm_str'], 'zulip')
self.assertEqual(dct['sender_email'], user.email)
self.assertEqual(dct['sender_full_name'], 'King Hamlet')
self.assertEqual(dct['sender_id'], user.id)
self.assertEqual(dct['sender_short_name'], 'hamlet')
self.assertEqual(dct['sending_client'], 'test suite')
self.assertEqual(dct[DB_TOPIC_NAME], 'Copenhagen')
self.assertEqual(dct['type'], 'stream')
class CheckMessageTest(ZulipTestCase):
def test_basic_check_message_call(self) -> None:
sender = self.example_user('othello')
client = make_client(name="test suite")
stream_name = 'España y Francia'
self.make_stream(stream_name)
topic_name = 'issue'
message_content = 'whatever'
addressee = Addressee.for_stream_name(stream_name, topic_name)
ret = check_message(sender, client, addressee, message_content)
self.assertEqual(ret['message'].sender.id, sender.id)
def test_bot_pm_feature(self) -> None:
"""We send a PM to a bot's owner if their bot sends a message to
an unsubscribed stream"""
parent = self.example_user('othello')
bot = do_create_user(
email='othello-bot@zulip.com',
password='',
realm=parent.realm,
full_name='',
short_name='',
bot_type=UserProfile.DEFAULT_BOT,
bot_owner=parent,
)
bot.last_reminder = None
sender = bot
client = make_client(name="test suite")
stream_name = 'Россия'
topic_name = 'issue'
addressee = Addressee.for_stream_name(stream_name, topic_name)
message_content = 'whatever'
old_count = message_stream_count(parent)
# Try sending to stream that doesn't exist sends a reminder to
# the sender
with self.assertRaises(JsonableError):
check_message(sender, client, addressee, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 1)
self.assertIn("that stream does not exist.", most_recent_message(parent).content)
# Try sending to stream that exists with no subscribers soon
# after; due to rate-limiting, this should send nothing.
self.make_stream(stream_name)
ret = check_message(sender, client, addressee, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 1)
# Try sending to stream that exists with no subscribers longer
# after; this should send an error to the bot owner that the
# stream doesn't exist
assert(sender.last_reminder is not None)
sender.last_reminder = sender.last_reminder - datetime.timedelta(hours=1)
sender.save(update_fields=["last_reminder"])
ret = check_message(sender, client, addressee, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 2)
self.assertEqual(ret['message'].sender.email, 'othello-bot@zulip.com')
self.assertIn("does not have any subscribers", most_recent_message(parent).content)
def test_bot_pm_error_handling(self) -> None:
# This just test some defensive code.
cordelia = self.example_user('cordelia')
test_bot = self.create_test_bot(
short_name='test',
user_profile=cordelia,
)
content = 'whatever'
good_realm = test_bot.realm
wrong_realm = get_realm("zephyr")
wrong_sender = cordelia
send_rate_limited_pm_notification_to_bot_owner(test_bot, wrong_realm, content)
self.assertEqual(test_bot.last_reminder, None)
send_rate_limited_pm_notification_to_bot_owner(wrong_sender, good_realm, content)
self.assertEqual(test_bot.last_reminder, None)
test_bot.realm.deactivated = True
send_rate_limited_pm_notification_to_bot_owner(test_bot, good_realm, content)
self.assertEqual(test_bot.last_reminder, None)
class MessageVisibilityTest(ZulipTestCase):
def test_update_first_visible_message_id(self) -> None:
Message.objects.all().delete()
message_ids = [self.send_stream_message(self.example_user("othello"), "Scotland") for i in range(15)]
# If message_visibility_limit is None update_first_visible_message_id
# should set first_visible_message_id to 0
realm = get_realm("zulip")
realm.message_visibility_limit = None
# Setting to a random value other than 0 as the default value of
# first_visible_message_id is 0
realm.first_visible_message_id = 5
realm.save()
update_first_visible_message_id(realm)
self.assertEqual(get_first_visible_message_id(realm), 0)
realm.message_visibility_limit = 10
realm.save()
expected_message_id = message_ids[5]
update_first_visible_message_id(realm)
self.assertEqual(get_first_visible_message_id(realm), expected_message_id)
# If the message_visibility_limit is greater than number of messages
# get_first_visible_message_id should return 0
realm.message_visibility_limit = 50
realm.save()
update_first_visible_message_id(realm)
self.assertEqual(get_first_visible_message_id(realm), 0)
def test_maybe_update_first_visible_message_id(self) -> None:
realm = get_realm("zulip")
lookback_hours = 30
realm.message_visibility_limit = None
realm.save()
end_time = timezone_now() - datetime.timedelta(hours=lookback_hours - 5)
stat = COUNT_STATS['messages_sent:is_bot:hour']
RealmCount.objects.create(realm=realm, property=stat.property,
end_time=end_time, value=5)
with mock.patch("zerver.lib.message.update_first_visible_message_id") as m:
maybe_update_first_visible_message_id(realm, lookback_hours)
m.assert_not_called()
realm.message_visibility_limit = 10
realm.save()
RealmCount.objects.all().delete()
with mock.patch("zerver.lib.message.update_first_visible_message_id") as m:
maybe_update_first_visible_message_id(realm, lookback_hours)
m.assert_not_called()
RealmCount.objects.create(realm=realm, property=stat.property,
end_time=end_time, value=5)
with mock.patch("zerver.lib.message.update_first_visible_message_id") as m:
maybe_update_first_visible_message_id(realm, lookback_hours)
m.assert_called_once_with(realm)
class TestBulkGetHuddleUserIds(ZulipTestCase):
def test_bulk_get_huddle_user_ids(self) -> None:
hamlet = self.example_user('hamlet')
cordelia = self.example_user('cordelia')
othello = self.example_user('othello')
iago = self.example_user('iago')
message_ids = [
self.send_huddle_message(hamlet, [cordelia, othello], 'test'),
self.send_huddle_message(cordelia, [hamlet, othello, iago], 'test'),
]
messages = Message.objects.filter(id__in=message_ids).order_by("id")
first_huddle_recipient = messages[0].recipient
first_huddle_user_ids = list(get_huddle_user_ids(first_huddle_recipient))
second_huddle_recipient = messages[1].recipient
second_huddle_user_ids = list(get_huddle_user_ids(second_huddle_recipient))
huddle_user_ids = bulk_get_huddle_user_ids([first_huddle_recipient, second_huddle_recipient])
self.assertEqual(huddle_user_ids[first_huddle_recipient.id], first_huddle_user_ids)
self.assertEqual(huddle_user_ids[second_huddle_recipient.id], second_huddle_user_ids)
def test_bulk_get_huddle_user_ids_empty_list(self) -> None:
self.assertEqual(bulk_get_huddle_user_ids([]), {})
class NoRecipientIDsTest(ZulipTestCase):
def test_no_recipient_ids(self) -> None:
user_profile = self.example_user('cordelia')
Subscription.objects.filter(user_profile=user_profile, recipient__type=Recipient.STREAM).delete()
subs = gather_subscriptions_helper(user_profile)
# Checks that gather_subscriptions_helper will not return anything
# since there will not be any recipients, without crashing.
#
# This covers a rare corner case.
self.assertEqual(len(subs[0]), 0)