mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1418 lines
		
	
	
		
			67 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1418 lines
		
	
	
		
			67 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
from __future__ import absolute_import
 | 
						|
from django.db.models import Q
 | 
						|
from sqlalchemy.sql import (
 | 
						|
    and_, select, column, compiler
 | 
						|
)
 | 
						|
from django.test import TestCase
 | 
						|
from zerver.lib import bugdown
 | 
						|
from zerver.decorator import JsonableError
 | 
						|
from zerver.lib.test_runner import slow
 | 
						|
from zerver.views.messages import (
 | 
						|
    exclude_muting_conditions, get_sqlalchemy_connection,
 | 
						|
    get_old_messages_backend, ok_to_include_history,
 | 
						|
    NarrowBuilder,
 | 
						|
)
 | 
						|
from zilencer.models import Deployment
 | 
						|
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    AuthedTestCase, POSTRequestMock,
 | 
						|
    get_user_messages,
 | 
						|
    message_ids, message_stream_count,
 | 
						|
    most_recent_message,
 | 
						|
    queries_captured,
 | 
						|
)
 | 
						|
 | 
						|
from zerver.models import (
 | 
						|
    MAX_MESSAGE_LENGTH, MAX_SUBJECT_LENGTH,
 | 
						|
    Client, Message, Realm, Recipient, Stream, Subscription, UserMessage, UserProfile,
 | 
						|
    get_display_recipient, get_recipient, get_realm, get_stream, get_user_profile_by_email,
 | 
						|
)
 | 
						|
 | 
						|
from zerver.lib.actions import (
 | 
						|
    check_message, check_send_message,
 | 
						|
    create_stream_if_needed,
 | 
						|
    do_add_subscription, do_create_user,
 | 
						|
)
 | 
						|
 | 
						|
import datetime
 | 
						|
import time
 | 
						|
import re
 | 
						|
import ujson
 | 
						|
 | 
						|
 | 
						|
def get_sqlalchemy_query_params(query):
 | 
						|
    dialect = get_sqlalchemy_connection().dialect
 | 
						|
    comp = compiler.SQLCompiler(dialect, query)
 | 
						|
    comp.compile()
 | 
						|
    return comp.params
 | 
						|
 | 
						|
def fix_ws(s):
 | 
						|
    return re.sub('\s+', ' ', str(s)).strip()
 | 
						|
 | 
						|
def get_recipient_id_for_stream_name(realm, stream_name):
 | 
						|
    stream = get_stream(stream_name, realm)
 | 
						|
    return get_recipient(Recipient.STREAM, stream.id).id
 | 
						|
 | 
						|
def mute_stream(realm, user_profile, stream_name):
 | 
						|
    stream = Stream.objects.get(realm=realm, name=stream_name)
 | 
						|
    recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
 | 
						|
    subscription = Subscription.objects.get(recipient=recipient, user_profile=user_profile)
 | 
						|
    subscription.in_home_view = False
 | 
						|
    subscription.save()
 | 
						|
 | 
						|
class NarrowBuilderTest(AuthedTestCase):
 | 
						|
    def test_add_term(self):
 | 
						|
        realm = get_realm('zulip.com')
 | 
						|
        user_profile = get_user_profile_by_email("hamlet@zulip.com")
 | 
						|
        builder = NarrowBuilder(user_profile, column('id'))
 | 
						|
        raw_query = select([column("id")], None, "zerver_message")
 | 
						|
 | 
						|
        def check(term, where_clause):
 | 
						|
            query = builder.add_term(raw_query, term)
 | 
						|
            self.assertTrue(where_clause in str(query))
 | 
						|
 | 
						|
        term = dict(operator='stream', operand='Scotland')
 | 
						|
        check(term, 'WHERE recipient_id = :recipient_id_1')
 | 
						|
 | 
						|
        term = dict(operator='is', operand='private')
 | 
						|
        check(term, 'WHERE type = :type_1 OR type = :type_2')
 | 
						|
 | 
						|
        for operand in ['starred', 'mentioned', 'alerted']:
 | 
						|
            term = dict(operator='is', operand=operand)
 | 
						|
            check(term, 'WHERE (flags & :flags_1) != :param_1')
 | 
						|
 | 
						|
        term = dict(operator='topic', operand='lunch')
 | 
						|
        check(term, 'WHERE upper(subject) = upper(:param_1)')
 | 
						|
 | 
						|
        term = dict(operator='sender', operand='othello@zulip.com')
 | 
						|
        check(term, 'WHERE sender_id = :param_1')
 | 
						|
 | 
						|
        term = dict(operator='pm-with', operand='othello@zulip.com')
 | 
						|
        check(term, 'WHERE sender_id = :sender_id_1 AND recipient_id = :recipient_id_1 OR sender_id = :sender_id_2 AND recipient_id = :recipient_id_2')
 | 
						|
 | 
						|
        term = dict(operator='id', operand=555)
 | 
						|
        check(term, 'WHERE id = :param_1')
 | 
						|
 | 
						|
        term = dict(operator='search', operand='"french fries"')
 | 
						|
        check(term, 'WHERE (lower(content) LIKE lower(:content_1) OR lower(subject) LIKE lower(:subject_1)) AND (search_tsvector @@ plainto_tsquery(:param_2, :param_3))')
 | 
						|
 | 
						|
        term = dict(operator='has', operand='attachment')
 | 
						|
        check(term, 'WHERE has_attachment')
 | 
						|
 | 
						|
        term = dict(operator='has', operand='image')
 | 
						|
        check(term, 'WHERE has_image')
 | 
						|
 | 
						|
        term = dict(operator='has', operand='link')
 | 
						|
        check(term, 'WHERE has_link')
 | 
						|
 | 
						|
        mute_stream(realm, user_profile, 'Verona')
 | 
						|
        term = dict(operator='in', operand='home')
 | 
						|
        check(term, 'WHERE recipient_id NOT IN (:recipient_id_1)')
 | 
						|
 | 
						|
class IncludeHistoryTest(AuthedTestCase):
 | 
						|
    def test_ok_to_include_history(self):
 | 
						|
        realm = get_realm('zulip.com')
 | 
						|
        create_stream_if_needed(realm, 'public_stream')
 | 
						|
 | 
						|
        # Negated stream searches should not include history.
 | 
						|
        narrow = [
 | 
						|
            dict(operator='stream', operand='public_stream', negated=True),
 | 
						|
        ]
 | 
						|
        self.assertFalse(ok_to_include_history(narrow, realm))
 | 
						|
 | 
						|
        # Definitely forbid seeing history on private streams.
 | 
						|
        narrow = [
 | 
						|
            dict(operator='stream', operand='private_stream'),
 | 
						|
        ]
 | 
						|
        self.assertFalse(ok_to_include_history(narrow, realm))
 | 
						|
 | 
						|
        # History doesn't apply to PMs.
 | 
						|
        narrow = [
 | 
						|
            dict(operator='is', operand='private'),
 | 
						|
        ]
 | 
						|
        self.assertFalse(ok_to_include_history(narrow, realm))
 | 
						|
 | 
						|
        # If we are looking for something like starred messages, there is
 | 
						|
        # no point in searching historical messages.
 | 
						|
        narrow = [
 | 
						|
            dict(operator='stream', operand='public_stream'),
 | 
						|
            dict(operator='is', operand='starred'),
 | 
						|
        ]
 | 
						|
        self.assertFalse(ok_to_include_history(narrow, realm))
 | 
						|
 | 
						|
        # simple True case
 | 
						|
        narrow = [
 | 
						|
            dict(operator='stream', operand='public_stream'),
 | 
						|
        ]
 | 
						|
        self.assertTrue(ok_to_include_history(narrow, realm))
 | 
						|
 | 
						|
        narrow = [
 | 
						|
            dict(operator='stream', operand='public_stream'),
 | 
						|
            dict(operator='topic', operand='whatever'),
 | 
						|
            dict(operator='search', operand='needle in haystack'),
 | 
						|
        ]
 | 
						|
        self.assertTrue(ok_to_include_history(narrow, realm))
 | 
						|
 | 
						|
