mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	Noun: backup, checkout, cleanup, login, logout, setup, shutdown, signup, timeout. Verb: back up, check out, clean up, log in, log out, set up, shut down, sign up, time out. Signed-off-by: Anders Kaseorg <anders@zulip.com>
		
			
				
	
	
		
			715 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			715 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import datetime
 | 
						|
from datetime import timedelta
 | 
						|
from typing import Any, Dict
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
 | 
						|
from zerver.lib.actions import do_deactivate_user
 | 
						|
from zerver.lib.presence import get_status_dict_by_realm
 | 
						|
from zerver.lib.statistics import seconds_usage_between
 | 
						|
from zerver.lib.test_classes import ZulipTestCase
 | 
						|
from zerver.lib.test_helpers import make_client, queries_captured, reset_emails_in_zulip_realm
 | 
						|
from zerver.lib.timestamp import datetime_to_timestamp
 | 
						|
from zerver.models import (
 | 
						|
    Client,
 | 
						|
    PushDeviceToken,
 | 
						|
    UserActivity,
 | 
						|
    UserActivityInterval,
 | 
						|
    UserPresence,
 | 
						|
    UserProfile,
 | 
						|
    flush_per_request_caches,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class ActivityTest(ZulipTestCase):
 | 
						|
    @mock.patch("stripe.Customer.list", return_value=[])
 | 
						|
    def test_activity(self, unused_mock: mock.Mock) -> None:
 | 
						|
        self.login("hamlet")
 | 
						|
        client, _ = Client.objects.get_or_create(name="website")
 | 
						|
        query = "/json/messages/flags"
 | 
						|
        last_visit = timezone_now()
 | 
						|
        count = 150
 | 
						|
        for activity_user_profile in UserProfile.objects.all():
 | 
						|
            UserActivity.objects.get_or_create(
 | 
						|
                user_profile=activity_user_profile,
 | 
						|
                client=client,
 | 
						|
                query=query,
 | 
						|
                count=count,
 | 
						|
                last_visit=last_visit,
 | 
						|
            )
 | 
						|
 | 
						|
        # Fails when not staff
 | 
						|
        result = self.client_get("/activity")
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        user_profile.is_staff = True
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        flush_per_request_caches()
 | 
						|
        with queries_captured() as queries:
 | 
						|
            result = self.client_get("/activity")
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        self.assert_length(queries, 18)
 | 
						|
 | 
						|
        flush_per_request_caches()
 | 
						|
        with queries_captured() as queries:
 | 
						|
            result = self.client_get("/realm_activity/zulip/")
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        self.assert_length(queries, 8)
 | 
						|
 | 
						|
        flush_per_request_caches()
 | 
						|
        with queries_captured() as queries:
 | 
						|
            result = self.client_get("/user_activity/iago@zulip.com/")
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        self.assert_length(queries, 4)
 | 
						|
 | 
						|
 | 
						|
class TestClientModel(ZulipTestCase):
 | 
						|
    def test_client_stringification(self) -> None:
 | 
						|
        """
 | 
						|
        This test is designed to cover __str__ method for Client.
 | 
						|
        """
 | 
						|
        client = make_client("some_client")
 | 
						|
        self.assertEqual(str(client), "<Client: some_client>")
 | 
						|
 | 
						|
 | 
						|
class UserPresenceModelTests(ZulipTestCase):
 | 
						|
    def test_date_logic(self) -> None:
 | 
						|
        UserPresence.objects.all().delete()
 | 
						|
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        email = user_profile.email
 | 
						|
        presence_dct = get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 0)
 | 
						|
 | 
						|
        self.login_user(user_profile)
 | 
						|
        result = self.client_post("/json/users/me/presence", {"status": "active"})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        slim_presence = False
 | 
						|
        presence_dct = get_status_dict_by_realm(user_profile.realm_id, slim_presence)
 | 
						|
        self.assertEqual(len(presence_dct), 1)
 | 
						|
        self.assertEqual(presence_dct[email]["website"]["status"], "active")
 | 
						|
 | 
						|
        slim_presence = True
 | 
						|
        presence_dct = get_status_dict_by_realm(user_profile.realm_id, slim_presence)
 | 
						|
        self.assertEqual(len(presence_dct), 1)
 | 
						|
        info = presence_dct[str(user_profile.id)]
 | 
						|
        self.assertEqual(set(info.keys()), {"active_timestamp"})
 | 
						|
 | 
						|
        def back_date(num_weeks: int) -> None:
 | 
						|
            user_presence = UserPresence.objects.filter(user_profile=user_profile)[0]
 | 
						|
            user_presence.timestamp = timezone_now() - datetime.timedelta(weeks=num_weeks)
 | 
						|
            user_presence.save()
 | 
						|
 | 
						|
        # Simulate the presence being a week old first.  Nothing should change.
 | 
						|
        back_date(num_weeks=1)
 | 
						|
        presence_dct = get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 1)
 | 
						|
 | 
						|
        # If the UserPresence row is three weeks old, we ignore it.
 | 
						|
        back_date(num_weeks=3)
 | 
						|
        presence_dct = get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 0)
 | 
						|
 | 
						|
    def test_push_tokens(self) -> None:
 | 
						|
        UserPresence.objects.all().delete()
 | 
						|
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        email = user_profile.email
 | 
						|
 | 
						|
        self.login_user(user_profile)
 | 
						|
        result = self.client_post("/json/users/me/presence", {"status": "active"})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        def pushable() -> bool:
 | 
						|
            presence_dct = get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
            self.assertEqual(len(presence_dct), 1)
 | 
						|
            return presence_dct[email]["website"]["pushable"]
 | 
						|
 | 
						|
        self.assertFalse(pushable())
 | 
						|
 | 
						|
        user_profile.enable_offline_push_notifications = True
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        self.assertFalse(pushable())
 | 
						|
 | 
						|
        PushDeviceToken.objects.create(
 | 
						|
            user=user_profile,
 | 
						|
            kind=PushDeviceToken.APNS,
 | 
						|
        )
 | 
						|
        self.assertTrue(pushable())
 | 
						|
 | 
						|
 | 
						|
