mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-30 19:43:47 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			176 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			176 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from collections.abc import Callable
 | |
| from datetime import timedelta
 | |
| from typing import Any
 | |
| 
 | |
| import time_machine
 | |
| from django.utils.timezone import now as timezone_now
 | |
| from typing_extensions import override
 | |
| 
 | |
| from zerver.actions.realm_settings import do_set_realm_property
 | |
| from zerver.actions.users import change_user_is_active
 | |
| from zerver.lib.sessions import (
 | |
|     delete_all_deactivated_user_sessions,
 | |
|     delete_all_user_sessions,
 | |
|     delete_realm_user_sessions,
 | |
|     delete_session,
 | |
|     delete_user_sessions,
 | |
|     get_expirable_session_var,
 | |
|     set_expirable_session_var,
 | |
|     user_sessions,
 | |
| )
 | |
| from zerver.lib.test_classes import ZulipTestCase
 | |
| from zerver.models import Realm, UserProfile
 | |
| from zerver.models.realms import get_realm
 | |
| 
 | |
| 
 | |
| class TestSessions(ZulipTestCase):
 | |
|     def do_test_session(
 | |
|         self, user: UserProfile, action: Callable[[], Any], realm: Realm, expected_result: bool
 | |
|     ) -> None:
 | |
|         self.login_user(user)
 | |
|         self.assertIn("_auth_user_id", self.client.session)
 | |
|         action()
 | |
|         if expected_result:
 | |
|             result = self.client_get("/", subdomain=realm.subdomain)
 | |
|             self.assertEqual(200, result.status_code)
 | |
|             self.assertTrue('is_spectator":true' in str(result.content))
 | |
|         else:
 | |
|             self.assertIn("_auth_user_id", self.client.session)
 | |
| 
 | |
|     def test_delete_session(self) -> None:
 | |
|         user_profile = self.example_user("hamlet")
 | |
|         self.login_user(user_profile)
 | |
|         self.assertIn("_auth_user_id", self.client.session)
 | |
|         for session in user_sessions(user_profile):
 | |
|             delete_session(session)
 | |
|         result = self.client_get("/")
 | |
|         self.assertEqual(result.status_code, 200)
 | |
|         self.assertTrue('is_spectator":true' in str(result.content))
 | |
| 
 | |
|     def test_delete_user_sessions(self) -> None:
 | |
|         user_profile = self.example_user("hamlet")
 | |
|         self.do_test_session(
 | |
|             user_profile, lambda: delete_user_sessions(user_profile), get_realm("zulip"), True
 | |
|         )
 | |
|         self.do_test_session(
 | |
|             self.example_user("othello"),
 | |
|             lambda: delete_user_sessions(user_profile),
 | |
|             get_realm("zulip"),
 | |
|             False,
 | |
|         )
 | |
| 
 | |
|     def test_delete_realm_user_sessions(self) -> None:
 | |
|         realm = get_realm("zulip")
 | |
|         self.do_test_session(
 | |
|             self.example_user("hamlet"),
 | |
|             lambda: delete_realm_user_sessions(realm),
 | |
|             get_realm("zulip"),
 | |
|             True,
 | |
|         )
 | |
|         self.do_test_session(
 | |
|             self.mit_user("sipbtest"),
 | |
|             lambda: delete_realm_user_sessions(realm),
 | |
|             get_realm("zephyr"),
 | |
|             False,
 | |
|         )
 | |
| 
 | |
|     def test_delete_all_user_sessions(self) -> None:
 | |
|         self.do_test_session(
 | |
|             self.example_user("hamlet"),
 | |
|             delete_all_user_sessions,
 | |
|             get_realm("zulip"),
 | |
|             True,
 | |
|         )
 | |
| 
 | |
|         lear_realm = get_realm("lear")
 | |
|         do_set_realm_property(lear_realm, "enable_spectator_access", True, acting_user=None)
 | |
|         self.make_stream(
 | |
|             "web_public_stream",
 | |
|             realm=lear_realm,
 | |
|             is_web_public=True,
 | |
|         )
 | |
|         self.do_test_session(
 | |
|             self.lear_user("cordelia"),
 | |
|             delete_all_user_sessions,
 | |
|             lear_realm,
 | |
|             True,
 | |
|         )
 | |