class TestCrossRealmPMs(AuthedTestCase):
 | 
						|
    def create_user(self, email):
 | 
						|
        username, domain = email.split('@')
 | 
						|
        self.register(username, 'test', domain=domain)
 | 
						|
        return get_user_profile_by_email(email)
 | 
						|
 | 
						|
    def test_same_realm(self):
 | 
						|
        """Users on the same realm can PM each other"""
 | 
						|
        r1 = Realm.objects.create(domain='1.example.com')
 | 
						|
        deployment = Deployment.objects.filter()[0]
 | 
						|
        deployment.realms.add(r1)
 | 
						|
 | 
						|
        user1_email = 'user1@1.example.com'
 | 
						|
        user1 = self.create_user(user1_email)
 | 
						|
        user2_email = 'user2@1.example.com'
 | 
						|
        user2 = self.create_user(user2_email)
 | 
						|
 | 
						|
        self.send_message(user1_email, user2_email, Recipient.PERSONAL)
 | 
						|
 | 
						|
        messages = get_user_messages(user2)
 | 
						|
        self.assertEqual(len(messages), 1)
 | 
						|
        self.assertEquals(messages[0].sender.pk, user1.pk)
 | 
						|
 | 
						|
    def test_diffrent_realms(self):
 | 
						|
        """Users on the different realms can not PM each other"""
 | 
						|
        r1 = Realm.objects.create(domain='1.example.com')
 | 
						|
        r2 = Realm.objects.create(domain='2.example.com')
 | 
						|
        deployment = Deployment.objects.filter()[0]
 | 
						|
        deployment.realms.add(r1)
 | 
						|
        deployment.realms.add(r2)
 | 
						|
 | 
						|
        user1_email = 'user1@1.example.com'
 | 
						|
        self.create_user(user1_email)
 | 
						|
        user2_email = 'user2@2.example.com'
 | 
						|
        self.create_user(user2_email)
 | 
						|
 | 
						|
        with self.assertRaisesRegexp(JsonableError,
 | 
						|
                                     'You can\'t send private messages outside of your organization.'):
 | 
						|
            self.send_message(user1_email, user2_email, Recipient.PERSONAL)
 | 
						|
 | 
						|
    def test_three_diffrent_realms(self):
 | 
						|
        """Users on three different realms can not PM each other"""
 | 
						|
        r1 = Realm.objects.create(domain='1.example.com')
 | 
						|
        r2 = Realm.objects.create(domain='2.example.com')
 | 
						|
        r3 = Realm.objects.create(domain='3.example.com')
 | 
						|
        deployment = Deployment.objects.filter()[0]
 | 
						|
        deployment.realms.add(r1)
 | 
						|
        deployment.realms.add(r2)
 | 
						|
        deployment.realms.add(r3)
 | 
						|
 | 
						|
        user1_email = 'user1@1.example.com'
 | 
						|
        self.create_user(user1_email)
 | 
						|
        user2_email = 'user2@2.example.com'
 | 
						|
        self.create_user(user2_email)
 | 
						|
        user3_email = 'user3@2.example.com'
 | 
						|
        self.create_user(user3_email)
 | 
						|
 | 
						|
        with self.assertRaisesRegexp(JsonableError,
 | 
						|
                                     'You can\'t send private messages outside of your organization.'):
 | 
						|
            self.send_message(user1_email, [user2_email, user3_email], Recipient.PERSONAL)
 | 
						|
 | 
						|
    def test_from_zulip_realm(self):
 | 
						|
        """Users in the zulip.com realm can PM any realm"""
 | 
						|
        r1 = Realm.objects.create(domain='1.example.com')
 | 
						|
        deployment = Deployment.objects.filter()[0]
 | 
						|
        deployment.realms.add(r1)
 | 
						|
 | 
						|
        user1_email = 'user1@zulip.com'
 | 
						|
        user1 = self.create_user(user1_email)
 | 
						|
        user2_email = 'user2@1.example.com'
 | 
						|
        user2 = self.create_user(user2_email)
 | 
						|
 | 
						|
        self.send_message(user1_email, user2_email, Recipient.PERSONAL)
 | 
						|
 | 
						|
        messages = get_user_messages(user2)
 | 
						|
        self.assertEqual(len(messages), 1)
 | 
						|
        self.assertEquals(messages[0].sender.pk, user1.pk)
 | 
						|
 | 
						|
    def test_to_zulip_realm(self):
 | 
						|
        """All users can PM users in the zulip.com realm"""
 | 
						|
        r1 = Realm.objects.create(domain='1.example.com')
 | 
						|
        deployment = Deployment.objects.filter()[0]
 | 
						|
        deployment.realms.add(r1)
 | 
						|
 | 
						|
        user1_email = 'user1@1.example.com'
 | 
						|
        user1 = self.create_user(user1_email)
 | 
						|
        user2_email = 'user2@zulip.com'
 | 
						|
        user2 = self.create_user(user2_email)
 | 
						|
 | 
						|
        self.send_message(user1_email, user2_email, Recipient.PERSONAL)
 | 
						|
 | 
						|
        messages = get_user_messages(user2)
 | 
						|
        self.assertEqual(len(messages), 1)
 | 
						|
        self.assertEquals(messages[0].sender.pk, user1.pk)
 | 
						|
 | 
						|
    def test_zulip_realm_can_not_join_realms(self):
 | 
						|
        """Adding a zulip.com user to a PM will not let you cross realms"""
 | 
						|
        r1 = Realm.objects.create(domain='1.example.com')
 | 
						|
        r2 = Realm.objects.create(domain='2.example.com')
 | 
						|
        deployment = Deployment.objects.filter()[0]
 | 
						|
        deployment.realms.add(r1)
 | 
						|
        deployment.realms.add(r2)
 | 
						|
 | 
						|
        user1_email = 'user1@1.example.com'
 | 
						|
        self.create_user(user1_email)
 | 
						|
        user2_email = 'user2@2.example.com'
 | 
						|
        self.create_user(user2_email)
 | 
						|
        user3_email = 'user3@zulip.com'
 | 
						|
        self.create_user(user3_email)
 | 
						|
 | 
						|
        with self.assertRaisesRegexp(JsonableError,
 | 
						|
                                     'You can\'t send private messages outside of your organization.'):
 | 
						|
            self.send_message(user1_email, [user2_email, user3_email],
 | 
						|
                              Recipient.PERSONAL)
 | 
						|
 | 
						|
class PersonalMessagesTest(AuthedTestCase):
 | 
						|
 | 
						|
    def test_auto_subbed_to_personals(self):
 | 
						|
        """
 | 
						|
        Newly created users are auto-subbed to the ability to receive
 | 
						|
        personals.
 | 
						|
        """
 | 
						|
        self.register("test", "test")
 | 
						|
        user_profile = get_user_profile_by_email('test@zulip.com')
 | 
						|
        old_messages_count = message_stream_count(user_profile)
 | 
						|
        self.send_message("test@zulip.com", "test@zulip.com", Recipient.PERSONAL)
 | 
						|
        new_messages_count = message_stream_count(user_profile)
 | 
						|
        self.assertEqual(new_messages_count, old_messages_count + 1)
 | 
						|
 | 
						|
        recipient = Recipient.objects.get(type_id=user_profile.id,
 | 
						|
                                          type=Recipient.PERSONAL)
 | 
						|
        self.assertEqual(most_recent_message(user_profile).recipient, recipient)
 | 
						|
 | 
						|
    @slow(0.36, "checks several profiles")
 | 
						|
    def test_personal_to_self(self):
 | 
						|
        """
 | 
						|
        If you send a personal to yourself, only you see it.
 | 
						|
        """
 | 
						|
        old_user_profiles = list(UserProfile.objects.all())
 | 
						|
        self.register("test1", "test1")
 | 
						|
 | 
						|
        old_messages = []
 | 
						|
        for user_profile in old_user_profiles:
 | 
						|
            old_messages.append(message_stream_count(user_profile))
 | 
						|
 | 
						|
        self.send_message("test1@zulip.com", "test1@zulip.com", Recipient.PERSONAL)
 | 
						|
 | 
						|
        new_messages = []
 | 
						|
        for user_profile in old_user_profiles:
 | 
						|
            new_messages.append(message_stream_count(user_profile))
 | 
						|
 | 
						|
        self.assertEqual(old_messages, new_messages)
 | 
						|
 | 
						|
        user_profile = get_user_profile_by_email("test1@zulip.com")
 | 
						|
        recipient = Recipient.objects.get(type_id=user_profile.id, type=Recipient.PERSONAL)
 | 
						|
        self.assertEqual(most_recent_message(user_profile).recipient, recipient)
 | 
						|
 | 
						|
    def assert_personal(self, sender_email, receiver_email, content="test content"):
 | 
						|
        """
 | 
						|
        Send a private message from `sender_email` to `receiver_email` and check
 | 
						|
        that only those two parties actually received the message.
 | 
						|
        """
 | 
						|
        sender = get_user_profile_by_email(sender_email)
 | 
						|
        receiver = get_user_profile_by_email(receiver_email)
 | 
						|
 | 
						|
        sender_messages = message_stream_count(sender)
 | 
						|
        receiver_messages = message_stream_count(receiver)
 | 
						|
 | 
						|
        other_user_profiles = UserProfile.objects.filter(~Q(email=sender_email) &
 | 
						|
                                                         ~Q(email=receiver_email))
 | 
						|
        old_other_messages = []
 | 
						|
        for user_profile in other_user_profiles:
 | 
						|
            old_other_messages.append(message_stream_count(user_profile))
 | 
						|
 | 
						|
        self.send_message(sender_email, receiver_email, Recipient.PERSONAL, content)
 | 
						|
 | 
						|
        # Users outside the conversation don't get the message.
 | 
						|
        new_other_messages = []
 | 
						|
        for user_profile in other_user_profiles:
 | 
						|
            new_other_messages.append(message_stream_count(user_profile))
 | 
						|
 | 
						|
        self.assertEqual(old_other_messages, new_other_messages)
 | 
						|
 | 
						|
        # The personal message is in the streams of both the sender and receiver.
 | 
						|
        self.assertEqual(message_stream_count(sender),
 | 
						|
                         sender_messages + 1)
 | 
						|
        self.assertEqual(message_stream_count(receiver),
 | 
						|
                         receiver_messages + 1)
 | 
						|
 | 
						|
        recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
 | 
						|
        self.assertEqual(most_recent_message(sender).recipient, recipient)
 | 
						|
        self.assertEqual(most_recent_message(receiver).recipient, recipient)
 | 
						|
 | 
						|
    @slow(0.28, "assert_personal checks several profiles")
 | 
						|
    def test_personal(self):
 | 
						|
        """
 | 
						|
        If you send a personal, only you and the recipient see it.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        self.assert_personal("hamlet@zulip.com", "othello@zulip.com")
 | 
						|
 | 
						|
    @slow(0.28, "assert_personal checks several profiles")
 | 
						|
    def test_non_ascii_personal(self):
 | 
						|
        """
 | 
						|
        Sending a PM containing non-ASCII characters succeeds.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        self.assert_personal("hamlet@zulip.com", "othello@zulip.com", u"hümbüǵ")
 | 
						|
 | 
						|
