mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 14:03:30 +00:00
213 lines
7.7 KiB
Python
213 lines
7.7 KiB
Python
import datetime
|
|
from typing import Dict, List
|
|
from unittest import mock
|
|
|
|
from django.utils.timezone import now as timezone_now
|
|
|
|
from zerver.lib.actions import (
|
|
gather_subscriptions_helper,
|
|
get_active_presence_idle_user_ids,
|
|
get_client,
|
|
get_last_message_id,
|
|
)
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.lib.test_helpers import (
|
|
message_stream_count,
|
|
most_recent_message,
|
|
most_recent_usermessage,
|
|
)
|
|
from zerver.lib.topic import DB_TOPIC_NAME
|
|
from zerver.lib.url_encoding import near_message_url
|
|
from zerver.models import (
|
|
Message,
|
|
Recipient,
|
|
Subscription,
|
|
UserPresence,
|
|
UserProfile,
|
|
bulk_get_huddle_user_ids,
|
|
get_huddle_user_ids,
|
|
get_realm,
|
|
)
|
|
|
|
|
|
class MiscMessageTest(ZulipTestCase):
|
|
def test_get_last_message_id(self) -> None:
|
|
self.assertEqual(
|
|
get_last_message_id(),
|
|
Message.objects.latest('id').id,
|
|
)
|
|
|
|
Message.objects.all().delete()
|
|
|
|
self.assertEqual(get_last_message_id(), -1)
|
|
|
|
class PersonalMessagesTest(ZulipTestCase):
|
|
|
|
def test_near_pm_message_url(self) -> None:
|
|
realm = get_realm('zulip')
|
|
message = dict(
|
|
type='personal',
|
|
id=555,
|
|
display_recipient=[
|
|
dict(id=77),
|
|
dict(id=80),
|
|
],
|
|
)
|
|
url = near_message_url(
|
|
realm=realm,
|
|
message=message,
|
|
)
|
|
self.assertEqual(url, 'http://zulip.testserver/#narrow/pm-with/77,80-pm/near/555')
|
|
|
|
def test_is_private_flag_not_leaked(self) -> None:
|
|
"""
|
|
Make sure `is_private` flag is not leaked to the API.
|
|
"""
|
|
self.login('hamlet')
|
|
self.send_personal_message(self.example_user("hamlet"),
|
|
self.example_user("cordelia"),
|
|
"test")
|
|
|
|
for msg in self.get_messages():
|
|
self.assertNotIn('is_private', msg['flags'])
|
|
|
|
def test_auto_subbed_to_personals(self) -> None:
|
|
"""
|
|
Newly created users are auto-subbed to the ability to receive
|
|
personals.
|
|
"""
|
|
test_email = self.nonreg_email('test')
|
|
self.register(test_email, "test")
|
|
user_profile = self.nonreg_user('test')
|
|
old_messages_count = message_stream_count(user_profile)
|
|
self.send_personal_message(user_profile, user_profile)
|
|
new_messages_count = message_stream_count(user_profile)
|
|
self.assertEqual(new_messages_count, old_messages_count + 1)
|
|
|
|
recipient = Recipient.objects.get(type_id=user_profile.id,
|
|
type=Recipient.PERSONAL)
|
|
message = most_recent_message(user_profile)
|
|
self.assertEqual(message.recipient, recipient)
|
|
|
|
with mock.patch('zerver.models.get_display_recipient', return_value='recip'):
|
|
self.assertEqual(
|
|
str(message),
|
|
'<Message: recip / / '
|
|
'<UserProfile: {} {}>>'.format(user_profile.email, user_profile.realm))
|
|
|
|
user_message = most_recent_usermessage(user_profile)
|
|
self.assertEqual(
|
|
str(user_message),
|
|
f'<UserMessage: recip / {user_profile.email} ([])>',
|
|
)
|
|
|
|
class MissedMessageTest(ZulipTestCase):
|
|
def test_presence_idle_user_ids(self) -> None:
|
|
UserPresence.objects.all().delete()
|
|
|
|
sender = self.example_user('cordelia')
|
|
realm = sender.realm
|
|
hamlet = self.example_user('hamlet')
|
|
othello = self.example_user('othello')
|
|
recipient_ids = {hamlet.id, othello.id}
|
|
message_type = 'stream'
|
|
user_flags: Dict[int, List[str]] = {}
|
|
|
|
def assert_missing(user_ids: List[int]) -> None:
|
|
presence_idle_user_ids = get_active_presence_idle_user_ids(
|
|
realm=realm,
|
|
sender_id=sender.id,
|
|
message_type=message_type,
|
|
active_user_ids=recipient_ids,
|
|
user_flags=user_flags,
|
|
)
|
|
self.assertEqual(sorted(user_ids), sorted(presence_idle_user_ids))
|
|
|
|
def set_presence(user: UserProfile, client_name: str, ago: int) -> None:
|
|
when = timezone_now() - datetime.timedelta(seconds=ago)
|
|
UserPresence.objects.create(
|
|
user_profile_id=user.id,
|
|
realm_id=user.realm_id,
|
|
client=get_client(client_name),
|
|
timestamp=when,
|
|
)
|
|
|
|
message_type = 'private'
|
|
assert_missing([hamlet.id, othello.id])
|
|
|
|
message_type = 'stream'
|
|
user_flags[hamlet.id] = ['mentioned']
|
|
assert_missing([hamlet.id])
|
|
|
|
set_presence(hamlet, 'iPhone', ago=5000)
|
|
assert_missing([hamlet.id])
|
|
|
|
set_presence(hamlet, 'webapp', ago=15)
|
|
assert_missing([])
|
|
|
|
message_type = 'private'
|
|
assert_missing([othello.id])
|
|
|
|
class LogDictTest(ZulipTestCase):
|
|
def test_to_log_dict(self) -> None:
|
|
user = self.example_user('hamlet')
|
|
stream_name = 'Denmark'
|
|
topic_name = 'Copenhagen'
|
|
content = 'find me some good coffee shops'
|
|
message_id = self.send_stream_message(user, stream_name,
|
|
topic_name=topic_name,
|
|
content=content)
|
|
message = Message.objects.get(id=message_id)
|
|
dct = message.to_log_dict()
|
|
|
|
self.assertTrue('timestamp' in dct)
|
|
|
|
self.assertEqual(dct['content'], 'find me some good coffee shops')
|
|
self.assertEqual(dct['id'], message.id)
|
|
self.assertEqual(dct['recipient'], 'Denmark')
|
|
self.assertEqual(dct['sender_realm_str'], 'zulip')
|
|
self.assertEqual(dct['sender_email'], user.email)
|
|
self.assertEqual(dct['sender_full_name'], 'King Hamlet')
|
|
self.assertEqual(dct['sender_id'], user.id)
|
|
self.assertEqual(dct['sender_short_name'], 'hamlet')
|
|
self.assertEqual(dct['sending_client'], 'test suite')
|
|
self.assertEqual(dct[DB_TOPIC_NAME], 'Copenhagen')
|
|
self.assertEqual(dct['type'], 'stream')
|
|
|
|
class TestBulkGetHuddleUserIds(ZulipTestCase):
|
|
def test_bulk_get_huddle_user_ids(self) -> None:
|
|
hamlet = self.example_user('hamlet')
|
|
cordelia = self.example_user('cordelia')
|
|
othello = self.example_user('othello')
|
|
iago = self.example_user('iago')
|
|
message_ids = [
|
|
self.send_huddle_message(hamlet, [cordelia, othello], 'test'),
|
|
self.send_huddle_message(cordelia, [hamlet, othello, iago], 'test'),
|
|
]
|
|
|
|
messages = Message.objects.filter(id__in=message_ids).order_by("id")
|
|
first_huddle_recipient = messages[0].recipient
|
|
first_huddle_user_ids = list(get_huddle_user_ids(first_huddle_recipient))
|
|
second_huddle_recipient = messages[1].recipient
|
|
second_huddle_user_ids = list(get_huddle_user_ids(second_huddle_recipient))
|
|
|
|
huddle_user_ids = bulk_get_huddle_user_ids([first_huddle_recipient, second_huddle_recipient])
|
|
self.assertEqual(huddle_user_ids[first_huddle_recipient.id], first_huddle_user_ids)
|
|
self.assertEqual(huddle_user_ids[second_huddle_recipient.id], second_huddle_user_ids)
|
|
|
|
def test_bulk_get_huddle_user_ids_empty_list(self) -> None:
|
|
self.assertEqual(bulk_get_huddle_user_ids([]), {})
|
|
|
|
class NoRecipientIDsTest(ZulipTestCase):
|
|
def test_no_recipient_ids(self) -> None:
|
|
user_profile = self.example_user('cordelia')
|
|
|
|
Subscription.objects.filter(user_profile=user_profile, recipient__type=Recipient.STREAM).delete()
|
|
subs = gather_subscriptions_helper(user_profile)
|
|
|
|
# Checks that gather_subscriptions_helper will not return anything
|
|
# since there will not be any recipients, without crashing.
|
|
#
|
|
# This covers a rare corner case.
|
|
self.assertEqual(len(subs[0]), 0)
|