mypy: Annotate zerver/tests/test_narrow.py

This commit is contained in:
Zac Pullar-Strecker
2016-12-08 08:39:48 +00:00
committed by showell
parent eb35d53ed1
commit 4eb6adf547
2 changed files with 111 additions and 26 deletions

View File

@@ -41,7 +41,6 @@ tools/deprecated/inject-messages/inject-messages
zproject/settings.py
zproject/test_settings.py
zerver/tests/test_decorators.py
zerver/tests/test_narrow.py
""".split()
# We don't run mypy on contrib_bots, since the code there will

View File

@@ -1,12 +1,14 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from django.db import connection
from django.test import override_settings
from sqlalchemy.sql import (
and_, select, column, compiler
and_, select, column,
)
from sqlalchemy.sql import compiler # type: ignore
from zerver.models import (
Realm, Recipient, Stream, Subscription, UserProfile, Attachment,
@@ -18,6 +20,7 @@ from zerver.lib.message import (
from zerver.lib.narrow import (
build_narrow_filter,
)
from zerver.lib.str_utils import force_bytes
from zerver.lib.sqlalchemy_utils import get_sqlalchemy_connection
from zerver.lib.test_helpers import (
POSTRequestMock,
@@ -30,29 +33,33 @@ from zerver.lib.test_classes import (
from zerver.views.messages import (
exclude_muting_conditions,
get_old_messages_backend, ok_to_include_history,
NarrowBuilder, BadNarrowOperator
NarrowBuilder, BadNarrowOperator, Query
)
from typing import Text
from typing import Mapping, Sequence, Tuple, Generic, Union, Any, Text
from six.moves import range
import os
import re
import ujson
def get_sqlalchemy_query_params(query):
dialect = get_sqlalchemy_connection().dialect
# type: (Text) -> Dict[Text, Text]
dialect = get_sqlalchemy_connection().dialect # type: ignore
comp = compiler.SQLCompiler(dialect, query)
comp.compile()
return comp.params
def fix_ws(s):
# type: (Text) -> Text
return re.sub('\s+', ' ', str(s)).strip()
def get_recipient_id_for_stream_name(realm, stream_name):
# type: (Realm, Text) -> Text
stream = get_stream(stream_name, realm)
return get_recipient(Recipient.STREAM, stream.id).id
def mute_stream(realm, user_profile, stream_name):
# type: (Realm, Text, Text) -> None
stream = Stream.objects.get(realm=realm, name=stream_name)
recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
subscription = Subscription.objects.get(recipient=recipient, user_profile=user_profile)
@@ -61,183 +68,225 @@ def mute_stream(realm, user_profile, stream_name):
class NarrowBuilderTest(ZulipTestCase):
def setUp(self):
# type: () -> None
self.realm = get_realm_by_string_id('zulip')
self.user_profile = get_user_profile_by_email("hamlet@zulip.com")
self.builder = NarrowBuilder(self.user_profile, column('id'))
self.raw_query = select([column("id")], None, "zerver_message")
def test_add_term_using_not_defined_operator(self):
# type: () -> None
term = dict(operator='not-defined', operand='any')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_stream_operator(self):
# type: () -> None
term = dict(operator='stream', operand='Scotland')
self._do_add_term_test(term, 'WHERE recipient_id = :recipient_id_1')
def test_add_term_using_stream_operator_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='stream', operand='Scotland', negated=True)
self._do_add_term_test(term, 'WHERE recipient_id != :recipient_id_1')
def test_add_term_using_stream_operator_and_non_existing_operand_should_raise_error(self): # NEGATED
# type: () -> None
term = dict(operator='stream', operand='NonExistingStream')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_is_operator_and_private_operand(self):
# type: () -> None
term = dict(operator='is', operand='private')
self._do_add_term_test(term, 'WHERE type = :type_1 OR type = :type_2')
def test_add_term_using_is_operator_private_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='is', operand='private', negated=True)
self._do_add_term_test(term, 'WHERE NOT (type = :type_1 OR type = :type_2)')
def test_add_term_using_is_operator_and_non_private_operand(self):
# type: () -> None
for operand in ['starred', 'mentioned', 'alerted']:
term = dict(operator='is', operand=operand)
self._do_add_term_test(term, 'WHERE (flags & :flags_1) != :param_1')
def test_add_term_using_is_operator_non_private_operand_and_negated(self): # NEGATED
# type: () -> None
for operand in ['starred', 'mentioned', 'alerted']:
term = dict(operator='is', operand=operand, negated=True)
self._do_add_term_test(term, 'WHERE (flags & :flags_1) = :param_1')
def test_add_term_using_non_supported_operator_should_raise_error(self):
# type: () -> None
term = dict(operator='is', operand='non_supported')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_topic_operator_and_lunch_operand(self):
# type: () -> None
term = dict(operator='topic', operand='lunch')
self._do_add_term_test(term, 'WHERE upper(subject) = upper(:param_1)')
def test_add_term_using_topic_operator_lunch_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='topic', operand='lunch', negated=True)
self._do_add_term_test(term, 'WHERE upper(subject) != upper(:param_1)')
def test_add_term_using_topic_operator_and_personal_operand(self):
# type: () -> None
term = dict(operator='topic', operand='personal')
self._do_add_term_test(term, 'WHERE upper(subject) = upper(:param_1)')
def test_add_term_using_topic_operator_personal_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='topic', operand='personal', negated=True)
self._do_add_term_test(term, 'WHERE upper(subject) != upper(:param_1)')
def test_add_term_using_sender_operator(self):
# type: () -> None
term = dict(operator='sender', operand='othello@zulip.com')
self._do_add_term_test(term, 'WHERE sender_id = :param_1')
def test_add_term_using_sender_operator_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='sender', operand='othello@zulip.com', negated=True)
self._do_add_term_test(term, 'WHERE sender_id != :param_1')
def test_add_term_using_sender_operator_with_non_existing_user_as_operand(self): # NEGATED
# type: () -> None
term = dict(operator='sender', operand='non-existing@zulip.com')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_pm_with_operator_and_not_the_same_user_as_operand(self):
# type: () -> None
term = dict(operator='pm-with', operand='othello@zulip.com')
self._do_add_term_test(term, 'WHERE sender_id = :sender_id_1 AND recipient_id = :recipient_id_1 OR sender_id = :sender_id_2 AND recipient_id = :recipient_id_2')
def test_add_term_using_pm_with_operator_not_the_same_user_as_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='pm-with', operand='othello@zulip.com', negated=True)
self._do_add_term_test(term, 'WHERE NOT (sender_id = :sender_id_1 AND recipient_id = :recipient_id_1 OR sender_id = :sender_id_2 AND recipient_id = :recipient_id_2)')
def test_add_term_using_pm_with_operator_the_same_user_as_operand(self):
# type: () -> None
term = dict(operator='pm-with', operand='hamlet@zulip.com')
self._do_add_term_test(term, 'WHERE sender_id = :sender_id_1 AND recipient_id = :recipient_id_1')
def test_add_term_using_pm_with_operator_the_same_user_as_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='pm-with', operand='hamlet@zulip.com', negated=True)
self._do_add_term_test(term, 'WHERE NOT (sender_id = :sender_id_1 AND recipient_id = :recipient_id_1)')
def test_add_term_using_pm_with_operator_and_more_than_user_as_operand(self):
# type: () -> None
term = dict(operator='pm-with', operand='hamlet@zulip.com, othello@zulip.com')
self._do_add_term_test(term, 'WHERE recipient_id = :recipient_id_1')
def test_add_term_using_pm_with_operator_more_than_user_as_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='pm-with', operand='hamlet@zulip.com, othello@zulip.com', negated=True)
self._do_add_term_test(term, 'WHERE recipient_id != :recipient_id_1')
def test_add_term_using_pm_with_operator_with_non_existing_user_as_operand(self):
# type: () -> None
term = dict(operator='pm-with', operand='non-existing@zulip.com')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_pm_with_operator_with_existing_and_non_existing_user_as_operand(self):
# type: () -> None
term = dict(operator='pm-with', operand='othello@zulip.com,non-existing@zulip.com')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_id_operator(self):
# type: () -> None
term = dict(operator='id', operand=555)
self._do_add_term_test(term, 'WHERE id = :param_1')
def test_add_term_using_id_operator_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='id', operand=555, negated=True)
self._do_add_term_test(term, 'WHERE id != :param_1')
@override_settings(USING_PGROONGA=False)
def test_add_term_using_search_operator(self):
# type: () -> None
term = dict(operator='search', operand='"french fries"')
self._do_add_term_test(term, 'WHERE (lower(content) LIKE lower(:content_1) OR lower(subject) LIKE lower(:subject_1)) AND (search_tsvector @@ plainto_tsquery(:param_2, :param_3))')
@override_settings(USING_PGROONGA=False)
def test_add_term_using_search_operator_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='search', operand='"french fries"', negated=True)
self._do_add_term_test(term, 'WHERE NOT (lower(content) LIKE lower(:content_1) OR lower(subject) LIKE lower(:subject_1)) AND NOT (search_tsvector @@ plainto_tsquery(:param_2, :param_3))')
@override_settings(USING_PGROONGA=True)
def test_add_term_using_search_operator_pgroonga(self):
# type: () -> None
term = dict(operator='search', operand='"french fries"')
self._do_add_term_test(term, 'WHERE search_pgroonga @@ :search_pgroonga_1')
@override_settings(USING_PGROONGA=True)
def test_add_term_using_search_operator_and_negated_pgroonga(self): # NEGATED
# type: () -> None
term = dict(operator='search', operand='"french fries"', negated=True)
self._do_add_term_test(term, 'WHERE NOT (search_pgroonga @@ :search_pgroonga_1)')
def test_add_term_using_has_operator_and_attachment_operand(self):
# type: () -> None
term = dict(operator='has', operand='attachment')
self._do_add_term_test(term, 'WHERE has_attachment')
def test_add_term_using_has_operator_attachment_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='has', operand='attachment', negated=True)
self._do_add_term_test(term, 'WHERE NOT has_attachment')
def test_add_term_using_has_operator_and_image_operand(self):
# type: () -> None
term = dict(operator='has', operand='image')
self._do_add_term_test(term, 'WHERE has_image')
def test_add_term_using_has_operator_image_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='has', operand='image', negated=True)
self._do_add_term_test(term, 'WHERE NOT has_image')
def test_add_term_using_has_operator_and_link_operand(self):
# type: () -> None
term = dict(operator='has', operand='link')
self._do_add_term_test(term, 'WHERE has_link')
def test_add_term_using_has_operator_link_operand_and_negated(self): # NEGATED
# type: () -> None
term = dict(operator='has', operand='link', negated=True)
self._do_add_term_test(term, 'WHERE NOT has_link')
def test_add_term_using_has_operator_non_supported_operand_should_raise_error(self):
# type: () -> None
term = dict(operator='has', operand='non_supported')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_in_operator(self):
# type: () -> None
mute_stream(self.realm, self.user_profile, 'Verona')
term = dict(operator='in', operand='home')
self._do_add_term_test(term, 'WHERE recipient_id NOT IN (:recipient_id_1)')
def test_add_term_using_in_operator_and_negated(self):
# type: () -> None
# negated = True should not change anything
mute_stream(self.realm, self.user_profile, 'Verona')
term = dict(operator='in', operand='home', negated=True)
self._do_add_term_test(term, 'WHERE recipient_id NOT IN (:recipient_id_1)')
def test_add_term_using_in_operator_and_all_operand(self):
# type: () -> None
mute_stream(self.realm, self.user_profile, 'Verona')
term = dict(operator='in', operand='all')
query = self._build_query(term)
self.assertEqual(str(query), 'SELECT id \nFROM zerver_message')
def test_add_term_using_in_operator_all_operand_and_negated(self):
# type: () -> None
# negated = True should not change anything
mute_stream(self.realm, self.user_profile, 'Verona')
term = dict(operator='in', operand='all', negated=True)
@@ -245,22 +294,27 @@ class NarrowBuilderTest(ZulipTestCase):
self.assertEqual(str(query), 'SELECT id \nFROM zerver_message')
def test_add_term_using_in_operator_and_not_defined_operand(self):
# type: () -> None
term = dict(operator='in', operand='not_defined')
self.assertRaises(BadNarrowOperator, self._build_query, term)
def test_add_term_using_near_operator(self):
# type: () -> None
term = dict(operator='near', operand='operand')
query = self._build_query(term)
self.assertEqual(str(query), 'SELECT id \nFROM zerver_message')
def _do_add_term_test(self, term, where_clause):
# type: (Dict[str, Any], Text) -> None
self.assertTrue(where_clause in str(self._build_query(term)))
def _build_query(self, term):
# type: (Dict[str, Any]) -> Query
return self.builder.add_term(self.raw_query, term)
class BuildNarrowFilterTest(TestCase):
def test_build_narrow_filter(self):
# type: () -> None
fixtures_path = os.path.join(os.path.dirname(__file__),
'../fixtures/narrow.json')
scenarios = ujson.loads(open(fixtures_path, 'r').read())
@@ -277,6 +331,7 @@ class BuildNarrowFilterTest(TestCase):
class IncludeHistoryTest(ZulipTestCase):
def test_ok_to_include_history(self):
# type: () -> None
realm = get_realm_by_string_id('zulip')
self.make_stream('public_stream', realm=realm)
@@ -322,7 +377,8 @@ class IncludeHistoryTest(ZulipTestCase):
class GetOldMessagesTest(ZulipTestCase):
def get_and_check_messages(self, modified_params):
post_params = {"anchor": 1, "num_before": 1, "num_after": 1}
# type: (Dict[str, Union[str, int]]) -> Dict[str, Dict]
post_params = {"anchor": 1, "num_before": 1, "num_after": 1} # type: Dict[str, Union[str, int]]
post_params.update(modified_params)
payload = self.client_get("/json/messages", dict(post_params))
self.assert_json_success(payload)
@@ -341,10 +397,11 @@ class GetOldMessagesTest(ZulipTestCase):
return result
def get_query_ids(self):
# type: () -> Dict[Text, int]
hamlet_user = get_user_profile_by_email('hamlet@zulip.com')
othello_user = get_user_profile_by_email('othello@zulip.com')
query_ids = {}
query_ids = {} # type: Dict[Text, int]
scotland_stream = get_stream('Scotland', hamlet_user.realm)
query_ids['scotland_recipient'] = get_recipient(Recipient.STREAM, scotland_stream.id).id
@@ -356,6 +413,7 @@ class GetOldMessagesTest(ZulipTestCase):
return query_ids
def test_successful_get_old_messages(self):
# type: () -> None
"""
A call to GET /json/messages with valid parameters returns a list of
messages.
@@ -365,13 +423,12 @@ class GetOldMessagesTest(ZulipTestCase):
# We have to support the legacy tuple style while there are old
# clients around, which might include third party home-grown bots.
narrow = [['pm-with', 'othello@zulip.com']]
self.get_and_check_messages(dict(narrow=ujson.dumps(narrow)))
self.get_and_check_messages(dict(narrow=ujson.dumps([['pm-with', 'othello@zulip.com']])))
narrow = [dict(operator='pm-with', operand='othello@zulip.com')]
self.get_and_check_messages(dict(narrow=ujson.dumps(narrow)))
self.get_and_check_messages(dict(narrow=ujson.dumps([dict(operator='pm-with', operand='othello@zulip.com')])))
def test_get_old_messages_with_narrow_pm_with(self):
# type: () -> None
"""
A request for old messages with a narrow by pm-with only returns
conversations with that user.
@@ -379,6 +436,8 @@ class GetOldMessagesTest(ZulipTestCase):
me = 'hamlet@zulip.com'
def dr_emails(dr):
# type: (Union[Text, List[Dict[str, Any]]]) -> Text
assert isinstance(dr, list)
return ','.join(sorted(set([r['email'] for r in dr] + [me])))
personals = [m for m in get_user_messages(get_user_profile_by_email(me))
@@ -398,6 +457,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(dr_emails(message['display_recipient']), emails)
def test_get_old_messages_with_narrow_stream(self):
# type: () -> None
"""
A request for old messages with a narrow by stream only returns
messages for that stream.
@@ -421,6 +481,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(message["recipient_id"], stream_id)
def test_get_old_messages_with_narrow_stream_mit_unicode_regex(self):
# type: () -> None
"""
A request for old messages for a user in the mit.edu relam with unicode
stream name should be correctly escaped in the database query.
@@ -452,6 +513,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(message["recipient_id"], stream_id)
def test_get_old_messages_with_narrow_topic_mit_unicode_regex(self):
# type: () -> None
"""
A request for old messages for a user in the mit.edu relam with unicode
topic name should be correctly escaped in the database query.
@@ -481,6 +543,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(message["recipient_id"], stream_id)
def test_get_old_messages_with_narrow_sender(self):
# type: () -> None
"""
A request for old messages with a narrow by sender only returns
messages sent by that person.
@@ -519,7 +582,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.login(email)
def send(content):
# type: (Text) -> None
# type: (Text) -> int
msg_id = self.send_message(
sender_name=email,
raw_recipients="Verona",
@@ -551,6 +614,7 @@ class GetOldMessagesTest(ZulipTestCase):
@override_settings(USING_PGROONGA=False)
def test_get_old_messages_with_search(self):
# type: () -> None
self.login("cordelia@zulip.com")
messages_to_search = [
@@ -580,7 +644,7 @@ class GetOldMessagesTest(ZulipTestCase):
narrow=ujson.dumps(narrow),
anchor=0,
num_after=10,
))
)) # type: Dict[str, Dict]
self.assertEqual(len(result['messages']), 2)
messages = result['messages']
@@ -603,6 +667,7 @@ class GetOldMessagesTest(ZulipTestCase):
@override_settings(USING_PGROONGA=True)
def test_get_old_messages_with_search_pgroonga(self):
# type: () -> None
self.login("cordelia@zulip.com")
messages_to_search = [
@@ -639,7 +704,7 @@ class GetOldMessagesTest(ZulipTestCase):
narrow=ujson.dumps(narrow),
anchor=0,
num_after=10,
))
)) # type: Dict[str, Dict]
self.assertEqual(len(result['messages']), 4)
messages = result['messages']
@@ -661,6 +726,7 @@ class GetOldMessagesTest(ZulipTestCase):
u'<p>I want to go to <span class="highlight">日本</span>!</p>')
def test_get_old_messages_with_only_searching_anchor(self):
# type: () -> None
"""
Test that specifying an anchor but 0 for num_before and num_after
returns at most 1 message.
@@ -671,7 +737,7 @@ class GetOldMessagesTest(ZulipTestCase):
narrow = [dict(operator='sender', operand='cordelia@zulip.com')]
result = self.get_and_check_messages(dict(narrow=ujson.dumps(narrow),
anchor=anchor, num_before=0,
num_after=0))
num_after=0)) # type: Dict[str, Dict]
self.assertEqual(len(result['messages']), 1)
narrow = [dict(operator='is', operand='mentioned')]
@@ -681,13 +747,14 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(len(result['messages']), 0)
def test_missing_params(self):
# type: () -> None
"""
anchor, num_before, and num_after are all required
POST parameters for get_old_messages.
"""
self.login("hamlet@zulip.com")
required_args = (("anchor", 1), ("num_before", 1), ("num_after", 1))
required_args = (("anchor", 1), ("num_before", 1), ("num_after", 1)) # type: Tuple[Tuple[Text, int], ...]
for i in range(len(required_args)):
post_params = dict(required_args[:i] + required_args[i + 1:])
@@ -696,6 +763,7 @@ class GetOldMessagesTest(ZulipTestCase):
"Missing '%s' argument" % (required_args[i][0],))
def test_bad_int_params(self):
# type: () -> None
"""
num_before, num_after, and narrow must all be non-negative
integers or strings that can be converted to non-negative integers.
@@ -719,15 +787,16 @@ class GetOldMessagesTest(ZulipTestCase):
"Bad value for '%s': %s" % (param, type))
def test_bad_narrow_type(self):
# type: () -> None
"""
narrow must be a list of string pairs.
"""
self.login("hamlet@zulip.com")
other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)] # type: list[Tuple[Text, Union[int, str, bool]]]
bad_types = (False, 0, '', '{malformed json,',
'{foo: 3}', '[1,2]', '[["x","y","z"]]')
'{foo: 3}', '[1,2]', '[["x","y","z"]]') # type: Tuple[Union[int, str, bool], ...]
for type in bad_types:
post_params = dict(other_params + [("narrow", type)])
result = self.client_get("/json/messages", post_params)
@@ -735,16 +804,18 @@ class GetOldMessagesTest(ZulipTestCase):
"Bad value for 'narrow': %s" % (type,))
def test_old_empty_narrow(self):
# type: () -> None
"""
'{}' is accepted to mean 'no narrow', for use by old mobile clients.
"""
self.login("hamlet@zulip.com")
all_result = self.get_and_check_messages({})
narrow_result = self.get_and_check_messages({'narrow': '{}'})
all_result = self.get_and_check_messages({}) # type: Dict[str, Dict]
narrow_result = self.get_and_check_messages({'narrow': '{}'}) # type: Dict[str, Dict]
self.assertEqual(message_ids(all_result), message_ids(narrow_result))
def test_bad_narrow_operator(self):
# type: () -> None
"""
Unrecognized narrow operators are rejected.
"""
@@ -757,6 +828,7 @@ class GetOldMessagesTest(ZulipTestCase):
"Invalid narrow operator: unknown operator")
def test_non_string_narrow_operand_in_dict(self):
# type: () -> None
"""
We expect search operands to be strings, not integers.
"""
@@ -768,7 +840,8 @@ class GetOldMessagesTest(ZulipTestCase):
self.assert_json_error_contains(result, 'elem["operand"] is not a string')
def exercise_bad_narrow_operand(self, operator, operands, error_msg):
other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
# type: (Text, Sequence, Text) -> None
other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)] # type: List
for operand in operands:
post_params = dict(other_params + [
("narrow", ujson.dumps([[operator, operand]]))])
@@ -776,36 +849,41 @@ class GetOldMessagesTest(ZulipTestCase):
self.assert_json_error_contains(result, error_msg)
def test_bad_narrow_stream_content(self):
# type: () -> None
"""
If an invalid stream name is requested in get_old_messages, an error is
returned.
"""
self.login("hamlet@zulip.com")
bad_stream_content = (0, [], ["x", "y"])
bad_stream_content = (0, [], ["x", "y"]) # type: Sequence
self.exercise_bad_narrow_operand("stream", bad_stream_content,
"Bad value for 'narrow'")
def test_bad_narrow_one_on_one_email_content(self):
# type: () -> None
"""
If an invalid 'pm-with' is requested in get_old_messages, an
error is returned.
"""
self.login("hamlet@zulip.com")
bad_stream_content = (0, [], ["x", "y"])
bad_stream_content = (0, [], ["x", "y"]) # type: Tuple[int, List[None], List[Text]]
self.exercise_bad_narrow_operand("pm-with", bad_stream_content,
"Bad value for 'narrow'")
def test_bad_narrow_nonexistent_stream(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.exercise_bad_narrow_operand("stream", ['non-existent stream'],
"Invalid narrow operator: unknown stream")
def test_bad_narrow_nonexistent_email(self):
# type: () -> None
self.login("hamlet@zulip.com")
self.exercise_bad_narrow_operand("pm-with", ['non-existent-user@zulip.com'],
"Invalid narrow operator: unknown user")
def test_message_without_rendered_content(self):
# type: () -> None
"""Older messages may not have rendered_content in the database"""
m = self.get_last_message()
m.rendered_content = m.rendered_content_version = None
@@ -815,6 +893,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(d['content'], '<p>test content</p>')
def common_check_get_old_messages_query(self, query_params, expected):
# type: (Dict[str, object], Text) -> None
user_profile = get_user_profile_by_email("hamlet@zulip.com")
request = POSTRequestMock(query_params, user_profile)
with queries_captured() as queries:
@@ -822,12 +901,13 @@ class GetOldMessagesTest(ZulipTestCase):
for query in queries:
if "/* get_old_messages */" in query['sql']:
sql = query['sql'].replace(" /* get_old_messages */", '')
sql = str(query['sql']).replace(" /* get_old_messages */", '')
self.assertEqual(sql, expected)
return
self.fail("get_old_messages query not found")
def test_use_first_unread_anchor_with_some_unread_messages(self):
# type: () -> None
user_profile = get_user_profile_by_email("hamlet@zulip.com")
# Have Othello send messages to Hamlet that he hasn't read.
@@ -862,6 +942,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertIn(cond, sql)
def test_use_first_unread_anchor_with_no_unread_messages(self):
# type: () -> None
user_profile = get_user_profile_by_email("hamlet@zulip.com")
query_params = dict(
@@ -883,6 +964,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertIn('AND message_id = 10000000000000000', queries[0]['sql'])
def test_use_first_unread_anchor_with_muted_topics(self):
# type: () -> None
"""
Test that our logic related to `use_first_unread_anchor`
invokes the `message_id = 10000000000000000` hack for
@@ -915,7 +997,7 @@ class GetOldMessagesTest(ZulipTestCase):
# Do some tests on the main query, to verify the muting logic
# runs on this code path.
queries = [q for q in all_queries if q['sql'].startswith("SELECT message_id, flags")]
queries = [q for q in all_queries if str(q['sql']).startswith("SELECT message_id, flags")]
self.assertEqual(len(queries), 1)
stream = get_stream('Scotland', realm)
@@ -930,6 +1012,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertIn('AND message_id = 10000000000000000', queries[0]['sql'])
def test_exclude_muting_conditions(self):
# type: () -> None
realm = get_realm_by_string_id('zulip')
self.make_stream('web stuff')
user_profile = get_user_profile_by_email("hamlet@zulip.com")
@@ -991,6 +1074,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(params['upper_2'], 'css')
def test_get_old_messages_queries(self):
# type: () -> None
query_ids = self.get_query_ids()
sql_template = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = {hamlet_id} AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC'
@@ -1006,6 +1090,7 @@ class GetOldMessagesTest(ZulipTestCase):
self.common_check_get_old_messages_query({'anchor': 100, 'num_before': 10, 'num_after': 10}, sql)
def test_get_old_messages_with_narrow_queries(self):
# type: () -> None
query_ids = self.get_query_ids()
sql_template = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = {hamlet_id} AND (sender_id = {othello_id} AND recipient_id = {hamlet_recipient} OR sender_id = {hamlet_id} AND recipient_id = {othello_recipient}) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
@@ -1059,9 +1144,10 @@ class GetOldMessagesTest(ZulipTestCase):
@override_settings(USING_PGROONGA=False)
def test_get_old_messages_with_search_queries(self):
# type: () -> None
query_ids = self.get_query_ids()
sql_template = "SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array('zulip.english_us_search', rendered_content, plainto_tsquery('zulip.english_us_search', 'jumping')) AS content_matches, ts_match_locs_array('zulip.english_us_search', escape_html(subject), plainto_tsquery('zulip.english_us_search', 'jumping')) AS subject_matches \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = {hamlet_id} AND (search_tsvector @@ plainto_tsquery('zulip.english_us_search', 'jumping')) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
sql_template = "SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array('zulip.english_us_search', rendered_content, plainto_tsquery('zulip.english_us_search', 'jumping')) AS content_matches, ts_match_locs_array('zulip.english_us_search', escape_html(subject), plainto_tsquery('zulip.english_us_search', 'jumping')) AS subject_matches \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = {hamlet_id} AND (search_tsvector @@ plainto_tsquery('zulip.english_us_search', 'jumping')) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC" # type: Text
sql = sql_template.format(**query_ids)
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["search", "jumping"]]'},