class StreamMessagesTest(AuthedTestCase):
 | 
						|
 | 
						|
    def assert_stream_message(self, stream_name, subject="test subject",
 | 
						|
                              content="test content"):
 | 
						|
        """
 | 
						|
        Check that messages sent to a stream reach all subscribers to that stream.
 | 
						|
        """
 | 
						|
        subscribers = self.users_subscribed_to_stream(stream_name, "zulip.com")
 | 
						|
        old_subscriber_messages = []
 | 
						|
        for subscriber in subscribers:
 | 
						|
            old_subscriber_messages.append(message_stream_count(subscriber))
 | 
						|
 | 
						|
        non_subscribers = [user_profile for user_profile in UserProfile.objects.all()
 | 
						|
                           if user_profile not in subscribers]
 | 
						|
        old_non_subscriber_messages = []
 | 
						|
        for non_subscriber in non_subscribers:
 | 
						|
            old_non_subscriber_messages.append(message_stream_count(non_subscriber))
 | 
						|
 | 
						|
        a_subscriber_email = subscribers[0].email
 | 
						|
        self.login(a_subscriber_email)
 | 
						|
        self.send_message(a_subscriber_email, stream_name, Recipient.STREAM,
 | 
						|
                          subject, content)
 | 
						|
 | 
						|
        # Did all of the subscribers get the message?
 | 
						|
        new_subscriber_messages = []
 | 
						|
        for subscriber in subscribers:
 | 
						|
           new_subscriber_messages.append(message_stream_count(subscriber))
 | 
						|
 | 
						|
        # Did non-subscribers not get the message?
 | 
						|
        new_non_subscriber_messages = []
 | 
						|
        for non_subscriber in non_subscribers:
 | 
						|
            new_non_subscriber_messages.append(message_stream_count(non_subscriber))
 | 
						|
 | 
						|
        self.assertEqual(old_non_subscriber_messages, new_non_subscriber_messages)
 | 
						|
        self.assertEqual(new_subscriber_messages, [elt + 1 for elt in old_subscriber_messages])
 | 
						|
 | 
						|
    def test_not_too_many_queries(self):
 | 
						|
        recipient_list  = ['hamlet@zulip.com', 'iago@zulip.com', 'cordelia@zulip.com', 'othello@zulip.com']
 | 
						|
        for email in recipient_list:
 | 
						|
            self.subscribe_to_stream(email, "Denmark")
 | 
						|
 | 
						|
        sender_email = 'hamlet@zulip.com'
 | 
						|
        sender = get_user_profile_by_email(sender_email)
 | 
						|
        message_type_name = "stream"
 | 
						|
        (sending_client, _) = Client.objects.get_or_create(name="test suite")
 | 
						|
        stream = 'Denmark'
 | 
						|
        subject = 'foo'
 | 
						|
        content = 'whatever'
 | 
						|
        realm = sender.realm
 | 
						|
 | 
						|
        def send_message():
 | 
						|
            check_send_message(sender, sending_client, message_type_name, [stream],
 | 
						|
                               subject, content, forwarder_user_profile=sender, realm=realm)
 | 
						|
 | 
						|
        send_message() # prime the caches
 | 
						|
        with queries_captured() as queries:
 | 
						|
            send_message()
 | 
						|
 | 
						|
        self.assert_length(queries, 5)
 | 
						|
 | 
						|
    def test_message_mentions(self):
 | 
						|
        user_profile = get_user_profile_by_email("iago@zulip.com")
 | 
						|
        self.subscribe_to_stream(user_profile.email, "Denmark")
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM,
 | 
						|
                          content="test @**Iago** rules")
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
        assert(UserMessage.objects.get(user_profile=user_profile, message=message).flags.mentioned.is_set)
 | 
						|
 | 
						|
    @slow(0.28, 'checks all users')
 | 
						|
    def test_message_to_stream(self):
 | 
						|
        """
 | 
						|
        If you send a message to a stream, everyone subscribed to the stream
 | 
						|
        receives the messages.
 | 
						|
        """
 | 
						|
        self.assert_stream_message("Scotland")
 | 
						|
 | 
						|
    @slow(0.37, 'checks all users')
 | 
						|
    def test_non_ascii_stream_message(self):
 | 
						|
        """
 | 
						|
        Sending a stream message containing non-ASCII characters in the stream
 | 
						|
        name, subject, or message body succeeds.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        # Subscribe everyone to a stream with non-ASCII characters.
 | 
						|
        non_ascii_stream_name = u"hümbüǵ"
 | 
						|
        realm = Realm.objects.get(domain="zulip.com")
 | 
						|
        stream, _ = create_stream_if_needed(realm, non_ascii_stream_name)
 | 
						|
        for user_profile in UserProfile.objects.filter(realm=realm):
 | 
						|
            do_add_subscription(user_profile, stream, no_log=True)
 | 
						|
 | 
						|
        self.assert_stream_message(non_ascii_stream_name, subject=u"hümbüǵ",
 | 
						|
                                   content=u"hümbüǵ")
 | 
						|
 | 
						|
class MessageDictTest(AuthedTestCase):
 | 
						|
    @slow(1.6, 'builds lots of messages')
 | 
						|
    def test_bulk_message_fetching(self):
 | 
						|
        realm = Realm.objects.get(domain="zulip.com")
 | 
						|
        sender = get_user_profile_by_email('othello@zulip.com')
 | 
						|
        receiver = get_user_profile_by_email('hamlet@zulip.com')
 | 
						|
        pm_recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
 | 
						|
        stream, _ = create_stream_if_needed(realm, 'devel')
 | 
						|
        stream_recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
 | 
						|
        sending_client, _ = Client.objects.get_or_create(name="test suite")
 | 
						|
 | 
						|
        for i in range(300):
 | 
						|
            for recipient in [pm_recipient, stream_recipient]:
 | 
						|
                message = Message(
 | 
						|
                    sender=sender,
 | 
						|
                    recipient=recipient,
 | 
						|
                    subject='whatever',
 | 
						|
                    content='whatever %d' % i,
 | 
						|
                    pub_date=datetime.datetime.now(),
 | 
						|
                    sending_client=sending_client,
 | 
						|
                    last_edit_time=datetime.datetime.now(),
 | 
						|
                    edit_history='[]'
 | 
						|
                )
 | 
						|
                message.save()
 | 
						|
 | 
						|
        ids = [row['id'] for row in Message.objects.all().values('id')]
 | 
						|
        num_ids = len(ids)
 | 
						|
        self.assertTrue(num_ids >= 600)
 | 
						|
 | 
						|
        t = time.time()
 | 
						|
        with queries_captured() as queries:
 | 
						|
            rows = list(Message.get_raw_db_rows(ids))
 | 
						|
 | 
						|
            for row in rows:
 | 
						|
                Message.build_dict_from_raw_db_row(row, False)
 | 
						|
 | 
						|
        delay = time.time() - t
 | 
						|
        # Make sure we don't take longer than 1ms per message to extract messages.
 | 
						|
        self.assertTrue(delay < 0.001 * num_ids)
 | 
						|
        self.assert_length(queries, 7)
 | 
						|
        self.assertEqual(len(rows), num_ids)
 | 
						|
 | 
						|
    def test_applying_markdown(self):
 | 
						|
        sender = get_user_profile_by_email('othello@zulip.com')
 | 
						|
        receiver = get_user_profile_by_email('hamlet@zulip.com')
 | 
						|
        recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
 | 
						|
        sending_client, _ = Client.objects.get_or_create(name="test suite")
 | 
						|
        message = Message(
 | 
						|
            sender=sender,
 | 
						|
            recipient=recipient,
 | 
						|
            subject='whatever',
 | 
						|
            content='hello **world**',
 | 
						|
            pub_date=datetime.datetime.now(),
 | 
						|
            sending_client=sending_client,
 | 
						|
            last_edit_time=datetime.datetime.now(),
 | 
						|
            edit_history='[]'
 | 
						|
        )
 | 
						|
        message.save()
 | 
						|
 | 
						|
        # An important part of this test is to get the message through this exact code path,
 | 
						|
        # because there is an ugly hack we need to cover.  So don't just say "row = message".
 | 
						|
        row = Message.get_raw_db_rows([message.id])[0]
 | 
						|
        dct = Message.build_dict_from_raw_db_row(row, apply_markdown=True)
 | 
						|
        expected_content = '<p>hello <strong>world</strong></p>'
 | 
						|
        self.assertEqual(dct['content'], expected_content)
 | 
						|
        message = Message.objects.get(id=message.id)
 | 
						|
        self.assertEqual(message.rendered_content, expected_content)
 | 
						|
        self.assertEqual(message.rendered_content_version, bugdown.version)
 | 
						|
 | 
						|
class MessagePOSTTest(AuthedTestCase):
 | 
						|
 | 
						|
    def test_message_to_self(self):
 | 
						|
        """
 | 
						|
        Sending a message to a stream to which you are subscribed is
 | 
						|
        successful.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "stream",
 | 
						|
                                                         "to": "Verona",
 | 
						|
                                                         "client": "test suite",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "subject": "Test subject"})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_api_message_to_self(self):
 | 
						|
        """
 | 
						|
        Same as above, but for the API view
 | 
						|
        """
 | 
						|
        email = "hamlet@zulip.com"
 | 
						|
        api_key = self.get_api_key(email)
 | 
						|
        result = self.client.post("/api/v1/send_message", {"type": "stream",
 | 
						|
                                                           "to": "Verona",
 | 
						|
                                                           "client": "test suite",
 | 
						|
                                                           "content": "Test message",
 | 
						|
                                                           "subject": "Test subject",
 | 
						|
                                                           "email": email,
 | 
						|
                                                           "api-key": api_key})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_api_message_with_default_to(self):
 | 
						|
        """
 | 
						|
        Sending messages without a to field should be sent to the default
 | 
						|
        stream for the user_profile.
 | 
						|
        """
 | 
						|
        email = "hamlet@zulip.com"
 | 
						|
        api_key = self.get_api_key(email)
 | 
						|
        user_profile = get_user_profile_by_email("hamlet@zulip.com")
 | 
						|
        user_profile.default_sending_stream = get_stream('Verona', user_profile.realm)
 | 
						|
        user_profile.save()
 | 
						|
        result = self.client.post("/api/v1/send_message", {"type": "stream",
 | 
						|
                                                           "client": "test suite",
 | 
						|
                                                           "content": "Test message no to",
 | 
						|
                                                           "subject": "Test subject",
 | 
						|
                                                           "email": email,
 | 
						|
                                                           "api-key": api_key})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        sent_message = Message.objects.all().order_by('-id')[0]
 | 
						|
        self.assertEqual(sent_message.content, "Test message no to")
 | 
						|
 | 
						|
    def test_message_to_nonexistent_stream(self):
 | 
						|
        """
 | 
						|
        Sending a message to a nonexistent stream fails.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        self.assertFalse(Stream.objects.filter(name="nonexistent_stream"))
 | 
						|
        result = self.client.post("/json/send_message", {"type": "stream",
 | 
						|
                                                         "to": "nonexistent_stream",
 | 
						|
                                                         "client": "test suite",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "subject": "Test subject"})
 | 
						|
        self.assert_json_error(result, "Stream does not exist")
 | 
						|
 | 
						|
    def test_personal_message(self):
 | 
						|
        """
 | 
						|
        Sending a personal message to a valid username is successful.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "private",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "client": "test suite",
 | 
						|
                                                         "to": "othello@zulip.com"})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_personal_message_to_nonexistent_user(self):
 | 
						|
        """
 | 
						|
        Sending a personal message to an invalid email returns error JSON.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "private",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "client": "test suite",
 | 
						|
                                                         "to": "nonexistent"})
 | 
						|
        self.assert_json_error(result, "Invalid email 'nonexistent'")
 | 
						|
 | 
						|
    def test_invalid_type(self):
 | 
						|
        """
 | 
						|
        Sending a message of unknown type returns error JSON.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "invalid type",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "client": "test suite",
 | 
						|
                                                         "to": "othello@zulip.com"})
 | 
						|
        self.assert_json_error(result, "Invalid message type")
 | 
						|
 | 
						|
    def test_empty_message(self):
 | 
						|
        """
 | 
						|
        Sending a message that is empty or only whitespace should fail
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "private",
 | 
						|
                                                         "content": " ",
 | 
						|
                                                         "client": "test suite",
 | 
						|
                                                         "to": "othello@zulip.com"})
 | 
						|
        self.assert_json_error(result, "Message must not be empty")
 | 
						|
 | 
						|
 | 
						|
    def test_mirrored_huddle(self):
 | 
						|
        """
 | 
						|
        Sending a mirrored huddle message works
 | 
						|
        """
 | 
						|
        self.login("starnine@mit.edu")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "private",
 | 
						|
                                                         "sender": "sipbtest@mit.edu",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "client": "zephyr_mirror",
 | 
						|
                                                         "to": ujson.dumps(["starnine@mit.edu",
 | 
						|
                                                                            "espuser@mit.edu"])})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_mirrored_personal(self):
 | 
						|
        """
 | 
						|
        Sending a mirrored personal message works
 | 
						|
        """
 | 
						|
        self.login("starnine@mit.edu")
 | 
						|
        result = self.client.post("/json/send_message", {"type": "private",
 | 
						|
                                                         "sender": "sipbtest@mit.edu",
 | 
						|
                                                         "content": "Test message",
 | 
						|
                                                         "client": "zephyr_mirror",
 | 
						|
                                                         "to": "starnine@mit.edu"})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_duplicated_mirrored_huddle(self):
 | 
						|
        """
 | 
						|
        Sending two mirrored huddles in the row return the same ID
 | 
						|
        """
 | 
						|
        msg = {"type": "private",
 | 
						|
               "sender": "sipbtest@mit.edu",
 | 
						|
               "content": "Test message",
 | 
						|
               "client": "zephyr_mirror",
 | 
						|
               "to": ujson.dumps(["sipbcert@mit.edu",
 | 
						|
                                  "starnine@mit.edu"])}
 | 
						|
 | 
						|
        self.login("starnine@mit.edu")
 | 
						|
        result1 = self.client.post("/json/send_message", msg)
 | 
						|
        self.login("sipbcert@mit.edu")
 | 
						|
        result2 = self.client.post("/json/send_message", msg)
 | 
						|
        self.assertEqual(ujson.loads(result1.content)['id'],
 | 
						|
                         ujson.loads(result2.content)['id'])
 | 
						|
 | 
						|
    def test_long_message(self):
 | 
						|
        """
 | 
						|
        Sending a message longer than the maximum message length succeeds but is
 | 
						|
        truncated.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        long_message = "A" * (MAX_MESSAGE_LENGTH + 1)
 | 
						|
        post_data = {"type": "stream", "to": "Verona", "client": "test suite",
 | 
						|
                     "content": long_message, "subject": "Test subject"}
 | 
						|
        result = self.client.post("/json/send_message", post_data)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        sent_message = Message.objects.all().order_by('-id')[0]
 | 
						|
        self.assertEquals(sent_message.content,
 | 
						|
                          "A" * (MAX_MESSAGE_LENGTH - 3) + "...")
 | 
						|
 | 
						|
    def test_long_topic(self):
 | 
						|
        """
 | 
						|
        Sending a message with a topic longer than the maximum topic length
 | 
						|
        succeeds, but the topic is truncated.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        long_topic = "A" * (MAX_SUBJECT_LENGTH + 1)
 | 
						|
        post_data = {"type": "stream", "to": "Verona", "client": "test suite",
 | 
						|
                     "content": "test content", "subject": long_topic}
 | 
						|
        result = self.client.post("/json/send_message", post_data)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        sent_message = Message.objects.all().order_by('-id')[0]
 | 
						|
        self.assertEquals(sent_message.subject,
 | 
						|
                          "A" * (MAX_SUBJECT_LENGTH - 3) + "...")
 | 
						|
 | 
						|
class GetOldMessagesTest(AuthedTestCase):
 | 
						|
 | 
						|
    def post_with_params(self, modified_params):
 | 
						|
        post_params = {"anchor": 1, "num_before": 1, "num_after": 1}
 | 
						|
        post_params.update(modified_params)
 | 
						|
        result = self.client.post("/json/get_old_messages", dict(post_params))
 | 
						|
        self.assert_json_success(result)
 | 
						|
        return ujson.loads(result.content)
 | 
						|
 | 
						|
    def check_well_formed_messages_response(self, result):
 | 
						|
        self.assertIn("messages", result)
 | 
						|
        self.assertIsInstance(result["messages"], list)
 | 
						|
        for message in result["messages"]:
 | 
						|
            for field in ("content", "content_type", "display_recipient",
 | 
						|
                          "avatar_url", "recipient_id", "sender_full_name",
 | 
						|
                          "sender_short_name", "timestamp"):
 | 
						|
                self.assertIn(field, message)
 | 
						|
            # TODO: deprecate soon in favor of avatar_url
 | 
						|
            self.assertIn('gravatar_hash', message)
 | 
						|
 | 
						|
    def get_query_ids(self):
 | 
						|
        realm = get_user_profile_by_email('hamlet@zulip.com').realm
 | 
						|
        query_ids = {}
 | 
						|
 | 
						|
        scotland_stream = get_stream('Scotland', realm)
 | 
						|
        query_ids['scotland_recipient'] = get_recipient(Recipient.STREAM, scotland_stream.id).id
 | 
						|
 | 
						|
        return query_ids
 | 
						|
 | 
						|
    def test_successful_get_old_messages(self):
 | 
						|
        """
 | 
						|
        A call to /json/get_old_messages with valid parameters returns a list of
 | 
						|
        messages.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.post_with_params(dict())
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        # We have to support the legacy tuple style while there are old
 | 
						|
        # clients around, which might include third party home-grown bots.
 | 
						|
        narrow = [['pm-with', 'othello@zulip.com']]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        narrow = [dict(operator='pm-with', operand='othello@zulip.com')]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
    def test_get_old_messages_with_narrow_pm_with(self):
 | 
						|
        """
 | 
						|
        A request for old messages with a narrow by pm-with only returns
 | 
						|
        conversations with that user.
 | 
						|
        """
 | 
						|
        me = 'hamlet@zulip.com'
 | 
						|
        def dr_emails(dr):
 | 
						|
            return ','.join(sorted(set([r['email'] for r in dr] + [me])))
 | 
						|
 | 
						|
        personals = [m for m in get_user_messages(get_user_profile_by_email(me))
 | 
						|
            if m.recipient.type == Recipient.PERSONAL
 | 
						|
            or m.recipient.type == Recipient.HUDDLE]
 | 
						|
        if not personals:
 | 
						|
            # FIXME: This is bad.  We should use test data that is guaranteed
 | 
						|
            # to contain some personals for every user.  See #617.
 | 
						|
            return
 | 
						|
        emails = dr_emails(get_display_recipient(personals[0].recipient))
 | 
						|
 | 
						|
        self.login(me)
 | 
						|
        narrow = [dict(operator='pm-with', operand=emails)]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        for message in result["messages"]:
 | 
						|
            self.assertEqual(dr_emails(message['display_recipient']), emails)
 | 
						|
 | 
						|
    def test_get_old_messages_with_narrow_stream(self):
 | 
						|
        """
 | 
						|
        A request for old messages with a narrow by stream only returns
 | 
						|
        messages for that stream.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        # We need to susbcribe to a stream and then send a message to
 | 
						|
        # it to ensure that we actually have a stream message in this
 | 
						|
        # narrow view.
 | 
						|
        realm = Realm.objects.get(domain="zulip.com")
 | 
						|
        stream, _ = create_stream_if_needed(realm, "Scotland")
 | 
						|
        do_add_subscription(get_user_profile_by_email("hamlet@zulip.com"),
 | 
						|
                            stream, no_log=True)
 | 
						|
        self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM)
 | 
						|
        messages = get_user_messages(get_user_profile_by_email("hamlet@zulip.com"))
 | 
						|
        stream_messages = filter(lambda msg: msg.recipient.type == Recipient.STREAM,
 | 
						|
                                 messages)
 | 
						|
        stream_name = get_display_recipient(stream_messages[0].recipient)
 | 
						|
        stream_id = stream_messages[0].recipient.id
 | 
						|
 | 
						|
        narrow = [dict(operator='stream', operand=stream_name)]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        for message in result["messages"]:
 | 
						|
            self.assertEqual(message["type"], "stream")
 | 
						|
            self.assertEqual(message["recipient_id"], stream_id)
 | 
						|
 | 
						|
    def test_get_old_messages_with_narrow_stream_mit_unicode_regex(self):
 | 
						|
        """
 | 
						|
        A request for old messages for a user in the mit.edu relam with unicode
 | 
						|
        stream name should be correctly escaped in the database query.
 | 
						|
        """
 | 
						|
        self.login("starnine@mit.edu")
 | 
						|
        # We need to susbcribe to a stream and then send a message to
 | 
						|
        # it to ensure that we actually have a stream message in this
 | 
						|
        # narrow view.
 | 
						|
        realm = Realm.objects.get(domain="mit.edu")
 | 
						|
        lambda_stream, _ = create_stream_if_needed(realm, u"\u03bb-stream")
 | 
						|
        do_add_subscription(get_user_profile_by_email("starnine@mit.edu"),
 | 
						|
                            lambda_stream, no_log=True)
 | 
						|
 | 
						|
        lambda_stream_d, _ = create_stream_if_needed(realm, u"\u03bb-stream.d")
 | 
						|
        do_add_subscription(get_user_profile_by_email("starnine@mit.edu"),
 | 
						|
                            lambda_stream_d, no_log=True)
 | 
						|
 | 
						|
        self.send_message("starnine@mit.edu", u"\u03bb-stream", Recipient.STREAM)
 | 
						|
        self.send_message("starnine@mit.edu", u"\u03bb-stream.d", Recipient.STREAM)
 | 
						|
 | 
						|
        narrow = [dict(operator='stream', operand=u'\u03bb-stream')]
 | 
						|
        result = self.post_with_params(dict(num_after=2, narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        messages = get_user_messages(get_user_profile_by_email("starnine@mit.edu"))
 | 
						|
        stream_messages = filter(lambda msg: msg.recipient.type == Recipient.STREAM,
 | 
						|
                                 messages)
 | 
						|
 | 
						|
        self.assertEqual(len(result["messages"]), 2)
 | 
						|
        for i, message in enumerate(result["messages"]):
 | 
						|
            self.assertEqual(message["type"], "stream")
 | 
						|
            stream_id = stream_messages[i].recipient.id
 | 
						|
            self.assertEqual(message["recipient_id"], stream_id)
 | 
						|
 | 
						|
    def test_get_old_messages_with_narrow_topic_mit_unicode_regex(self):
 | 
						|
        """
 | 
						|
        A request for old messages for a user in the mit.edu relam with unicode
 | 
						|
        topic name should be correctly escaped in the database query.
 | 
						|
        """
 | 
						|
        self.login("starnine@mit.edu")
 | 
						|
        # We need to susbcribe to a stream and then send a message to
 | 
						|
        # it to ensure that we actually have a stream message in this
 | 
						|
        # narrow view.
 | 
						|
        realm = Realm.objects.get(domain="mit.edu")
 | 
						|
        stream, _ = create_stream_if_needed(realm, "Scotland")
 | 
						|
        do_add_subscription(get_user_profile_by_email("starnine@mit.edu"),
 | 
						|
                            stream, no_log=True)
 | 
						|
 | 
						|
        self.send_message("starnine@mit.edu", "Scotland", Recipient.STREAM,
 | 
						|
                          subject=u"\u03bb-topic")
 | 
						|
        self.send_message("starnine@mit.edu", "Scotland", Recipient.STREAM,
 | 
						|
                          subject=u"\u03bb-topic.d")
 | 
						|
 | 
						|
        narrow = [dict(operator='topic', operand=u'\u03bb-topic')]
 | 
						|
        result = self.post_with_params(dict(num_after=2, narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        messages = get_user_messages(get_user_profile_by_email("starnine@mit.edu"))
 | 
						|
        stream_messages = filter(lambda msg: msg.recipient.type == Recipient.STREAM,
 | 
						|
                                 messages)
 | 
						|
        self.assertEqual(len(result["messages"]), 2)
 | 
						|
        for i, message in enumerate(result["messages"]):
 | 
						|
            self.assertEqual(message["type"], "stream")
 | 
						|
            stream_id = stream_messages[i].recipient.id
 | 
						|
            self.assertEqual(message["recipient_id"], stream_id)
 | 
						|
 | 
						|
 | 
						|
    def test_get_old_messages_with_narrow_sender(self):
 | 
						|
        """
 | 
						|
        A request for old messages with a narrow by sender only returns
 | 
						|
        messages sent by that person.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        # We need to send a message here to ensure that we actually
 | 
						|
        # have a stream message in this narrow view.
 | 
						|
        self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM)
 | 
						|
        self.send_message("othello@zulip.com", "Scotland", Recipient.STREAM)
 | 
						|
        self.send_message("othello@zulip.com", "hamlet@zulip.com", Recipient.PERSONAL)
 | 
						|
        self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM)
 | 
						|
 | 
						|
        narrow = [dict(operator='sender', operand='othello@zulip.com')]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
 | 
						|
        for message in result["messages"]:
 | 
						|
            self.assertEqual(message["sender_email"], "othello@zulip.com")
 | 
						|
 | 
						|
    def test_get_old_messages_with_only_searching_anchor(self):
 | 
						|
        """
 | 
						|
        Test that specifying an anchor but 0 for num_before and num_after
 | 
						|
        returns at most 1 message.
 | 
						|
        """
 | 
						|
        self.login("cordelia@zulip.com")
 | 
						|
        anchor = self.send_message("cordelia@zulip.com", "Scotland", Recipient.STREAM)
 | 
						|
 | 
						|
        narrow = [dict(operator='sender', operand='cordelia@zulip.com')]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow),
 | 
						|
                                            anchor=anchor, num_before=0,
 | 
						|
                                            num_after=0))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
        self.assertEqual(len(result['messages']), 1)
 | 
						|
 | 
						|
        narrow = [dict(operator='is', operand='mentioned')]
 | 
						|
        result = self.post_with_params(dict(narrow=ujson.dumps(narrow),
 | 
						|
                                            anchor=anchor, num_before=0,
 | 
						|
                                            num_after=0))
 | 
						|
        self.check_well_formed_messages_response(result)
 | 
						|
        self.assertEqual(len(result['messages']), 0)
 | 
						|
 | 
						|
    def test_missing_params(self):
 | 
						|
        """
 | 
						|
        anchor, num_before, and num_after are all required
 | 
						|
        POST parameters for get_old_messages.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        required_args = (("anchor", 1), ("num_before", 1), ("num_after", 1))
 | 
						|
 | 
						|
        for i in range(len(required_args)):
 | 
						|
            post_params = dict(required_args[:i] + required_args[i + 1:])
 | 
						|
            result = self.client.post("/json/get_old_messages", post_params)
 | 
						|
            self.assert_json_error(result,
 | 
						|
                                   "Missing '%s' argument" % (required_args[i][0],))
 | 
						|
 | 
						|
    def test_bad_int_params(self):
 | 
						|
        """
 | 
						|
        num_before, num_after, and narrow must all be non-negative
 | 
						|
        integers or strings that can be converted to non-negative integers.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        other_params = [("narrow", {}), ("anchor", 0)]
 | 
						|
        int_params = ["num_before", "num_after"]
 | 
						|
 | 
						|
        bad_types = (False, "", "-1", -1)
 | 
						|
        for idx, param in enumerate(int_params):
 | 
						|
            for type in bad_types:
 | 
						|
                # Rotate through every bad type for every integer
 | 
						|
                # parameter, one at a time.
 | 
						|
                post_params = dict(other_params + [(param, type)] + \
 | 
						|
                                       [(other_param, 0) for other_param in \
 | 
						|
                                            int_params[:idx] + int_params[idx + 1:]]
 | 
						|
                                   )
 | 
						|
                result = self.client.post("/json/get_old_messages", post_params)
 | 
						|
                self.assert_json_error(result,
 | 
						|
                                       "Bad value for '%s': %s" % (param, type))
 | 
						|
 | 
						|
    def test_bad_narrow_type(self):
 | 
						|
        """
 | 
						|
        narrow must be a list of string pairs.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
 | 
						|
 | 
						|
        bad_types = (False, 0, '', '{malformed json,',
 | 
						|
            '{foo: 3}', '[1,2]', '[["x","y","z"]]')
 | 
						|
        for type in bad_types:
 | 
						|
            post_params = dict(other_params + [("narrow", type)])
 | 
						|
            result = self.client.post("/json/get_old_messages", post_params)
 | 
						|
            self.assert_json_error(result,
 | 
						|
                                   "Bad value for 'narrow': %s" % (type,))
 | 
						|
 | 
						|
    def test_old_empty_narrow(self):
 | 
						|
        """
 | 
						|
        '{}' is accepted to mean 'no narrow', for use by old mobile clients.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        all_result    = self.post_with_params({})
 | 
						|
        narrow_result = self.post_with_params({'narrow': '{}'})
 | 
						|
 | 
						|
        for r in (all_result, narrow_result):
 | 
						|
            self.check_well_formed_messages_response(r)
 | 
						|
 | 
						|
        self.assertEqual(message_ids(all_result), message_ids(narrow_result))
 | 
						|
 | 
						|
    def test_bad_narrow_operator(self):
 | 
						|
        """
 | 
						|
        Unrecognized narrow operators are rejected.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        for operator in ['', 'foo', 'stream:verona', '__init__']:
 | 
						|
            narrow = [dict(operator=operator, operand='')]
 | 
						|
            params = dict(anchor=0, num_before=0, num_after=0, narrow=ujson.dumps(narrow))
 | 
						|
            result = self.client.post("/json/get_old_messages", params)
 | 
						|
            self.assert_json_error_contains(result,
 | 
						|
                "Invalid narrow operator: unknown operator")
 | 
						|
 | 
						|
    def exercise_bad_narrow_operand(self, operator, operands, error_msg):
 | 
						|
        other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
 | 
						|
        for operand in operands:
 | 
						|
            post_params = dict(other_params + [
 | 
						|
                ("narrow", ujson.dumps([[operator, operand]]))])
 | 
						|
            result = self.client.post("/json/get_old_messages", post_params)
 | 
						|
            self.assert_json_error_contains(result, error_msg)
 | 
						|
 | 
						|
    def test_bad_narrow_stream_content(self):
 | 
						|
        """
 | 
						|
        If an invalid stream name is requested in get_old_messages, an error is
 | 
						|
        returned.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        bad_stream_content = (0, [], ["x", "y"])
 | 
						|
        self.exercise_bad_narrow_operand("stream", bad_stream_content,
 | 
						|
            "Bad value for 'narrow'")
 | 
						|
 | 
						|
    def test_bad_narrow_one_on_one_email_content(self):
 | 
						|
        """
 | 
						|
        If an invalid 'pm-with' is requested in get_old_messages, an
 | 
						|
        error is returned.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        bad_stream_content = (0, [], ["x","y"])
 | 
						|
        self.exercise_bad_narrow_operand("pm-with", bad_stream_content,
 | 
						|
            "Bad value for 'narrow'")
 | 
						|
 | 
						|
    def test_bad_narrow_nonexistent_stream(self):
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        self.exercise_bad_narrow_operand("stream", ['non-existent stream'],
 | 
						|
            "Invalid narrow operator: unknown stream")
 | 
						|
 | 
						|
    def test_bad_narrow_nonexistent_email(self):
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        self.exercise_bad_narrow_operand("pm-with", ['non-existent-user@zulip.com'],
 | 
						|
            "Invalid narrow operator: unknown user")
 | 
						|
 | 
						|
    def test_message_without_rendered_content(self):
 | 
						|
        """Older messages may not have rendered_content in the database"""
 | 
						|
        m = Message.objects.all().order_by('-id')[0]
 | 
						|
        m.rendered_content = m.rendered_content_version = None
 | 
						|
        m.content = 'test content'
 | 
						|
        # Use to_dict_uncached directly to avoid having to deal with memcached
 | 
						|
        d = m.to_dict_uncached(True)
 | 
						|
        self.assertEqual(d['content'], '<p>test content</p>')
 | 
						|
 | 
						|
    def common_check_get_old_messages_query(self, query_params, expected):
 | 
						|
        user_profile = get_user_profile_by_email("hamlet@zulip.com")
 | 
						|
        request = POSTRequestMock(query_params, user_profile)
 | 
						|
        with queries_captured() as queries:
 | 
						|
            get_old_messages_backend(request, user_profile)
 | 
						|
 | 
						|
        for query in queries:
 | 
						|
            if "/* get_old_messages */" in query['sql']:
 | 
						|
                sql = query['sql'].replace(" /* get_old_messages */", '')
 | 
						|
                self.assertEqual(sql, expected)
 | 
						|
                return
 | 
						|
        self.fail("get_old_messages query not found")
 | 
						|
 | 
						|
    def test_use_first_unread_anchor(self):
 | 
						|
        realm = get_realm('zulip.com')
 | 
						|
        create_stream_if_needed(realm, 'devel')
 | 
						|
        user_profile = get_user_profile_by_email("hamlet@zulip.com")
 | 
						|
        user_profile.muted_topics = ujson.dumps([['Scotland', 'golf'], ['devel', 'css'], ['bogus', 'bogus']])
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        query_params = dict(
 | 
						|
            use_first_unread_anchor='true',
 | 
						|
            anchor=0,
 | 
						|
            num_before=0,
 | 
						|
            num_after=0,
 | 
						|
            narrow='[["stream", "Scotland"]]'
 | 
						|
        )
 | 
						|
        request = POSTRequestMock(query_params, user_profile)
 | 
						|
 | 
						|
        with queries_captured() as queries:
 | 
						|
            get_old_messages_backend(request, user_profile)
 | 
						|
 | 
						|
        queries = filter(lambda q: q['sql'].startswith("SELECT message_id, flags"), queries)
 | 
						|
 | 
						|
        ids = {}
 | 
						|
        for stream_name in ['Scotland']:
 | 
						|
            stream = get_stream(stream_name, realm)
 | 
						|
            ids[stream_name] = get_recipient(Recipient.STREAM, stream.id).id
 | 
						|
 | 
						|
        cond = '''AND NOT (recipient_id = {Scotland} AND upper(subject) = upper('golf'))'''
 | 
						|
        cond = cond.format(**ids)
 | 
						|
        self.assertTrue(cond in queries[0]['sql'])
 | 
						|
 | 
						|
    def test_exclude_muting_conditions(self):
 | 
						|
        realm = get_realm('zulip.com')
 | 
						|
        create_stream_if_needed(realm, 'devel')
 | 
						|
        user_profile = get_user_profile_by_email("hamlet@zulip.com")
 | 
						|
        user_profile.muted_topics = ujson.dumps([['Scotland', 'golf'], ['devel', 'css'], ['bogus', 'bogus']])
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        narrow = [
 | 
						|
            dict(operator='stream', operand='Scotland'),
 | 
						|
        ]
 | 
						|
 | 
						|
        muting_conditions = exclude_muting_conditions(user_profile, narrow)
 | 
						|
        query = select([column("id").label("message_id")], None, "zerver_message")
 | 
						|
        query = query.where(*muting_conditions)
 | 
						|
        expected_query = '''
 | 
						|
            SELECT id AS message_id
 | 
						|
            FROM zerver_message
 | 
						|
            WHERE NOT (recipient_id = :recipient_id_1 AND upper(subject) = upper(:upper_1))
 | 
						|
            '''
 | 
						|
        self.assertEqual(fix_ws(query), fix_ws(expected_query))
 | 
						|
        params = get_sqlalchemy_query_params(query)
 | 
						|
 | 
						|
        self.assertEqual(params['recipient_id_1'], get_recipient_id_for_stream_name(realm, 'Scotland'))
 | 
						|
        self.assertEqual(params['upper_1'], 'golf')
 | 
						|
 | 
						|
        mute_stream(realm, user_profile, 'Verona')
 | 
						|
        narrow = []
 | 
						|
        muting_conditions = exclude_muting_conditions(user_profile, narrow)
 | 
						|
        query = select([column("id")], None, "zerver_message")
 | 
						|
        query = query.where(and_(*muting_conditions))
 | 
						|
 | 
						|
        expected_query = '''
 | 
						|
            SELECT id
 | 
						|
            FROM zerver_message
 | 
						|
            WHERE recipient_id NOT IN (:recipient_id_1)
 | 
						|
            AND NOT
 | 
						|
               (recipient_id = :recipient_id_2 AND upper(subject) = upper(:upper_1) OR
 | 
						|
                recipient_id = :recipient_id_3 AND upper(subject) = upper(:upper_2))'''
 | 
						|
        self.assertEqual(fix_ws(query), fix_ws(expected_query))
 | 
						|
        params = get_sqlalchemy_query_params(query)
 | 
						|
        self.assertEqual(params['recipient_id_1'], get_recipient_id_for_stream_name(realm, 'Verona'))
 | 
						|
        self.assertEqual(params['recipient_id_2'], get_recipient_id_for_stream_name(realm, 'Scotland'))
 | 
						|
        self.assertEqual(params['upper_1'], 'golf')
 | 
						|
        self.assertEqual(params['recipient_id_3'], get_recipient_id_for_stream_name(realm, 'devel'))
 | 
						|
        self.assertEqual(params['upper_2'], 'css')
 | 
						|
 | 
						|
    def test_get_old_messages_queries(self):
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10},
 | 
						|
                                                 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC')
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 100, 'num_before': 10, 'num_after': 0},
 | 
						|
                                                 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id <= 100 ORDER BY message_id DESC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC')
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 100, 'num_before': 10, 'num_after': 10},
 | 
						|
                                                 'SELECT anon_1.message_id, anon_1.flags \nFROM ((SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id <= 99 ORDER BY message_id DESC \n LIMIT 10) UNION ALL (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id >= 100 ORDER BY message_id ASC \n LIMIT 11)) AS anon_1 ORDER BY message_id ASC')
 | 
						|
 | 
						|
    def test_get_old_messages_with_narrow_queries(self):
 | 
						|
        query_ids = self.get_query_ids()
 | 
						|
 | 
						|
        sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (sender_id = 3 AND recipient_id = 4 OR sender_id = 4 AND recipient_id = 3) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["pm-with", "othello@zulip.com"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
        sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (flags & 2) != 0 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["is", "starred"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
        sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND sender_id = 3 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["sender", "othello@zulip.com"]]'},
 | 
						|
                                                sql)
 | 
						|
 | 
						|
        sql_template = 'SELECT anon_1.message_id \nFROM (SELECT id AS message_id \nFROM zerver_message \nWHERE recipient_id = {scotland_recipient} AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        sql = sql_template.format(**query_ids)
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["stream", "Scotland"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
        sql = "SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND upper(subject) = upper('blah') AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["topic", "blah"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
        sql_template = "SELECT anon_1.message_id \nFROM (SELECT id AS message_id \nFROM zerver_message \nWHERE recipient_id = {scotland_recipient} AND upper(subject) = upper('blah') AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
 | 
						|
        sql = sql_template.format(**query_ids)
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["stream", "Scotland"], ["topic", "blah"]]'},
 | 
						|
                                                 sql)
 | 
						|
        # Narrow to pms with yourself
 | 
						|
        sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND sender_id = 4 AND recipient_id = 4 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["pm-with", "hamlet@zulip.com"]]'},
 | 
						|
                                                sql)
 | 
						|
 | 
						|
        sql_template = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND recipient_id = {scotland_recipient} AND (flags & 2) != 0 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        sql = sql_template.format(**query_ids)
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["stream", "Scotland"], ["is", "starred"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
    def test_get_old_messages_with_search_queries(self):
 | 
						|
        query_ids = self.get_query_ids()
 | 
						|
 | 
						|
        sql = "SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array('zulip.english_us_search', rendered_content, plainto_tsquery('zulip.english_us_search', 'jumping')) AS content_matches, ts_match_locs_array('zulip.english_us_search', escape_html(subject), plainto_tsquery('zulip.english_us_search', 'jumping')) AS subject_matches \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (search_tsvector @@ plainto_tsquery('zulip.english_us_search', 'jumping')) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["search", "jumping"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
        sql_template = "SELECT anon_1.message_id, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT id AS message_id, subject, rendered_content, ts_match_locs_array('zulip.english_us_search', rendered_content, plainto_tsquery('zulip.english_us_search', 'jumping')) AS content_matches, ts_match_locs_array('zulip.english_us_search', escape_html(subject), plainto_tsquery('zulip.english_us_search', 'jumping')) AS subject_matches \nFROM zerver_message \nWHERE recipient_id = {scotland_recipient} AND (search_tsvector @@ plainto_tsquery('zulip.english_us_search', 'jumping')) AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
 | 
						|
        sql = sql_template.format(**query_ids)
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["stream", "Scotland"], ["search", "jumping"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
        sql = 'SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array(\'zulip.english_us_search\', rendered_content, plainto_tsquery(\'zulip.english_us_search\', \'"jumping" quickly\')) AS content_matches, ts_match_locs_array(\'zulip.english_us_search\', escape_html(subject), plainto_tsquery(\'zulip.english_us_search\', \'"jumping" quickly\')) AS subject_matches \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (content ILIKE \'%jumping%\' OR subject ILIKE \'%jumping%\') AND (search_tsvector @@ plainto_tsquery(\'zulip.english_us_search\', \'"jumping" quickly\')) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
 | 
						|
        self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
 | 
						|
                                                  'narrow': '[["search", "\\"jumping\\" quickly"]]'},
 | 
						|
                                                 sql)
 | 
						|
 | 
						|
 | 
						|
class EditMessageTest(AuthedTestCase):
 | 
						|
    def check_message(self, msg_id, subject=None, content=None):
 | 
						|
        msg = Message.objects.get(id=msg_id)
 | 
						|
        cached = msg.to_dict(False)
 | 
						|
        uncached = msg.to_dict_uncached(False)
 | 
						|
        self.assertEqual(cached, uncached)
 | 
						|
        if subject:
 | 
						|
            self.assertEqual(msg.subject, subject)
 | 
						|
        if content:
 | 
						|
            self.assertEqual(msg.content, content)
 | 
						|
        return msg
 | 
						|
 | 
						|
    def test_save_message(self):
 | 
						|
        # This is also tested by a client test, but here we can verify
 | 
						|
        # the cache against the database
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        msg_id = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="editing", content="before edit")
 | 
						|
        result = self.client.post("/json/update_message", {
 | 
						|
            'message_id': msg_id,
 | 
						|
            'content': 'after edit'
 | 
						|
        })
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.check_message(msg_id, content="after edit")
 | 
						|
 | 
						|
        result = self.client.post("/json/update_message", {
 | 
						|
            'message_id': msg_id,
 | 
						|
            'subject': 'edited'
 | 
						|
        })
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.check_message(msg_id, subject="edited")
 | 
						|
 | 
						|
    def test_propagate_topic_forward(self):
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        id1 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id2 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id3 = self.send_message("iago@zulip.com", "Rome", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id4 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic2")
 | 
						|
        id5 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
 | 
						|
        result = self.client.post("/json/update_message", {
 | 
						|
            'message_id': id1,
 | 
						|
            'subject': 'edited',
 | 
						|
            'propagate_mode': 'change_later'
 | 
						|
        })
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        self.check_message(id1, subject="edited")
 | 
						|
        self.check_message(id2, subject="edited")
 | 
						|
        self.check_message(id3, subject="topic1")
 | 
						|
        self.check_message(id4, subject="topic2")
 | 
						|
        self.check_message(id5, subject="edited")
 | 
						|
 | 
						|
    def test_propagate_all_topics(self):
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        id1 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id2 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id3 = self.send_message("iago@zulip.com", "Rome", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id4 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic2")
 | 
						|
        id5 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic1")
 | 
						|
        id6 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
 | 
						|
            subject="topic3")
 | 
						|
 | 
						|
        result = self.client.post("/json/update_message", {
 | 
						|
            'message_id': id2,
 | 
						|
            'subject': 'edited',
 | 
						|
            'propagate_mode': 'change_all'
 | 
						|
        })
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        self.check_message(id1, subject="edited")
 | 
						|
        self.check_message(id2, subject="edited")
 | 
						|
        self.check_message(id3, subject="topic1")
 | 
						|
        self.check_message(id4, subject="topic2")
 | 
						|
        self.check_message(id5, subject="edited")
 | 
						|
        self.check_message(id6, subject="topic3")
 | 
						|
 | 
						|
class StarTests(AuthedTestCase):
 | 
						|
 | 
						|
    def change_star(self, messages, add=True):
 | 
						|
        return self.client.post("/json/update_message_flags",
 | 
						|
                                {"messages": ujson.dumps(messages),
 | 
						|
                                 "op": "add" if add else "remove",
 | 
						|
                                 "flag": "starred"})
 | 
						|
 | 
						|
    def test_change_star(self):
 | 
						|
        """
 | 
						|
        You can set a message as starred/un-starred through
 | 
						|
        /json/update_message_flags.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        message_ids = [self.send_message("hamlet@zulip.com", "hamlet@zulip.com",
 | 
						|
                                         Recipient.PERSONAL, "test")]
 | 
						|
 | 
						|
        # Star a message.
 | 
						|
        result = self.change_star(message_ids)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        for msg in self.get_old_messages():
 | 
						|
            if msg['id'] in message_ids:
 | 
						|
                self.assertEqual(msg['flags'], ['starred'])
 | 
						|
            else:
 | 
						|
                self.assertEqual(msg['flags'], ['read'])
 | 
						|
 | 
						|
        result = self.change_star(message_ids, False)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Remove the stars.
 | 
						|
        for msg in self.get_old_messages():
 | 
						|
            if msg['id'] in message_ids:
 | 
						|
                self.assertEqual(msg['flags'], [])
 | 
						|
 | 
						|
    def test_new_message(self):
 | 
						|
        """
 | 
						|
        New messages aren't starred.
 | 
						|
        """
 | 
						|
        test_email = "hamlet@zulip.com"
 | 
						|
        self.login(test_email)
 | 
						|
        content = "Test message for star"
 | 
						|
        self.send_message(test_email, "Verona", Recipient.STREAM,
 | 
						|
                          content=content)
 | 
						|
 | 
						|
        sent_message = UserMessage.objects.filter(
 | 
						|
            user_profile=get_user_profile_by_email(test_email)
 | 
						|
            ).order_by("id").reverse()[0]
 | 
						|
        self.assertEqual(sent_message.message.content, content)
 | 
						|
        self.assertFalse(sent_message.flags.starred)
 | 
						|
 | 
						|
