mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	This fixes an unpleasant regression in
f5edeb01ae, where we stopped correctly
filtering users who have an open browser session that's idle.  These
users are tagged as "UserPresence.IDLE" with an current timestamp in
the database, and should be treated as idle for presence purposes.
As a result, if you had an open Zulip browser session, you incorrectly
wouldn't get missed-message emails for PMs and mentions before this fix.
		
	
		
			
				
	
	
		
			389 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			389 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
from django.http import HttpResponse
 | 
						|
from django.test import override_settings
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
from mock import mock
 | 
						|
 | 
						|
from typing import Any, Dict
 | 
						|
from zerver.lib.actions import do_deactivate_user
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    make_client,
 | 
						|
    queries_captured,
 | 
						|
)
 | 
						|
from zerver.lib.test_classes import (
 | 
						|
    ZulipTestCase,
 | 
						|
)
 | 
						|
from zerver.lib.timestamp import datetime_to_timestamp
 | 
						|
from zerver.models import (
 | 
						|
    email_to_domain,
 | 
						|
    Client,
 | 
						|
    PushDeviceToken,
 | 
						|
    UserActivity,
 | 
						|
    UserProfile,
 | 
						|
    UserPresence,
 | 
						|
    flush_per_request_caches,
 | 
						|
    get_realm,
 | 
						|
)
 | 
						|
 | 
						|
import datetime
 | 
						|
 | 
						|
class ActivityTest(ZulipTestCase):
 | 
						|
    def test_activity(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        client, _ = Client.objects.get_or_create(name='website')
 | 
						|
        query = '/json/users/me/pointer'
 | 
						|
        last_visit = timezone_now()
 | 
						|
        count = 150
 | 
						|
        for user_profile in UserProfile.objects.all():
 | 
						|
            UserActivity.objects.get_or_create(
 | 
						|
                user_profile=user_profile,
 | 
						|
                client=client,
 | 
						|
                query=query,
 | 
						|
                count=count,
 | 
						|
                last_visit=last_visit
 | 
						|
            )
 | 
						|
        flush_per_request_caches()
 | 
						|
        with queries_captured() as queries:
 | 
						|
            self.client_get('/activity')
 | 
						|
 | 
						|
        self.assert_length(queries, 4)
 | 
						|
 | 
						|
class TestClientModel(ZulipTestCase):
 | 
						|
    def test_client_stringification(self) -> None:
 | 
						|
        '''
 | 
						|
        This test is designed to cover __str__ method for Client.
 | 
						|
        '''
 | 
						|
        client = make_client('some_client')
 | 
						|
        self.assertEqual(str(client), '<Client: some_client>')
 | 
						|
 | 
						|
class UserPresenceModelTests(ZulipTestCase):
 | 
						|
    def test_date_logic(self) -> None:
 | 
						|
        UserPresence.objects.all().delete()
 | 
						|
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        email = user_profile.email
 | 
						|
        presence_dct = UserPresence.get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 0)
 | 
						|
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'active'})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        presence_dct = UserPresence.get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 1)
 | 
						|
        self.assertEqual(presence_dct[email]['website']['status'], 'active')
 | 
						|
 | 
						|
        def back_date(num_weeks: int) -> None:
 | 
						|
            user_presence = UserPresence.objects.filter(user_profile=user_profile)[0]
 | 
						|
            user_presence.timestamp = timezone_now() - datetime.timedelta(weeks=num_weeks)
 | 
						|
            user_presence.save()
 | 
						|
 | 
						|
        # Simulate the presence being a week old first.  Nothing should change.
 | 
						|
        back_date(num_weeks=1)
 | 
						|
        presence_dct = UserPresence.get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 1)
 | 
						|
 | 
						|
        # If the UserPresence row is three weeks old, we ignore it.
 | 
						|
        back_date(num_weeks=3)
 | 
						|
        presence_dct = UserPresence.get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
        self.assertEqual(len(presence_dct), 0)
 | 
						|
 | 
						|
    def test_push_tokens(self) -> None:
 | 
						|
        UserPresence.objects.all().delete()
 | 
						|
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        email = user_profile.email
 | 
						|
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'active'})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        def pushable() -> bool:
 | 
						|
            presence_dct = UserPresence.get_status_dict_by_realm(user_profile.realm_id)
 | 
						|
            self.assertEqual(len(presence_dct), 1)
 | 
						|
            return presence_dct[email]['website']['pushable']
 | 
						|
 | 
						|
        self.assertFalse(pushable())
 | 
						|
 | 
						|
        user_profile.enable_offline_push_notifications = True
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        self.assertFalse(pushable())
 | 
						|
 | 
						|
        PushDeviceToken.objects.create(
 | 
						|
            user=user_profile,
 | 
						|
            kind=PushDeviceToken.APNS
 | 
						|
        )
 | 
						|
        self.assertTrue(pushable())
 | 
						|
 | 
						|