class UserPresenceTests(ZulipTestCase):
 | 
						|
    def test_invalid_presence(self) -> None:
 | 
						|
        user = self.example_user("hamlet")
 | 
						|
        self.login_user(user)
 | 
						|
        result = self.client_post("/json/users/me/presence", {"status": "foo"})
 | 
						|
        self.assert_json_error(result, "Invalid status: foo")
 | 
						|
 | 
						|
    def test_set_idle(self) -> None:
 | 
						|
        client = "website"
 | 
						|
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        othello = self.example_user("othello")
 | 
						|
 | 
						|
        self.login_user(hamlet)
 | 
						|
 | 
						|
        params = dict(status="idle")
 | 
						|
        result = self.client_post("/json/users/me/presence", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json["presences"][hamlet.email][client]["status"], "idle")
 | 
						|
        self.assertIn("timestamp", json["presences"][hamlet.email][client])
 | 
						|
        self.assertIsInstance(json["presences"][hamlet.email][client]["timestamp"], int)
 | 
						|
        self.assertEqual(set(json["presences"].keys()), {hamlet.email})
 | 
						|
 | 
						|
        self.login_user(othello)
 | 
						|
        params = dict(status="idle", slim_presence="true")
 | 
						|
        result = self.client_post("/json/users/me/presence", params)
 | 
						|
        json = result.json()
 | 
						|
        presences = json["presences"]
 | 
						|
        self.assertEqual(
 | 
						|
            set(presences.keys()),
 | 
						|
            {str(hamlet.id), str(othello.id)},
 | 
						|
        )
 | 
						|
        hamlet_info = presences[str(hamlet.id)]
 | 
						|
        othello_info = presences[str(othello.id)]
 | 
						|
 | 
						|
        self.assertEqual(set(hamlet_info.keys()), {"idle_timestamp"})
 | 
						|
        self.assertEqual(set(othello_info.keys()), {"idle_timestamp"})
 | 
						|
 | 
						|
        self.assertGreaterEqual(
 | 
						|
            othello_info["idle_timestamp"],
 | 
						|
            hamlet_info["idle_timestamp"],
 | 
						|
        )
 | 
						|
 | 
						|
    def test_set_active(self) -> None:
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        othello = self.example_user("othello")
 | 
						|
 | 
						|
        self.login_user(hamlet)
 | 
						|
        client = "website"
 | 
						|
 | 
						|
        params = dict(status="idle")
 | 
						|
        result = self.client_post("/json/users/me/presence", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        self.assertEqual(result.json()["presences"][hamlet.email][client]["status"], "idle")
 | 
						|
 | 
						|
        self.login("othello")
 | 
						|
        params = dict(status="idle")
 | 
						|
        result = self.client_post("/json/users/me/presence", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
 | 
						|
        self.assertEqual(json["presences"][othello.email][client]["status"], "idle")
 | 
						|
        self.assertEqual(json["presences"][hamlet.email][client]["status"], "idle")
 | 
						|
 | 
						|
        params = dict(status="active")
 | 
						|
        result = self.client_post("/json/users/me/presence", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
 | 
						|
        self.assertEqual(json["presences"][othello.email][client]["status"], "active")
 | 
						|
        self.assertEqual(json["presences"][hamlet.email][client]["status"], "idle")
 | 
						|
 | 
						|
        self.login_user(hamlet)
 | 
						|
        params = dict(status="active", slim_presence="true")
 | 
						|
        result = self.client_post("/json/users/me/presence", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
 | 
						|
        presences = json["presences"]
 | 
						|
        self.assertEqual(
 | 
						|
            set(presences.keys()),
 | 
						|
            {str(hamlet.id), str(othello.id)},
 | 
						|
        )
 | 
						|
        othello_info = presences[str(othello.id)]
 | 
						|
        hamlet_info = presences[str(hamlet.id)]
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            set(othello_info.keys()),
 | 
						|
            {"active_timestamp"},
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            set(hamlet_info.keys()),
 | 
						|
            {"active_timestamp"},
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertGreaterEqual(
 | 
						|
            hamlet_info["active_timestamp"],
 | 
						|
            othello_info["active_timestamp"],
 | 
						|
        )
 | 
						|
 | 
						|
    @mock.patch("stripe.Customer.list", return_value=[])
 | 
						|
    def test_new_user_input(self, unused_mock: mock.Mock) -> None:
 | 
						|
        """Mostly a test for UserActivityInterval"""
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        self.login("hamlet")
 | 
						|
        self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 0)
 | 
						|
        time_zero = timezone_now().replace(microsecond=0)
 | 
						|
        with mock.patch("zerver.views.presence.timezone_now", return_value=time_zero):
 | 
						|
            result = self.client_post(
 | 
						|
                "/json/users/me/presence", {"status": "active", "new_user_input": "true"}
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 1)
 | 
						|
        interval = UserActivityInterval.objects.get(user_profile=user_profile)
 | 
						|
        self.assertEqual(interval.start, time_zero)
 | 
						|
        self.assertEqual(interval.end, time_zero + UserActivityInterval.MIN_INTERVAL_LENGTH)
 | 
						|
 | 
						|
        second_time = time_zero + timedelta(seconds=600)
 | 
						|
        # Extent the interval
 | 
						|
        with mock.patch("zerver.views.presence.timezone_now", return_value=second_time):
 | 
						|
            result = self.client_post(
 | 
						|
                "/json/users/me/presence", {"status": "active", "new_user_input": "true"}
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 1)
 | 
						|
        interval = UserActivityInterval.objects.get(user_profile=user_profile)
 | 
						|
        self.assertEqual(interval.start, time_zero)
 | 
						|
        self.assertEqual(interval.end, second_time + UserActivityInterval.MIN_INTERVAL_LENGTH)
 | 
						|
 | 
						|
        third_time = time_zero + timedelta(seconds=6000)
 | 
						|
        with mock.patch("zerver.views.presence.timezone_now", return_value=third_time):
 | 
						|
            result = self.client_post(
 | 
						|
                "/json/users/me/presence", {"status": "active", "new_user_input": "true"}
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(UserActivityInterval.objects.filter(user_profile=user_profile).count(), 2)
 | 
						|
        interval = UserActivityInterval.objects.filter(user_profile=user_profile).order_by("start")[
 | 
						|
            0
 | 
						|
        ]
 | 
						|
        self.assertEqual(interval.start, time_zero)
 | 
						|
        self.assertEqual(interval.end, second_time + UserActivityInterval.MIN_INTERVAL_LENGTH)
 | 
						|
        interval = UserActivityInterval.objects.filter(user_profile=user_profile).order_by("start")[
 | 
						|
            1
 | 
						|
        ]
 | 
						|
        self.assertEqual(interval.start, third_time)
 | 
						|
        self.assertEqual(interval.end, third_time + UserActivityInterval.MIN_INTERVAL_LENGTH)
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            seconds_usage_between(user_profile, time_zero, third_time).total_seconds(), 1500
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            seconds_usage_between(
 | 
						|
                user_profile, time_zero, third_time + timedelta(seconds=10)
 | 
						|
            ).total_seconds(),
 | 
						|
            1510,
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            seconds_usage_between(
 | 
						|
                user_profile, time_zero, third_time + timedelta(seconds=1000)
 | 
						|
            ).total_seconds(),
 | 
						|
            2400,
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            seconds_usage_between(
 | 
						|
                user_profile, time_zero, third_time - timedelta(seconds=100)
 | 
						|
            ).total_seconds(),
 | 
						|
            1500,
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            seconds_usage_between(
 | 
						|
                user_profile,
 | 
						|
                time_zero + timedelta(seconds=100),
 | 
						|
                third_time - timedelta(seconds=100),
 | 
						|
            ).total_seconds(),
 | 
						|
            1400,
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            seconds_usage_between(
 | 
						|
                user_profile,
 | 
						|
                time_zero + timedelta(seconds=1200),
 | 
						|
                third_time - timedelta(seconds=100),
 | 
						|
            ).total_seconds(),
 | 
						|
            300,
 | 
						|
        )
 | 
						|
 | 
						|
        # Now test /activity with actual data
 | 
						|
        user_profile.is_staff = True
 | 
						|
        user_profile.save()
 | 
						|
        result = self.client_get("/activity")
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
    def test_filter_presence_idle_user_ids(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        from zerver.lib.actions import filter_presence_idle_user_ids
 | 
						|
 | 
						|
        self.login("hamlet")
 | 
						|
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id])
 | 
						|
        self.client_post("/json/users/me/presence", {"status": "idle"})
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id])
 | 
						|
 | 
						|
        # Active presence from the mobile app doesn't count
 | 
						|
        self.client_post(
 | 
						|
            "/json/users/me/presence", {"status": "active"}, HTTP_USER_AGENT="ZulipMobile/1.0"
 | 
						|
        )
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id])
 | 
						|
 | 
						|
        self.client_post("/json/users/me/presence", {"status": "active"})
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [])
 | 
						|
 | 
						|
    def test_no_mit(self) -> None:
 | 
						|
        """Zephyr mirror realms such as MIT never get a list of users"""
 | 
						|
        user = self.mit_user("espuser")
 | 
						|
        self.login_user(user)
 | 
						|
        result = self.client_post("/json/users/me/presence", {"status": "idle"}, subdomain="zephyr")
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(result.json()["presences"], {})
 | 
						|
 | 
						|
    def test_mirror_presence(self) -> None:
 | 
						|
        """Zephyr mirror realms find out the status of their mirror bot"""
 | 
						|
        user_profile = self.mit_user("espuser")
 | 
						|
        self.login_user(user_profile)
 | 
						|
 | 
						|
        def post_presence() -> Dict[str, Any]:
 | 
						|
            result = self.client_post(
 | 
						|
                "/json/users/me/presence", {"status": "idle"}, subdomain="zephyr"
 | 
						|
            )
 | 
						|
            self.assert_json_success(result)
 | 
						|
            json = result.json()
 | 
						|
            return json
 | 
						|
 | 
						|
        json = post_presence()
 | 
						|
        self.assertEqual(json["zephyr_mirror_active"], False)
 | 
						|
 | 
						|
        self._simulate_mirror_activity_for_user(user_profile)
 | 
						|
        json = post_presence()
 | 
						|
        self.assertEqual(json["zephyr_mirror_active"], True)
 | 
						|
 | 
						|
    def _simulate_mirror_activity_for_user(self, user_profile: UserProfile) -> None:
 | 
						|
        last_visit = timezone_now()
 | 
						|
        client = make_client("zephyr_mirror")
 | 
						|
 | 
						|
        UserActivity.objects.get_or_create(
 | 
						|
            user_profile=user_profile,
 | 
						|
            client=client,
 | 
						|
            query="get_events",
 | 
						|
            count=2,
 | 
						|
            last_visit=last_visit,
 | 
						|
        )
 | 
						|
 | 
						|
    def test_same_realm(self) -> None:
 | 
						|
        espuser = self.mit_user("espuser")
 | 
						|
        self.login_user(espuser)
 | 
						|
        self.client_post("/json/users/me/presence", {"status": "idle"}, subdomain="zephyr")
 | 
						|
        self.logout()
 | 
						|
 | 
						|
        # Ensure we don't see hamlet@zulip.com information leakage
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        self.login_user(hamlet)
 | 
						|
        result = self.client_post("/json/users/me/presence", {"status": "idle"})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json["presences"][hamlet.email]["website"]["status"], "idle")
 | 
						|
        self.assertEqual(
 | 
						|
            json["presences"].keys(),
 | 
						|
            {hamlet.email},
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class SingleUserPresenceTests(ZulipTestCase):
 | 
						|
    def test_email_access(self) -> None:
 | 
						|
        user = self.example_user("hamlet")
 | 
						|
        self.login_user(user)
 | 
						|
 | 
						|
        other_user = self.example_user("othello")
 | 
						|
        other_user.email = "email@zulip.com"
 | 
						|
        other_user.delivery_email = "delivery_email@zulip.com"
 | 
						|
        other_user.save()
 | 
						|
 | 
						|
        # Note that we don't leak any info on delivery emails.
 | 
						|
        result = self.client_get("/json/users/delivery_email@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        result = self.client_get("/json/users/not_even_in_realm@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        # For a known email, we may simply complain about lack of presence info.
 | 
						|
        result = self.client_get("/json/users/email@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No presence data for email@zulip.com")
 | 
						|
 | 
						|
    def test_single_user_get(self) -> None:
 | 
						|
        reset_emails_in_zulip_realm()
 | 
						|
 | 
						|
        # First, we set up the test with some data
 | 
						|
        user = self.example_user("othello")
 | 
						|
        self.login_user(user)
 | 
						|
        result = self.client_post("/json/users/me/presence", {"status": "active"})
 | 
						|
        result = self.client_post(
 | 
						|
            "/json/users/me/presence", {"status": "active"}, HTTP_USER_AGENT="ZulipDesktop/1.0"
 | 
						|
        )
 | 
						|
        result = self.api_post(
 | 
						|
            user,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            {"status": "idle"},
 | 
						|
            HTTP_USER_AGENT="ZulipAndroid/1.0",
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Check some error conditions
 | 
						|
        result = self.client_get("/json/users/nonexistence@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        result = self.client_get("/json/users/cordelia@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No presence data for cordelia@zulip.com")
 | 
						|
 | 
						|
        cordelia = self.example_user("cordelia")
 | 
						|
        result = self.client_get(f"/json/users/{cordelia.id}/presence")
 | 
						|
        self.assert_json_error(result, f"No presence data for {cordelia.id}")
 | 
						|
 | 
						|
        do_deactivate_user(self.example_user("cordelia"), acting_user=None)
 | 
						|
        result = self.client_get("/json/users/cordelia@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        result = self.client_get(f"/json/users/{cordelia.id}/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        result = self.client_get("/json/users/default-bot@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "Presence is not supported for bot users.")
 | 
						|
 | 
						|
        sipbtest = self.mit_user("sipbtest")
 | 
						|
        self.login_user(sipbtest)
 | 
						|
        result = self.client_get("/json/users/othello@zulip.com/presence", subdomain="zephyr")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        othello = self.example_user("othello")
 | 
						|
        result = self.client_get(f"/json/users/{othello.id}/presence", subdomain="zephyr")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        # Then, we check everything works
 | 
						|
        self.login("hamlet")
 | 
						|
        result = self.client_get("/json/users/othello@zulip.com/presence")
 | 
						|
        result_dict = result.json()
 | 
						|
        self.assertEqual(
 | 
						|
            set(result_dict["presence"].keys()), {"ZulipAndroid", "website", "aggregated"}
 | 
						|
        )
 | 
						|
        self.assertEqual(set(result_dict["presence"]["website"].keys()), {"status", "timestamp"})
 | 
						|
 | 
						|
        result = self.client_get(f"/json/users/{othello.id}/presence")
 | 
						|
        result_dict = result.json()
 | 
						|
        self.assertEqual(
 | 
						|
            set(result_dict["presence"].keys()), {"ZulipAndroid", "website", "aggregated"}
 | 
						|
        )
 | 
						|
        self.assertEqual(set(result_dict["presence"]["website"].keys()), {"status", "timestamp"})
 | 
						|
 | 
						|
    def test_ping_only(self) -> None:
 | 
						|
 | 
						|
        self.login("othello")
 | 
						|
        req = dict(
 | 
						|
            status="active",
 | 
						|
            ping_only="true",
 | 
						|
        )
 | 
						|
        result = self.client_post("/json/users/me/presence", req)
 | 
						|
        self.assertEqual(result.json()["msg"], "")
 | 
						|
 | 
						|
 | 
						|
class UserPresenceAggregationTests(ZulipTestCase):
 | 
						|
    def _send_presence_for_aggregated_tests(
 | 
						|
        self, user: UserProfile, status: str, validate_time: datetime.datetime
 | 
						|
    ) -> Dict[str, Dict[str, Any]]:
 | 
						|
        self.login_user(user)
 | 
						|
        timezone_util = "zerver.views.presence.timezone_now"
 | 
						|
        with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=5)):
 | 
						|
            self.client_post("/json/users/me/presence", {"status": status})
 | 
						|
        with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=2)):
 | 
						|
            self.api_post(
 | 
						|
                user,
 | 
						|
                "/api/v1/users/me/presence",
 | 
						|
                {"status": status},
 | 
						|
                HTTP_USER_AGENT="ZulipAndroid/1.0",
 | 
						|
            )
 | 
						|
        with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=7)):
 | 
						|
            latest_result = self.api_post(
 | 
						|
                user,
 | 
						|
                "/api/v1/users/me/presence",
 | 
						|
                {"status": status},
 | 
						|
                HTTP_USER_AGENT="ZulipIOS/1.0",
 | 
						|
            )
 | 
						|
        latest_result_dict = latest_result.json()
 | 
						|
        self.assertDictEqual(
 | 
						|
            latest_result_dict["presences"][user.email]["aggregated"],
 | 
						|
            {
 | 
						|
                "status": status,
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
 | 
						|
                "client": "ZulipAndroid",
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
        result = self.client_get(f"/json/users/{user.email}/presence")
 | 
						|
        return result.json()
 | 
						|
 | 
						|
    def test_aggregated_info(self) -> None:
 | 
						|
        user = self.example_user("othello")
 | 
						|
        validate_time = timezone_now()
 | 
						|
        self._send_presence_for_aggregated_tests(user, "active", validate_time)
 | 
						|
        with mock.patch(
 | 
						|
            "zerver.views.presence.timezone_now",
 | 
						|
            return_value=validate_time - datetime.timedelta(seconds=1),
 | 
						|
        ):
 | 
						|
            result = self.api_post(
 | 
						|
                user,
 | 
						|
                "/api/v1/users/me/presence",
 | 
						|
                {"status": "active"},
 | 
						|
                HTTP_USER_AGENT="ZulipTestDev/1.0",
 | 
						|
            )
 | 
						|
        result_dict = result.json()
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict["presences"][user.email]["aggregated"],
 | 
						|
            {
 | 
						|
                "status": "active",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=1)),
 | 
						|
                "client": "ZulipTestDev",
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_active(self) -> None:
 | 
						|
        user = self.example_user("othello")
 | 
						|
        validate_time = timezone_now()
 | 
						|
        result_dict = self._send_presence_for_aggregated_tests(user, "active", validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict["presence"]["aggregated"],
 | 
						|
            {
 | 
						|
                "status": "active",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_idle(self) -> None:
 | 
						|
        user = self.example_user("othello")
 | 
						|
        validate_time = timezone_now()
 | 
						|
        result_dict = self._send_presence_for_aggregated_tests(user, "idle", validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict["presence"]["aggregated"],
 | 
						|
            {
 | 
						|
                "status": "idle",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_mixed(self) -> None:
 | 
						|
        user = self.example_user("othello")
 | 
						|
        self.login_user(user)
 | 
						|
        validate_time = timezone_now()
 | 
						|
        with mock.patch(
 | 
						|
            "zerver.views.presence.timezone_now",
 | 
						|
            return_value=validate_time - datetime.timedelta(seconds=3),
 | 
						|
        ):
 | 
						|
            self.api_post(
 | 
						|
                user,
 | 
						|
                "/api/v1/users/me/presence",
 | 
						|
                {"status": "active"},
 | 
						|
                HTTP_USER_AGENT="ZulipTestDev/1.0",
 | 
						|
            )
 | 
						|
        result_dict = self._send_presence_for_aggregated_tests(user, "idle", validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict["presence"]["aggregated"],
 | 
						|
            {
 | 
						|
                "status": "idle",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_offline(self) -> None:
 | 
						|
        user = self.example_user("othello")
 | 
						|
        self.login_user(user)
 | 
						|
        validate_time = timezone_now()
 | 
						|
        with self.settings(OFFLINE_THRESHOLD_SECS=1):
 | 
						|
            result_dict = self._send_presence_for_aggregated_tests(user, "idle", validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict["presence"]["aggregated"],
 | 
						|
            {
 | 
						|
                "status": "offline",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class GetRealmStatusesTest(ZulipTestCase):
 | 
						|
    def test_get_statuses(self) -> None:
 | 
						|
        # Set up the test by simulating users reporting their presence data.
 | 
						|
        othello = self.example_user("othello")
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            othello,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            dict(status="active"),
 | 
						|
            HTTP_USER_AGENT="ZulipAndroid/1.0",
 | 
						|
        )
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            hamlet,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            dict(status="idle"),
 | 
						|
            HTTP_USER_AGENT="ZulipDesktop/1.0",
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(set(json["presences"].keys()), {hamlet.email, othello.email})
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            hamlet,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            dict(status="active", slim_presence="true"),
 | 
						|
            HTTP_USER_AGENT="ZulipDesktop/1.0",
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(set(json["presences"].keys()), {str(hamlet.id), str(othello.id)})
 | 
						|
 | 
						|
        # Check that a bot can fetch the presence data for the realm.
 | 
						|
        result = self.api_get(self.example_user("default_bot"), "/api/v1/realm/presence")
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(set(json["presences"].keys()), {hamlet.email, othello.email})
 | 
						|
 | 
						|
    def test_presence_disabled(self) -> None:
 | 
						|
        # Disable presence status and test whether the presence
 | 
						|
        # is reported or not.
 | 
						|
        othello = self.example_user("othello")
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        othello.presence_enabled = False
 | 
						|
        hamlet.presence_enabled = True
 | 
						|
        othello.save(update_fields=["presence_enabled"])
 | 
						|
        hamlet.save(update_fields=["presence_enabled"])
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            othello,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            dict(status="active"),
 | 
						|
            HTTP_USER_AGENT="ZulipAndroid/1.0",
 | 
						|
        )
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            hamlet,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            dict(status="idle"),
 | 
						|
            HTTP_USER_AGENT="ZulipDesktop/1.0",
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
 | 
						|
        # Othello's presence status is disabled so it won't be reported.
 | 
						|
        self.assertEqual(set(json["presences"].keys()), {hamlet.email})
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            hamlet,
 | 
						|
            "/api/v1/users/me/presence",
 | 
						|
            dict(status="active", slim_presence="true"),
 | 
						|
            HTTP_USER_AGENT="ZulipDesktop/1.0",
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(set(json["presences"].keys()), {str(hamlet.id)})
 |