class AttachmentTest(TestCase):
 | 
						|
    def test_basics(self):
 | 
						|
        self.assertFalse(Message.content_has_attachment('whatever'))
 | 
						|
        self.assertFalse(Message.content_has_attachment('yo http://foo.com'))
 | 
						|
        self.assertTrue(Message.content_has_attachment('yo\n https://staging.zulip.com/user_uploads/'))
 | 
						|
        self.assertTrue(Message.content_has_attachment('yo\n /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.jpg foo'))
 | 
						|
 | 
						|
        self.assertFalse(Message.content_has_image('whatever'))
 | 
						|
        self.assertFalse(Message.content_has_image('yo http://foo.com'))
 | 
						|
        self.assertFalse(Message.content_has_image('yo\n /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.pdf foo'))
 | 
						|
        for ext in [".bmp", ".gif", ".jpg", "jpeg", ".png", ".webp", ".JPG"]:
 | 
						|
            content = 'yo\n /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.%s foo' % (ext,)
 | 
						|
            self.assertTrue(Message.content_has_image(content))
 | 
						|
 | 
						|
        self.assertFalse(Message.content_has_link('whatever'))
 | 
						|
        self.assertTrue(Message.content_has_link('yo\n http://foo.com'))
 | 
						|
        self.assertTrue(Message.content_has_link('yo\n https://example.com?spam=1&eggs=2'))
 | 
						|
        self.assertTrue(Message.content_has_link('yo /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.pdf foo'))
 | 
						|
 | 
						|
