tests: Extract test_presence.py

This commit is contained in:
Steve Howell
2016-09-13 14:45:39 -07:00
parent 2b4d59d6e8
commit f5f7801302
2 changed files with 188 additions and 167 deletions

View File

@@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from django.http import HttpResponse
from typing import Any, Dict
from zerver.lib.test_helpers import (
get_user_profile_by_email,
make_client,
queries_captured,
ZulipTestCase,
)
from zerver.models import (
split_email_to_domain,
Client,
UserActivity,
UserProfile,
)
import datetime
import ujson
class ActivityTest(ZulipTestCase):
def test_activity(self):
# type: () -> None
self.login("hamlet@zulip.com")
client, _ = Client.objects.get_or_create(name='website')
query = '/json/users/me/pointer'
last_visit = datetime.datetime.now()
count=150
for user_profile in UserProfile.objects.all():
UserActivity.objects.get_or_create(
user_profile=user_profile,
client=client,
query=query,
count=count,
last_visit=last_visit
)
with queries_captured() as queries:
self.client_get('/activity')
self.assert_length(queries, 13)
class UserPresenceTests(ZulipTestCase):
def test_get_empty(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client_post("/json/get_active_statuses")
self.assert_json_success(result)
json = ujson.loads(result.content)
for email, presence in json['presences'].items():
self.assertEqual(presence, {})
def test_invalid_presence(self):
# type: () -> None
email = "hamlet@zulip.com"
self.login(email)
result = self.client_post("/json/users/me/presence", {'status': 'foo'})
self.assert_json_error(result, 'Invalid presence status: foo')
def test_set_idle(self):
# type: () -> None
email = "hamlet@zulip.com"
self.login(email)
client = 'website'
def test_result(result):
# type: (HttpResponse) -> datetime.datetime
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
self.assertIn('timestamp', json['presences'][email][client])
self.assertIsInstance(json['presences'][email][client]['timestamp'], int)
self.assertEqual(list(json['presences'].keys()), ['hamlet@zulip.com'])
return json['presences'][email][client]['timestamp']
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
test_result(result)
result = self.client_post("/json/get_active_statuses", {})
timestamp = test_result(result)
email = "othello@zulip.com"
self.login(email)
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
self.assertEqual(sorted(json['presences'].keys()), ['hamlet@zulip.com', 'othello@zulip.com'])
newer_timestamp = json['presences'][email][client]['timestamp']
self.assertGreaterEqual(newer_timestamp, timestamp)
def test_set_active(self):
# type: () -> None
self.login("hamlet@zulip.com")
client = 'website'
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences']["hamlet@zulip.com"][client]['status'], 'idle')
email = "othello@zulip.com"
self.login("othello@zulip.com")
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
self.client_post("/json/users/me/presence", {'status': 'active'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'active')
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
def test_no_mit(self):
# type: () -> None
"""Zephyr mirror realms such as MIT never get a list of users"""
self.login("espuser@mit.edu")
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'], {})
def test_mirror_presence(self):
# type: () -> None
"""Zephyr mirror realms find out the status of their mirror bot"""
email = 'espuser@mit.edu'
user_profile = get_user_profile_by_email(email)
self.login(email)
def post_presence():
# type: () -> Dict[str, Any]
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
json = ujson.loads(result.content)
return json
json = post_presence()
self.assertEqual(json['zephyr_mirror_active'], False)
self._simulate_mirror_activity_for_user(user_profile)
json = post_presence()
self.assertEqual(json['zephyr_mirror_active'], True)
def _simulate_mirror_activity_for_user(self, user_profile):
# type: (UserProfile) -> None
last_visit = datetime.datetime.now()
client = make_client('zephyr_mirror')
UserActivity.objects.get_or_create(
user_profile=user_profile,
client=client,
query='get_events_backend',
count=2,
last_visit=last_visit
)
def test_same_realm(self):
# type: () -> None
self.login("espuser@mit.edu")
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/accounts/logout/")
# Ensure we don't see hamlet@zulip.com information leakage
self.login("hamlet@zulip.com")
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences']["hamlet@zulip.com"]["website"]['status'], 'idle')
# We only want @zulip.com emails
for email in json['presences'].keys():
self.assertEqual(split_email_to_domain(email), 'zulip.com')

View File

@@ -5,19 +5,18 @@ from __future__ import print_function
from typing import Any, Callable, Dict, Iterable, List, Mapping, Tuple, TypeVar from typing import Any, Callable, Dict, Iterable, List, Mapping, Tuple, TypeVar
from mock import patch, MagicMock from mock import patch, MagicMock
from django.http import HttpResponse
from django.test import TestCase, override_settings from django.test import TestCase, override_settings
from zerver.lib.test_helpers import ( from zerver.lib.test_helpers import (
queries_captured, simulated_empty_cache, queries_captured, simulated_empty_cache,
simulated_queue_client, tornado_redirected_to_list, ZulipTestCase, simulated_queue_client, tornado_redirected_to_list, ZulipTestCase,
most_recent_usermessage, most_recent_message, make_client most_recent_usermessage, most_recent_message
) )
from zerver.lib.test_runner import slow from zerver.lib.test_runner import slow
from zerver.models import UserProfile, Recipient, \ from zerver.models import UserProfile, Recipient, \
Realm, Client, UserActivity, \ Realm, UserActivity, \
get_user_profile_by_email, split_email_to_domain, get_realm, \ get_user_profile_by_email, get_realm, \
get_client, get_stream, Message, get_unique_open_realm, \ get_client, get_stream, Message, get_unique_open_realm, \
completely_open completely_open
@@ -41,7 +40,6 @@ from django.conf import settings
from django.core import mail from django.core import mail
from six import text_type from six import text_type
from six.moves import range from six.moves import range
import datetime
import os import os
import re import re
import sys import sys
@@ -454,27 +452,6 @@ class WorkerTest(TestCase):
worker = TestWorker() worker = TestWorker()
worker.consume({}) worker.consume({})
class ActivityTest(ZulipTestCase):
def test_activity(self):
# type: () -> None
self.login("hamlet@zulip.com")
client, _ = Client.objects.get_or_create(name='website')
query = '/json/users/me/pointer'
last_visit = datetime.datetime.now()
count=150
for user_profile in UserProfile.objects.all():
UserActivity.objects.get_or_create(
user_profile=user_profile,
client=client,
query=query,
count=count,
last_visit=last_visit
)
with queries_captured() as queries:
self.client_get('/activity')
self.assert_length(queries, 13)
class DocPageTest(ZulipTestCase): class DocPageTest(ZulipTestCase):
def _test(self, url, expected_content): def _test(self, url, expected_content):
# type: (str, str) -> None # type: (str, str) -> None
@@ -1578,147 +1555,6 @@ class GetProfileTest(ZulipTestCase):
get_avatar_url(user_profile.avatar_source, user_profile.email), get_avatar_url(user_profile.avatar_source, user_profile.email),
) )
class UserPresenceTests(ZulipTestCase):
def test_get_empty(self):
# type: () -> None
self.login("hamlet@zulip.com")
result = self.client_post("/json/get_active_statuses")
self.assert_json_success(result)
json = ujson.loads(result.content)
for email, presence in json['presences'].items():
self.assertEqual(presence, {})
def test_invalid_presence(self):
# type: () -> None
email = "hamlet@zulip.com"
self.login(email)
result = self.client_post("/json/users/me/presence", {'status': 'foo'})
self.assert_json_error(result, 'Invalid presence status: foo')
def test_set_idle(self):
# type: () -> None
email = "hamlet@zulip.com"
self.login(email)
client = 'website'
def test_result(result):
# type: (HttpResponse) -> datetime.datetime
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
self.assertIn('timestamp', json['presences'][email][client])
self.assertIsInstance(json['presences'][email][client]['timestamp'], int)
self.assertEqual(list(json['presences'].keys()), ['hamlet@zulip.com'])
return json['presences'][email][client]['timestamp']
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
test_result(result)
result = self.client_post("/json/get_active_statuses", {})
timestamp = test_result(result)
email = "othello@zulip.com"
self.login(email)
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
self.assertEqual(sorted(json['presences'].keys()), ['hamlet@zulip.com', 'othello@zulip.com'])
newer_timestamp = json['presences'][email][client]['timestamp']
self.assertGreaterEqual(newer_timestamp, timestamp)
def test_set_active(self):
# type: () -> None
self.login("hamlet@zulip.com")
client = 'website'
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences']["hamlet@zulip.com"][client]['status'], 'idle')
email = "othello@zulip.com"
self.login("othello@zulip.com")
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'idle')
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
self.client_post("/json/users/me/presence", {'status': 'active'})
result = self.client_post("/json/get_active_statuses", {})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'][email][client]['status'], 'active')
self.assertEqual(json['presences']['hamlet@zulip.com'][client]['status'], 'idle')
def test_no_mit(self):
# type: () -> None
"""Zephyr mirror realms such as MIT never get a list of users"""
self.login("espuser@mit.edu")
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences'], {})
def test_mirror_presence(self):
# type: () -> None
"""Zephyr mirror realms find out the status of their mirror bot"""
email = 'espuser@mit.edu'
user_profile = get_user_profile_by_email(email)
self.login(email)
def post_presence():
# type: () -> Dict[str, Any]
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
json = ujson.loads(result.content)
return json
json = post_presence()
self.assertEqual(json['zephyr_mirror_active'], False)
self._simulate_mirror_activity_for_user(user_profile)
json = post_presence()
self.assertEqual(json['zephyr_mirror_active'], True)
def _simulate_mirror_activity_for_user(self, user_profile):
# type: (UserProfile) -> None
last_visit = datetime.datetime.now()
client = make_client('zephyr_mirror')
UserActivity.objects.get_or_create(
user_profile=user_profile,
client=client,
query='get_events_backend',
count=2,
last_visit=last_visit
)
def test_same_realm(self):
# type: () -> None
self.login("espuser@mit.edu")
self.client_post("/json/users/me/presence", {'status': 'idle'})
result = self.client_post("/accounts/logout/")
# Ensure we don't see hamlet@zulip.com information leakage
self.login("hamlet@zulip.com")
result = self.client_post("/json/users/me/presence", {'status': 'idle'})
self.assert_json_success(result)
json = ujson.loads(result.content)
self.assertEqual(json['presences']["hamlet@zulip.com"]["website"]['status'], 'idle')
# We only want @zulip.com emails
for email in json['presences'].keys():
self.assertEqual(split_email_to_domain(email), 'zulip.com')
class AlertWordTests(ZulipTestCase): class AlertWordTests(ZulipTestCase):
interesting_alert_word_list = ['alert', 'multi-word word', u''] interesting_alert_word_list = ['alert', 'multi-word word', u'']