| 
 | |
|     def test_delete_all_deactivated_user_sessions(self) -> None:
 | |
|         # Test that no exception is thrown with a logged-out session
 | |
|         self.login("othello")
 | |
|         self.assertIn("_auth_user_id", self.client.session)
 | |
|         self.client_post("/accounts/logout/")
 | |
|         delete_all_deactivated_user_sessions()
 | |
|         result = self.client_get("/")
 | |
|         self.assertEqual(result.status_code, 200)
 | |
|         self.assertTrue('is_spectator":true' in str(result.content))
 | |
| 
 | |
|         # Test nothing happens to an active user's session
 | |
|         self.login("othello")
 | |
|         self.assertIn("_auth_user_id", self.client.session)
 | |
|         delete_all_deactivated_user_sessions()
 | |
|         self.assertIn("_auth_user_id", self.client.session)
 | |
| 
 | |
|         # Test that a deactivated session gets logged out
 | |
|         user_profile_3 = self.example_user("cordelia")
 | |
|         self.login_user(user_profile_3)
 | |
|         self.assertIn("_auth_user_id", self.client.session)
 | |
|         change_user_is_active(user_profile_3, False)
 | |
|         with self.assertLogs(level="INFO") as info_logs:
 | |
|             delete_all_deactivated_user_sessions()
 | |
|         self.assertEqual(
 | |
|             info_logs.output,
 | |
|             [f"INFO:root:Deactivating session for deactivated user {user_profile_3.id}"],
 | |
|         )
 | |
|         result = self.client_get("/")
 | |
|         self.assertEqual(result.status_code, 200)
 | |
|         self.assertTrue('is_spectator":true' in str(result.content))
 | |
| 
 | |
| 
 | |
| class TestExpirableSessionVars(ZulipTestCase):
 | |
|     @override
 | |
|     def setUp(self) -> None:
 | |
|         self.session = self.client.session
 | |
|         super().setUp()
 | |
| 
 | |
|     def test_set_and_get_basic(self) -> None:
 | |
|         start_time = timezone_now()
 | |
|         with time_machine.travel(start_time, tick=False):
 | |
|             set_expirable_session_var(
 | |
|                 self.session, "test_set_and_get_basic", "some_value", expiry_seconds=10
 | |
|             )
 | |
|             value = get_expirable_session_var(self.session, "test_set_and_get_basic")
 | |
|             self.assertEqual(value, "some_value")
 | |
|         with time_machine.travel((start_time + timedelta(seconds=11)), tick=False):
 | |
|             value = get_expirable_session_var(self.session, "test_set_and_get_basic")
 | |
|             self.assertEqual(value, None)
 | |
| 
 | |
|     def test_set_and_get_with_delete(self) -> None:
 | |
|         set_expirable_session_var(
 | |
|             self.session, "test_set_and_get_with_delete", "some_value", expiry_seconds=10
 | |
|         )
 | |
|         value = get_expirable_session_var(self.session, "test_set_and_get_with_delete", delete=True)
 | |
|         self.assertEqual(value, "some_value")
 | |
|         self.assertEqual(
 | |
|             get_expirable_session_var(self.session, "test_set_and_get_with_delete"), None
 | |
|         )
 | |
| 
 | |
|     def test_get_var_not_set(self) -> None:
 | |
|         value = get_expirable_session_var(
 | |
|             self.session, "test_get_var_not_set", default_value="default"
 | |
|         )
 | |
|         self.assertEqual(value, "default")
 | |
| 
 | |
|     def test_get_var_is_not_expirable(self) -> None:
 | |
|         self.session["test_get_var_is_not_expirable"] = 0
 | |
|         with self.assertLogs(level="WARNING") as m:
 | |
|             value = get_expirable_session_var(
 | |
|                 self.session, "test_get_var_is_not_expirable", default_value="default"
 | |
|             )
 | |
|             self.assertEqual(value, "default")
 | |
|             self.assertIn(
 | |
|                 "WARNING:root:get_expirable_session_var: error getting test_get_var_is_not_expirable",
 | |
|                 m.output[0],
 | |
|             )
 |