class CheckMessageTest(AuthedTestCase):
 | 
						|
    def test_basic_check_message_call(self):
 | 
						|
        sender = get_user_profile_by_email('othello@zulip.com')
 | 
						|
        client, _ = Client.objects.get_or_create(name="test suite")
 | 
						|
        stream_name = 'integration'
 | 
						|
        stream, _ = create_stream_if_needed(Realm.objects.get(domain="zulip.com"), stream_name)
 | 
						|
        message_type_name = 'stream'
 | 
						|
        message_to = None
 | 
						|
        message_to = [stream_name]
 | 
						|
        subject_name = 'issue'
 | 
						|
        message_content = 'whatever'
 | 
						|
        ret = check_message(sender, client, message_type_name, message_to,
 | 
						|
                      subject_name, message_content)
 | 
						|
        self.assertEqual(ret['message'].sender.email, 'othello@zulip.com')
 | 
						|
 | 
						|
    def test_bot_pm_feature(self):
 | 
						|
        # We send a PM to a bot's owner if their bot sends a message to
 | 
						|
        # an unsubscribed stream
 | 
						|
        parent = get_user_profile_by_email('othello@zulip.com')
 | 
						|
        bot = do_create_user(
 | 
						|
                email='othello-bot@zulip.com',
 | 
						|
                password='',
 | 
						|
                realm=parent.realm,
 | 
						|
                full_name='',
 | 
						|
                short_name='',
 | 
						|
                active=True,
 | 
						|
                bot=True,
 | 
						|
                bot_owner=parent
 | 
						|
        )
 | 
						|
        bot.last_reminder = None
 | 
						|
 | 
						|
        sender = bot
 | 
						|
        client, _ = Client.objects.get_or_create(name="test suite")
 | 
						|
        stream_name = 'integration'
 | 
						|
        stream, _ = create_stream_if_needed(Realm.objects.get(domain="zulip.com"), stream_name)
 | 
						|
        message_type_name = 'stream'
 | 
						|
        message_to = None
 | 
						|
        message_to = [stream_name]
 | 
						|
        subject_name = 'issue'
 | 
						|
        message_content = 'whatever'
 | 
						|
        old_count = message_stream_count(parent)
 | 
						|
        ret = check_message(sender, client, message_type_name, message_to,
 | 
						|
                      subject_name, message_content)
 | 
						|
        new_count = message_stream_count(parent)
 | 
						|
        self.assertEqual(new_count, old_count + 1)
 | 
						|
        self.assertEqual(ret['message'].sender.email, 'othello-bot@zulip.com')
 | 
						|
 |