class UserPresenceTests(ZulipTestCase):
 | 
						|
    def test_invalid_presence(self) -> None:
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'foo'})
 | 
						|
        self.assert_json_error(result, 'Invalid status: foo')
 | 
						|
 | 
						|
    def test_set_idle(self) -> None:
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        self.login(email)
 | 
						|
        client = 'website'
 | 
						|
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'idle'})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json['presences'][email][client]['status'], 'idle')
 | 
						|
        self.assertIn('timestamp', json['presences'][email][client])
 | 
						|
        self.assertIsInstance(json['presences'][email][client]['timestamp'], int)
 | 
						|
        self.assertEqual(list(json['presences'].keys()), [self.example_email("hamlet")])
 | 
						|
        timestamp = json['presences'][email][client]['timestamp']
 | 
						|
 | 
						|
        email = self.example_email("othello")
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'idle'})
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json['presences'][email][client]['status'], 'idle')
 | 
						|
        self.assertEqual(json['presences'][self.example_email("hamlet")][client]['status'], 'idle')
 | 
						|
        self.assertEqual(sorted(json['presences'].keys()), [self.example_email("hamlet"), self.example_email("othello")])
 | 
						|
        newer_timestamp = json['presences'][email][client]['timestamp']
 | 
						|
        self.assertGreaterEqual(newer_timestamp, timestamp)
 | 
						|
 | 
						|
    def test_set_active(self) -> None:
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        client = 'website'
 | 
						|
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'idle'})
 | 
						|
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(result.json()['presences'][self.example_email("hamlet")][client]['status'], 'idle')
 | 
						|
 | 
						|
        email = self.example_email("othello")
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'idle'})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json['presences'][email][client]['status'], 'idle')
 | 
						|
        self.assertEqual(json['presences'][self.example_email("hamlet")][client]['status'], 'idle')
 | 
						|
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'active'})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json['presences'][email][client]['status'], 'active')
 | 
						|
        self.assertEqual(json['presences'][self.example_email("hamlet")][client]['status'], 'idle')
 | 
						|
 | 
						|
    def test_filter_presence_idle_user_ids(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        from zerver.lib.actions import filter_presence_idle_user_ids
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id])
 | 
						|
        self.client_post("/json/users/me/presence", {'status': 'idle'})
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [user_profile.id])
 | 
						|
        self.client_post("/json/users/me/presence", {'status': 'active'})
 | 
						|
        self.assertEqual(filter_presence_idle_user_ids({user_profile.id}), [])
 | 
						|
 | 
						|
    def test_no_mit(self) -> None:
 | 
						|
        """Zephyr mirror realms such as MIT never get a list of users"""
 | 
						|
        self.login(self.mit_email("espuser"), realm=get_realm("zephyr"))
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'idle'},
 | 
						|
                                  subdomain="zephyr")
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(result.json()['presences'], {})
 | 
						|
 | 
						|
    def test_mirror_presence(self) -> None:
 | 
						|
        """Zephyr mirror realms find out the status of their mirror bot"""
 | 
						|
        user_profile = self.mit_user('espuser')
 | 
						|
        email = user_profile.email
 | 
						|
        self.login(email, realm=user_profile.realm)
 | 
						|
 | 
						|
        def post_presence() -> Dict[str, Any]:
 | 
						|
            result = self.client_post("/json/users/me/presence", {'status': 'idle'},
 | 
						|
                                      subdomain="zephyr")
 | 
						|
            self.assert_json_success(result)
 | 
						|
            json = result.json()
 | 
						|
            return json
 | 
						|
 | 
						|
        json = post_presence()
 | 
						|
        self.assertEqual(json['zephyr_mirror_active'], False)
 | 
						|
 | 
						|
        self._simulate_mirror_activity_for_user(user_profile)
 | 
						|
        json = post_presence()
 | 
						|
        self.assertEqual(json['zephyr_mirror_active'], True)
 | 
						|
 | 
						|
    def _simulate_mirror_activity_for_user(self, user_profile: UserProfile) -> None:
 | 
						|
        last_visit = timezone_now()
 | 
						|
        client = make_client('zephyr_mirror')
 | 
						|
 | 
						|
        UserActivity.objects.get_or_create(
 | 
						|
            user_profile=user_profile,
 | 
						|
            client=client,
 | 
						|
            query='get_events_backend',
 | 
						|
            count=2,
 | 
						|
            last_visit=last_visit
 | 
						|
        )
 | 
						|
 | 
						|
    def test_same_realm(self) -> None:
 | 
						|
        self.login(self.mit_email("espuser"), realm=get_realm("zephyr"))
 | 
						|
        self.client_post("/json/users/me/presence", {'status': 'idle'},
 | 
						|
                         subdomain="zephyr")
 | 
						|
        self.logout()
 | 
						|
 | 
						|
        # Ensure we don't see hamlet@zulip.com information leakage
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'idle'})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = result.json()
 | 
						|
        self.assertEqual(json['presences'][self.example_email("hamlet")]["website"]['status'], 'idle')
 | 
						|
        # We only want @zulip.com emails
 | 
						|
        for email in json['presences'].keys():
 | 
						|
            self.assertEqual(email_to_domain(email), 'zulip.com')
 | 
						|
 | 
						|
