From f5f7801302597f6f6cdadab28fb4934ea7861ef8 Mon Sep 17 00:00:00 2001 From: Steve Howell Date: Tue, 13 Sep 2016 14:45:39 -0700 Subject: [PATCH] tests: Extract test_presence.py --- zerver/tests/test_presence.py | 185 ++++++++++++++++++++++++++++++++++ zerver/tests/tests.py | 170 +------------------------------ 2 files changed, 188 insertions(+), 167 deletions(-) create mode 100644 zerver/tests/test_presence.py diff --git a/zerver/tests/test_presence.py b/zerver/tests/test_presence.py new file mode 100644 index 0000000000..57afefa971 --- /dev/null +++ b/zerver/tests/test_presence.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +from __future__ import print_function + +from django.http import HttpResponse + +from typing import Any, Dict +from zerver.lib.test_helpers import ( + get_user_profile_by_email, + make_client, + queries_captured, + ZulipTestCase, +) +from zerver.models import ( + split_email_to_domain, + Client, + UserActivity, + UserProfile, +) + +import datetime +import ujson + +class ActivityTest(ZulipTestCase): + def test_activity(self): + # type: () -> None + self.login("hamlet@zulip.com") + client, _ = Client.objects.get_or_create(name='website') + query = '/json/users/me/pointer' + last_visit = datetime.datetime.now() + count=150 + for user_profile in UserProfile.objects.all(): + UserActivity.objects.get_or_create( + user_profile=user_profile, + client=client, + query=query, + count=count, + last_visit=last_visit + ) + with queries_captured() as queries: + self.client_get('/activity') + + self.assert_length(queries, 13) + +class UserPresenceTests(ZulipTestCase): + def test_get_empty(self): + # type: () -> None + self.login("hamlet@zulip.com") + result = self.client_post("/json/get_active_statuses") + + self.assert_json_success(result) + json = ujson.loads(result.content) + for email, presence in json['presences'].items(): + self.assertEqual(presence, {}) + + def test_invalid_presence(self): + # type: () -> None + email = "hamlet@zulip.com" + self.login(email) + result = self.client_post("/json/users/me/presence", {'status': 'foo'}) + self.assert_json_error(result, 'Invalid presence status: foo') + + def test_set_idle(self): + # type: () -> None + email = "hamlet@zulip.com" + self.login(email) + client = 'website' + + def test_result(result): + # type: (HttpResponse) -> datetime.datetime + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences'][email][client]['status'], 'idle') + self.assertIn('timestamp', json['presences'][email][client]) + self.assertIsInstance(json['presences'][email][client]['timestamp'], int) + self.assertEqual(list(json['presences'].keys()), ['hamlet@zulip.com']) + return json['presences'][email][client]['timestamp'] + + result = self.client_post("/json/users/me/presence", {'status': 'idle'}) + test_result(result) + + result = self.client_post("/json/get_active_statuses", {}) + timestamp = test_result(result) + + email = "othello@zulip.com" + self.login(email) + self.client_post("/json/users/me/presence", {'status': 'idle'}) + result = self.client_post("/json/get_active_statuses", {}) + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences'][email][client]['status'], 'idle') + self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') + self.assertEqual(sorted(json['presences'].keys()), ['hamlet@zulip.com', 'othello@zulip.com']) + newer_timestamp = json['presences'][email][client]['timestamp'] + self.assertGreaterEqual(newer_timestamp, timestamp) + + def test_set_active(self): + # type: () -> None + self.login("hamlet@zulip.com") + client = 'website' + + self.client_post("/json/users/me/presence", {'status': 'idle'}) + result = self.client_post("/json/get_active_statuses", {}) + + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences']["hamlet@zulip.com"][client]['status'], 'idle') + + email = "othello@zulip.com" + self.login("othello@zulip.com") + self.client_post("/json/users/me/presence", {'status': 'idle'}) + result = self.client_post("/json/get_active_statuses", {}) + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences'][email][client]['status'], 'idle') + self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') + + self.client_post("/json/users/me/presence", {'status': 'active'}) + result = self.client_post("/json/get_active_statuses", {}) + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences'][email][client]['status'], 'active') + self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') + + def test_no_mit(self): + # type: () -> None + """Zephyr mirror realms such as MIT never get a list of users""" + self.login("espuser@mit.edu") + result = self.client_post("/json/users/me/presence", {'status': 'idle'}) + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences'], {}) + + def test_mirror_presence(self): + # type: () -> None + """Zephyr mirror realms find out the status of their mirror bot""" + email = 'espuser@mit.edu' + user_profile = get_user_profile_by_email(email) + self.login(email) + + def post_presence(): + # type: () -> Dict[str, Any] + result = self.client_post("/json/users/me/presence", {'status': 'idle'}) + self.assert_json_success(result) + json = ujson.loads(result.content) + return json + + json = post_presence() + self.assertEqual(json['zephyr_mirror_active'], False) + + self._simulate_mirror_activity_for_user(user_profile) + json = post_presence() + self.assertEqual(json['zephyr_mirror_active'], True) + + + def _simulate_mirror_activity_for_user(self, user_profile): + # type: (UserProfile) -> None + last_visit = datetime.datetime.now() + client = make_client('zephyr_mirror') + + UserActivity.objects.get_or_create( + user_profile=user_profile, + client=client, + query='get_events_backend', + count=2, + last_visit=last_visit + ) + + + def test_same_realm(self): + # type: () -> None + self.login("espuser@mit.edu") + self.client_post("/json/users/me/presence", {'status': 'idle'}) + result = self.client_post("/accounts/logout/") + + # Ensure we don't see hamlet@zulip.com information leakage + self.login("hamlet@zulip.com") + result = self.client_post("/json/users/me/presence", {'status': 'idle'}) + self.assert_json_success(result) + json = ujson.loads(result.content) + self.assertEqual(json['presences']["hamlet@zulip.com"]["website"]['status'], 'idle') + # We only want @zulip.com emails + for email in json['presences'].keys(): + self.assertEqual(split_email_to_domain(email), 'zulip.com') + diff --git a/zerver/tests/tests.py b/zerver/tests/tests.py index ce25283b43..26464058de 100644 --- a/zerver/tests/tests.py +++ b/zerver/tests/tests.py @@ -5,19 +5,18 @@ from __future__ import print_function from typing import Any, Callable, Dict, Iterable, List, Mapping, Tuple, TypeVar from mock import patch, MagicMock -from django.http import HttpResponse from django.test import TestCase, override_settings from zerver.lib.test_helpers import ( queries_captured, simulated_empty_cache, simulated_queue_client, tornado_redirected_to_list, ZulipTestCase, - most_recent_usermessage, most_recent_message, make_client + most_recent_usermessage, most_recent_message ) from zerver.lib.test_runner import slow from zerver.models import UserProfile, Recipient, \ - Realm, Client, UserActivity, \ - get_user_profile_by_email, split_email_to_domain, get_realm, \ + Realm, UserActivity, \ + get_user_profile_by_email, get_realm, \ get_client, get_stream, Message, get_unique_open_realm, \ completely_open @@ -41,7 +40,6 @@ from django.conf import settings from django.core import mail from six import text_type from six.moves import range -import datetime import os import re import sys @@ -454,27 +452,6 @@ class WorkerTest(TestCase): worker = TestWorker() worker.consume({}) -class ActivityTest(ZulipTestCase): - def test_activity(self): - # type: () -> None - self.login("hamlet@zulip.com") - client, _ = Client.objects.get_or_create(name='website') - query = '/json/users/me/pointer' - last_visit = datetime.datetime.now() - count=150 - for user_profile in UserProfile.objects.all(): - UserActivity.objects.get_or_create( - user_profile=user_profile, - client=client, - query=query, - count=count, - last_visit=last_visit - ) - with queries_captured() as queries: - self.client_get('/activity') - - self.assert_length(queries, 13) - class DocPageTest(ZulipTestCase): def _test(self, url, expected_content): # type: (str, str) -> None @@ -1578,147 +1555,6 @@ class GetProfileTest(ZulipTestCase): get_avatar_url(user_profile.avatar_source, user_profile.email), ) -class UserPresenceTests(ZulipTestCase): - def test_get_empty(self): - # type: () -> None - self.login("hamlet@zulip.com") - result = self.client_post("/json/get_active_statuses") - - self.assert_json_success(result) - json = ujson.loads(result.content) - for email, presence in json['presences'].items(): - self.assertEqual(presence, {}) - - def test_invalid_presence(self): - # type: () -> None - email = "hamlet@zulip.com" - self.login(email) - result = self.client_post("/json/users/me/presence", {'status': 'foo'}) - self.assert_json_error(result, 'Invalid presence status: foo') - - def test_set_idle(self): - # type: () -> None - email = "hamlet@zulip.com" - self.login(email) - client = 'website' - - def test_result(result): - # type: (HttpResponse) -> datetime.datetime - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences'][email][client]['status'], 'idle') - self.assertIn('timestamp', json['presences'][email][client]) - self.assertIsInstance(json['presences'][email][client]['timestamp'], int) - self.assertEqual(list(json['presences'].keys()), ['hamlet@zulip.com']) - return json['presences'][email][client]['timestamp'] - - result = self.client_post("/json/users/me/presence", {'status': 'idle'}) - test_result(result) - - result = self.client_post("/json/get_active_statuses", {}) - timestamp = test_result(result) - - email = "othello@zulip.com" - self.login(email) - self.client_post("/json/users/me/presence", {'status': 'idle'}) - result = self.client_post("/json/get_active_statuses", {}) - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences'][email][client]['status'], 'idle') - self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') - self.assertEqual(sorted(json['presences'].keys()), ['hamlet@zulip.com', 'othello@zulip.com']) - newer_timestamp = json['presences'][email][client]['timestamp'] - self.assertGreaterEqual(newer_timestamp, timestamp) - - def test_set_active(self): - # type: () -> None - self.login("hamlet@zulip.com") - client = 'website' - - self.client_post("/json/users/me/presence", {'status': 'idle'}) - result = self.client_post("/json/get_active_statuses", {}) - - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences']["hamlet@zulip.com"][client]['status'], 'idle') - - email = "othello@zulip.com" - self.login("othello@zulip.com") - self.client_post("/json/users/me/presence", {'status': 'idle'}) - result = self.client_post("/json/get_active_statuses", {}) - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences'][email][client]['status'], 'idle') - self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') - - self.client_post("/json/users/me/presence", {'status': 'active'}) - result = self.client_post("/json/get_active_statuses", {}) - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences'][email][client]['status'], 'active') - self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle') - - def test_no_mit(self): - # type: () -> None - """Zephyr mirror realms such as MIT never get a list of users""" - self.login("espuser@mit.edu") - result = self.client_post("/json/users/me/presence", {'status': 'idle'}) - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences'], {}) - - def test_mirror_presence(self): - # type: () -> None - """Zephyr mirror realms find out the status of their mirror bot""" - email = 'espuser@mit.edu' - user_profile = get_user_profile_by_email(email) - self.login(email) - - def post_presence(): - # type: () -> Dict[str, Any] - result = self.client_post("/json/users/me/presence", {'status': 'idle'}) - self.assert_json_success(result) - json = ujson.loads(result.content) - return json - - json = post_presence() - self.assertEqual(json['zephyr_mirror_active'], False) - - self._simulate_mirror_activity_for_user(user_profile) - json = post_presence() - self.assertEqual(json['zephyr_mirror_active'], True) - - - def _simulate_mirror_activity_for_user(self, user_profile): - # type: (UserProfile) -> None - last_visit = datetime.datetime.now() - client = make_client('zephyr_mirror') - - UserActivity.objects.get_or_create( - user_profile=user_profile, - client=client, - query='get_events_backend', - count=2, - last_visit=last_visit - ) - - - def test_same_realm(self): - # type: () -> None - self.login("espuser@mit.edu") - self.client_post("/json/users/me/presence", {'status': 'idle'}) - result = self.client_post("/accounts/logout/") - - # Ensure we don't see hamlet@zulip.com information leakage - self.login("hamlet@zulip.com") - result = self.client_post("/json/users/me/presence", {'status': 'idle'}) - self.assert_json_success(result) - json = ujson.loads(result.content) - self.assertEqual(json['presences']["hamlet@zulip.com"]["website"]['status'], 'idle') - # We only want @zulip.com emails - for email in json['presences'].keys(): - self.assertEqual(split_email_to_domain(email), 'zulip.com') - class AlertWordTests(ZulipTestCase): interesting_alert_word_list = ['alert', 'multi-word word', u'☃']