class SingleUserPresenceTests(ZulipTestCase):
 | 
						|
    def test_single_user_get(self) -> None:
 | 
						|
 | 
						|
        # First, we setup the test with some data
 | 
						|
        email = self.example_email("othello")
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'active'})
 | 
						|
        result = self.client_post("/json/users/me/presence", {'status': 'active'},
 | 
						|
                                  HTTP_USER_AGENT="ZulipDesktop/1.0")
 | 
						|
        result = self.api_post(email, "/api/v1/users/me/presence", {'status': 'idle'},
 | 
						|
                               HTTP_USER_AGENT="ZulipAndroid/1.0")
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Check some error conditions
 | 
						|
        result = self.client_get("/json/users/nonexistence@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        result = self.client_get("/json/users/cordelia@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No presence data for cordelia@zulip.com")
 | 
						|
 | 
						|
        do_deactivate_user(self.example_user('cordelia'))
 | 
						|
        result = self.client_get("/json/users/cordelia@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        result = self.client_get("/json/users/new-user-bot@zulip.com/presence")
 | 
						|
        self.assert_json_error(result, "Presence is not supported for bot users.")
 | 
						|
 | 
						|
        self.login(self.mit_email("sipbtest"), realm=get_realm("zephyr"))
 | 
						|
        result = self.client_get("/json/users/othello@zulip.com/presence",
 | 
						|
                                 subdomain="zephyr")
 | 
						|
        self.assert_json_error(result, "No such user")
 | 
						|
 | 
						|
        # Then, we check everything works
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        result = self.client_get("/json/users/othello@zulip.com/presence")
 | 
						|
        result_dict = result.json()
 | 
						|
        self.assertEqual(
 | 
						|
            set(result_dict['presence'].keys()),
 | 
						|
            {"ZulipAndroid", "website", "aggregated"})
 | 
						|
        self.assertEqual(set(result_dict['presence']['website'].keys()), {"status", "timestamp"})
 | 
						|
 | 
						|
    def test_ping_only(self) -> None:
 | 
						|
 | 
						|
        self.login(self.example_email("othello"))
 | 
						|
        req = dict(
 | 
						|
            status='active',
 | 
						|
            ping_only='true',
 | 
						|
        )
 | 
						|
        result = self.client_post("/json/users/me/presence", req)
 | 
						|
        self.assertEqual(result.json()['msg'], '')
 | 
						|
 | 
						|
class UserPresenceAggregationTests(ZulipTestCase):
 | 
						|
    def _send_presence_for_aggregated_tests(self, email: str, status: str,
 | 
						|
                                            validate_time: datetime.datetime) -> Dict[str, Dict[str, Any]]:
 | 
						|
        self.login(email)
 | 
						|
        timezone_util = 'zerver.views.presence.timezone_now'
 | 
						|
        with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=5)):
 | 
						|
            self.client_post("/json/users/me/presence", {'status': status})
 | 
						|
        with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=2)):
 | 
						|
            self.api_post(email, "/api/v1/users/me/presence", {'status': status},
 | 
						|
                          HTTP_USER_AGENT="ZulipAndroid/1.0")
 | 
						|
        with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=7)):
 | 
						|
            latest_result = self.api_post(email, "/api/v1/users/me/presence", {'status': status},
 | 
						|
                                          HTTP_USER_AGENT="ZulipIOS/1.0")
 | 
						|
        latest_result_dict = latest_result.json()
 | 
						|
        self.assertDictEqual(
 | 
						|
            latest_result_dict['presences'][email]['aggregated'],
 | 
						|
            {
 | 
						|
                'status': status,
 | 
						|
                'timestamp': datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
 | 
						|
                'client': 'ZulipAndroid'
 | 
						|
            }
 | 
						|
        )
 | 
						|
        result = self.client_get("/json/users/%s/presence" % (email,))
 | 
						|
        return result.json()
 | 
						|
 | 
						|
    def test_aggregated_info(self) -> None:
 | 
						|
        email = self.example_email("othello")
 | 
						|
        validate_time = timezone_now()
 | 
						|
        self._send_presence_for_aggregated_tests(str(self.example_email("othello")), 'active', validate_time)
 | 
						|
        with mock.patch('zerver.views.presence.timezone_now',
 | 
						|
                        return_value=validate_time - datetime.timedelta(seconds=1)):
 | 
						|
            result = self.api_post(email, "/api/v1/users/me/presence", {'status': 'active'},
 | 
						|
                                   HTTP_USER_AGENT="ZulipTestDev/1.0")
 | 
						|
        result_dict = result.json()
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict['presences'][email]['aggregated'],
 | 
						|
            {
 | 
						|
                'status': 'active',
 | 
						|
                'timestamp': datetime_to_timestamp(validate_time - datetime.timedelta(seconds=1)),
 | 
						|
                'client': 'ZulipTestDev'
 | 
						|
            }
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_active(self) -> None:
 | 
						|
        validate_time = timezone_now()
 | 
						|
        result_dict = self._send_presence_for_aggregated_tests(str(self.example_email("othello")), 'active',
 | 
						|
                                                               validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict['presence']['aggregated'],
 | 
						|
            {
 | 
						|
                "status": "active",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2))
 | 
						|
            }
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_idle(self) -> None:
 | 
						|
        validate_time = timezone_now()
 | 
						|
        result_dict = self._send_presence_for_aggregated_tests(str(self.example_email("othello")), 'idle',
 | 
						|
                                                               validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict['presence']['aggregated'],
 | 
						|
            {
 | 
						|
                "status": "idle",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2))
 | 
						|
            }
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_mixed(self) -> None:
 | 
						|
        email = self.example_email("othello")
 | 
						|
        self.login(email)
 | 
						|
        validate_time = timezone_now()
 | 
						|
        with mock.patch('zerver.views.presence.timezone_now',
 | 
						|
                        return_value=validate_time - datetime.timedelta(seconds=3)):
 | 
						|
            self.api_post(email, "/api/v1/users/me/presence", {'status': 'active'},
 | 
						|
                          HTTP_USER_AGENT="ZulipTestDev/1.0")
 | 
						|
        result_dict = self._send_presence_for_aggregated_tests(str(email), 'idle', validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict['presence']['aggregated'],
 | 
						|
            {
 | 
						|
                "status": "idle",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2))
 | 
						|
            }
 | 
						|
        )
 | 
						|
 | 
						|
    def test_aggregated_presense_offline(self) -> None:
 | 
						|
        email = self.example_email("othello")
 | 
						|
        self.login(email)
 | 
						|
        validate_time = timezone_now()
 | 
						|
        with self.settings(OFFLINE_THRESHOLD_SECS=1):
 | 
						|
            result_dict = self._send_presence_for_aggregated_tests(str(email), 'idle', validate_time)
 | 
						|
        self.assertDictEqual(
 | 
						|
            result_dict['presence']['aggregated'],
 | 
						|
            {
 | 
						|
                "status": "offline",
 | 
						|
                "timestamp": datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2))
 | 
						|
            }
 | 
						|
        )
 |