mirror of
https://github.com/zulip/zulip.git
synced 2025-10-24 08:33:43 +00:00
When naming things with units of time involved, it's always good to include units in the name, so that the meaning is clear.
1136 lines
44 KiB
Python
1136 lines
44 KiB
Python
import calendar
|
|
import urllib
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
from unittest.mock import patch
|
|
|
|
import orjson
|
|
import pytz
|
|
from django.conf import settings
|
|
from django.http import HttpResponse
|
|
from django.utils.timezone import now as timezone_now
|
|
|
|
from corporate.models import Customer, CustomerPlan
|
|
from zerver.lib.actions import (
|
|
change_user_is_active,
|
|
do_change_logo_source,
|
|
do_change_plan_type,
|
|
do_create_user,
|
|
)
|
|
from zerver.lib.events import add_realm_logo_fields
|
|
from zerver.lib.home import LAST_SERVER_UPGRADE_TIME, get_furthest_read_time, is_outdated_server
|
|
from zerver.lib.soft_deactivation import do_soft_deactivate_users
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.lib.test_helpers import get_user_messages, override_settings, queries_captured
|
|
from zerver.lib.users import compute_show_invites_and_add_streams
|
|
from zerver.models import (
|
|
DefaultStream,
|
|
Realm,
|
|
UserActivity,
|
|
UserProfile,
|
|
flush_per_request_caches,
|
|
get_realm,
|
|
get_stream,
|
|
get_system_bot,
|
|
get_user,
|
|
)
|
|
from zerver.views.home import compute_navbar_logo_url
|
|
from zerver.worker.queue_processors import UserActivityWorker
|
|
|
|
logger_string = "zulip.soft_deactivation"
|
|
|
|
|
|
class HomeTest(ZulipTestCase):
|
|
|
|
# Keep this list sorted!!!
|
|
expected_page_params_keys = [
|
|
"alert_words",
|
|
"available_notification_sounds",
|
|
"avatar_source",
|
|
"avatar_url",
|
|
"avatar_url_medium",
|
|
"bot_types",
|
|
"can_create_streams",
|
|
"can_invite_others_to_realm",
|
|
"can_subscribe_other_users",
|
|
"color_scheme",
|
|
"cross_realm_bots",
|
|
"custom_profile_field_types",
|
|
"custom_profile_fields",
|
|
"debug_mode",
|
|
"default_language",
|
|
"default_language_name",
|
|
"default_view",
|
|
"delivery_email",
|
|
"demote_inactive_streams",
|
|
"dense_mode",
|
|
"desktop_icon_count_display",
|
|
"development_environment",
|
|
"email",
|
|
"emojiset",
|
|
"emojiset_choices",
|
|
"enable_desktop_notifications",
|
|
"enable_digest_emails",
|
|
"enable_login_emails",
|
|
"enable_offline_email_notifications",
|
|
"enable_offline_push_notifications",
|
|
"enable_online_push_notifications",
|
|
"enable_sounds",
|
|
"enable_stream_audible_notifications",
|
|
"enable_stream_desktop_notifications",
|
|
"enable_stream_email_notifications",
|
|
"enable_stream_push_notifications",
|
|
"enter_sends",
|
|
"first_in_realm",
|
|
"fluid_layout_width",
|
|
"full_name",
|
|
"furthest_read_time",
|
|
"giphy_api_key",
|
|
"giphy_rating_options",
|
|
"has_mobile_devices",
|
|
"has_zoom_token",
|
|
"high_contrast_mode",
|
|
"hotspots",
|
|
"initial_servertime",
|
|
"insecure_desktop_app",
|
|
"is_admin",
|
|
"is_guest",
|
|
"is_owner",
|
|
"is_web_public_visitor",
|
|
"jitsi_server_url",
|
|
"language_list",
|
|
"language_list_dbl_col",
|
|
"last_event_id",
|
|
"left_side_userlist",
|
|
"login_page",
|
|
"max_avatar_file_size_mib",
|
|
"max_file_upload_size_mib",
|
|
"max_icon_file_size",
|
|
"max_logo_file_size",
|
|
"max_message_id",
|
|
"max_message_length",
|
|
"max_stream_description_length",
|
|
"max_stream_name_length",
|
|
"max_topic_length",
|
|
"message_content_in_email_notifications",
|
|
"muted_topics",
|
|
"muted_users",
|
|
"narrow",
|
|
"narrow_stream",
|
|
"needs_tutorial",
|
|
"never_subscribed",
|
|
"no_event_queue",
|
|
"notification_sound",
|
|
"password_min_guesses",
|
|
"password_min_length",
|
|
"pm_content_in_desktop_notifications",
|
|
"poll_timeout",
|
|
"presence_enabled",
|
|
"presences",
|
|
"prompt_for_invites",
|
|
"queue_id",
|
|
"realm_add_emoji_by_admins_only",
|
|
"realm_allow_community_topic_editing",
|
|
"realm_allow_edit_history",
|
|
"realm_allow_message_deleting",
|
|
"realm_allow_message_editing",
|
|
"realm_authentication_methods",
|
|
"realm_available_video_chat_providers",
|
|
"realm_avatar_changes_disabled",
|
|
"realm_bot_creation_policy",
|
|
"realm_bot_domain",
|
|
"realm_bots",
|
|
"realm_community_topic_editing_limit_seconds",
|
|
"realm_create_stream_policy",
|
|
"realm_default_code_block_language",
|
|
"realm_default_external_accounts",
|
|
"realm_default_language",
|
|
"realm_default_stream_groups",
|
|
"realm_default_streams",
|
|
"realm_default_twenty_four_hour_time",
|
|
"realm_description",
|
|
"realm_digest_emails_enabled",
|
|
"realm_digest_weekday",
|
|
"realm_disallow_disposable_email_addresses",
|
|
"realm_domains",
|
|
"realm_email_address_visibility",
|
|
"realm_email_auth_enabled",
|
|
"realm_email_changes_disabled",
|
|
"realm_emails_restricted_to_domains",
|
|
"realm_embedded_bots",
|
|
"realm_emoji",
|
|
"realm_filters",
|
|
"realm_giphy_rating",
|
|
"realm_icon_source",
|
|
"realm_icon_url",
|
|
"realm_incoming_webhook_bots",
|
|
"realm_inline_image_preview",
|
|
"realm_inline_url_embed_preview",
|
|
"realm_invite_required",
|
|
"realm_invite_to_realm_policy",
|
|
"realm_invite_to_stream_policy",
|
|
"realm_is_zephyr_mirror_realm",
|
|
"realm_linkifiers",
|
|
"realm_logo_source",
|
|
"realm_logo_url",
|
|
"realm_mandatory_topics",
|
|
"realm_message_content_allowed_in_email_notifications",
|
|
"realm_message_content_delete_limit_seconds",
|
|
"realm_message_content_edit_limit_seconds",
|
|
"realm_message_retention_days",
|
|
"realm_move_messages_between_streams_policy",
|
|
"realm_name",
|
|
"realm_name_changes_disabled",
|
|
"realm_name_in_notifications",
|
|
"realm_night_logo_source",
|
|
"realm_night_logo_url",
|
|
"realm_non_active_users",
|
|
"realm_notifications_stream_id",
|
|
"realm_password_auth_enabled",
|
|
"realm_plan_type",
|
|
"realm_playgrounds",
|
|
"realm_presence_disabled",
|
|
"realm_private_message_policy",
|
|
"realm_push_notifications_enabled",
|
|
"realm_send_welcome_emails",
|
|
"realm_signup_notifications_stream_id",
|
|
"realm_upload_quota",
|
|
"realm_uri",
|
|
"realm_user_group_edit_policy",
|
|
"realm_user_groups",
|
|
"realm_users",
|
|
"realm_video_chat_provider",
|
|
"realm_waiting_period_threshold",
|
|
"realm_wildcard_mention_policy",
|
|
"recent_private_conversations",
|
|
"request_language",
|
|
"root_domain_uri",
|
|
"save_stacktraces",
|
|
"search_pills_enabled",
|
|
"server_avatar_changes_disabled",
|
|
"server_generation",
|
|
"server_inline_image_preview",
|
|
"server_inline_url_embed_preview",
|
|
"server_name_changes_disabled",
|
|
"server_needs_upgrade",
|
|
"settings_send_digest_emails",
|
|
"starred_message_counts",
|
|
"starred_messages",
|
|
"stop_words",
|
|
"subscriptions",
|
|
"test_suite",
|
|
"timezone",
|
|
"translate_emoticons",
|
|
"translation_data",
|
|
"twenty_four_hour_time",
|
|
"two_fa_enabled",
|
|
"two_fa_enabled_user",
|
|
"unread_msgs",
|
|
"unsubscribed",
|
|
"upgrade_text_for_wide_organization_logo",
|
|
"user_id",
|
|
"user_status",
|
|
"warn_no_email",
|
|
"webpack_public_path",
|
|
"wildcard_mentions_notify",
|
|
"zulip_feature_level",
|
|
"zulip_plan_is_not_limited",
|
|
"zulip_version",
|
|
]
|
|
|
|
def test_home(self) -> None:
|
|
# Keep this list sorted!!!
|
|
html_bits = [
|
|
"start the conversation",
|
|
"Exclude messages with topic",
|
|
"Keyboard shortcuts",
|
|
"Loading...",
|
|
"Manage streams",
|
|
"Narrow to topic",
|
|
"Next message",
|
|
"Filter streams",
|
|
# Verify that the app styles get included
|
|
"app-stubentry.js",
|
|
"data-params",
|
|
]
|
|
|
|
self.login("hamlet")
|
|
|
|
# Create bot for realm_bots testing. Must be done before fetching home_page.
|
|
bot_info = {
|
|
"full_name": "The Bot of Hamlet",
|
|
"short_name": "hambot",
|
|
}
|
|
self.client_post("/json/bots", bot_info)
|
|
|
|
# Verify succeeds once logged-in
|
|
flush_per_request_caches()
|
|
with queries_captured() as queries:
|
|
with patch("zerver.lib.cache.cache_set") as cache_mock:
|
|
result = self._get_home_page(stream="Denmark")
|
|
self.check_rendered_logged_in_app(result)
|
|
self.assertEqual(
|
|
set(result["Cache-Control"].split(", ")), {"must-revalidate", "no-store", "no-cache"}
|
|
)
|
|
|
|
self.assert_length(queries, 41)
|
|
self.assert_length(cache_mock.call_args_list, 5)
|
|
|
|
html = result.content.decode("utf-8")
|
|
|
|
for html_bit in html_bits:
|
|
if html_bit not in html:
|
|
raise AssertionError(f"{html_bit} not in result")
|
|
|
|
page_params = self._get_page_params(result)
|
|
|
|
actual_keys = sorted(str(k) for k in page_params.keys())
|
|
|
|
self.assertEqual(actual_keys, self.expected_page_params_keys)
|
|
|
|
# TODO: Inspect the page_params data further.
|
|
# print(orjson.dumps(page_params, option=orjson.OPT_INDENT_2).decode())
|
|
realm_bots_expected_keys = [
|
|
"api_key",
|
|
"avatar_url",
|
|
"bot_type",
|
|
"default_all_public_streams",
|
|
"default_events_register_stream",
|
|
"default_sending_stream",
|
|
"email",
|
|
"full_name",
|
|
"is_active",
|
|
"owner_id",
|
|
"services",
|
|
"user_id",
|
|
]
|
|
|
|
realm_bots_actual_keys = sorted(str(key) for key in page_params["realm_bots"][0].keys())
|
|
self.assertEqual(realm_bots_actual_keys, realm_bots_expected_keys)
|
|
|
|
def test_logged_out_home(self) -> None:
|
|
result = self.client_get("/")
|
|
self.assertEqual(result.status_code, 200)
|
|
|
|
page_params = self._get_page_params(result)
|
|
actual_keys = sorted(str(k) for k in page_params.keys())
|
|
removed_keys = [
|
|
"last_event_id",
|
|
"narrow",
|
|
"narrow_stream",
|
|
]
|
|
expected_keys = [i for i in self.expected_page_params_keys if i not in removed_keys]
|
|
self.assertEqual(actual_keys, expected_keys)
|
|
|
|
def test_home_under_2fa_without_otp_device(self) -> None:
|
|
with self.settings(TWO_FACTOR_AUTHENTICATION_ENABLED=True):
|
|
self.login("iago")
|
|
result = self._get_home_page()
|
|
# Should be successful because otp device is not configured.
|
|
self.check_rendered_logged_in_app(result)
|
|
|
|
def test_home_under_2fa_with_otp_device(self) -> None:
|
|
with self.settings(TWO_FACTOR_AUTHENTICATION_ENABLED=True):
|
|
user_profile = self.example_user("iago")
|
|
self.create_default_device(user_profile)
|
|
self.login_user(user_profile)
|
|
result = self._get_home_page()
|
|
# User should not log in because otp device is configured but
|
|
# 2fa login function was not called.
|
|
self.assertEqual(result.status_code, 302)
|
|
|
|
self.login_2fa(user_profile)
|
|
result = self._get_home_page()
|
|
# Should be successful after calling 2fa login function.
|
|
self.check_rendered_logged_in_app(result)
|
|
|
|
def test_num_queries_for_realm_admin(self) -> None:
|
|
# Verify number of queries for Realm admin isn't much higher than for normal users.
|
|
self.login("iago")
|
|
flush_per_request_caches()
|
|
with queries_captured() as queries:
|
|
with patch("zerver.lib.cache.cache_set") as cache_mock:
|
|
result = self._get_home_page()
|
|
self.check_rendered_logged_in_app(result)
|
|
self.assert_length(cache_mock.call_args_list, 6)
|
|
self.assert_length(queries, 38)
|
|
|
|
def test_num_queries_with_streams(self) -> None:
|
|
main_user = self.example_user("hamlet")
|
|
other_user = self.example_user("cordelia")
|
|
|
|
realm_id = main_user.realm_id
|
|
|
|
self.login_user(main_user)
|
|
|
|
# Try to make page-load do extra work for various subscribed
|
|
# streams.
|
|
for i in range(10):
|
|
stream_name = "test_stream_" + str(i)
|
|
stream = self.make_stream(stream_name)
|
|
DefaultStream.objects.create(
|
|
realm_id=realm_id,
|
|
stream_id=stream.id,
|
|
)
|
|
for user in [main_user, other_user]:
|
|
self.subscribe(user, stream_name)
|
|
|
|
# Simulate hitting the page the first time to avoid some noise
|
|
# related to initial logins.
|
|
self._get_home_page()
|
|
|
|
# Then for the second page load, measure the number of queries.
|
|
flush_per_request_caches()
|
|
with queries_captured() as queries2:
|
|
result = self._get_home_page()
|
|
|
|
self.assert_length(queries2, 36)
|
|
|
|
# Do a sanity check that our new streams were in the payload.
|
|
html = result.content.decode("utf-8")
|
|
self.assertIn("test_stream_7", html)
|
|
|
|
def _get_home_page(self, **kwargs: Any) -> HttpResponse:
|
|
with patch("zerver.lib.events.request_event_queue", return_value=42), patch(
|
|
"zerver.lib.events.get_user_events", return_value=[]
|
|
):
|
|
result = self.client_get("/", dict(**kwargs))
|
|
return result
|
|
|
|
def _sanity_check(self, result: HttpResponse) -> None:
|
|
"""
|
|
Use this for tests that are geared toward specific edge cases, but
|
|
which still want the home page to load properly.
|
|
"""
|
|
html = result.content.decode("utf-8")
|
|
if "start a conversation" not in html:
|
|
raise AssertionError("Home page probably did not load.")
|
|
|
|
def test_terms_of_service(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
self.login_user(user)
|
|
|
|
for user_tos_version in [None, "1.1", "2.0.3.4"]:
|
|
user.tos_version = user_tos_version
|
|
user.save()
|
|
|
|
with self.settings(TERMS_OF_SERVICE="whatever"), self.settings(TOS_VERSION="99.99"):
|
|
|
|
result = self.client_get("/", dict(stream="Denmark"))
|
|
|
|
html = result.content.decode("utf-8")
|
|
self.assertIn("Accept the new Terms of Service", html)
|
|
|
|
def test_banned_desktop_app_versions(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
self.login_user(user)
|
|
|
|
result = self.client_get("/", HTTP_USER_AGENT="ZulipElectron/2.3.82")
|
|
html = result.content.decode("utf-8")
|
|
self.assertIn("You are using old version of the Zulip desktop", html)
|
|
|
|
def test_unsupported_browser(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
self.login_user(user)
|
|
|
|
# currently we don't support IE, so some of IE's user agents are added.
|
|
unsupported_user_agents = [
|
|
"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2)",
|
|
"Mozilla/5.0 (Windows NT 10.0; Trident/7.0; rv:11.0) like Gecko",
|
|
"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)",
|
|
]
|
|
for user_agent in unsupported_user_agents:
|
|
result = self.client_get("/", HTTP_USER_AGENT=user_agent)
|
|
html = result.content.decode("utf-8")
|
|
self.assertIn("Internet Explorer is not supported by Zulip.", html)
|
|
|
|
def test_terms_of_service_first_time_template(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
self.login_user(user)
|
|
|
|
user.tos_version = None
|
|
user.save()
|
|
|
|
with self.settings(FIRST_TIME_TOS_TEMPLATE="hello.html"), self.settings(
|
|
TOS_VERSION="99.99"
|
|
):
|
|
result = self.client_post("/accounts/accept_terms/")
|
|
self.assertEqual(result.status_code, 200)
|
|
self.assert_in_response("I agree to the", result)
|
|
self.assert_in_response("Chat for distributed teams", result)
|
|
|
|
def test_accept_terms_of_service(self) -> None:
|
|
self.login("hamlet")
|
|
|
|
result = self.client_post("/accounts/accept_terms/")
|
|
self.assertEqual(result.status_code, 200)
|
|
self.assert_in_response("I agree to the", result)
|
|
|
|
result = self.client_post("/accounts/accept_terms/", {"terms": True})
|
|
self.assertEqual(result.status_code, 302)
|
|
self.assertEqual(result["Location"], "/")
|
|
|
|
def test_bad_narrow(self) -> None:
|
|
self.login("hamlet")
|
|
with self.assertLogs(level="WARNING") as m:
|
|
result = self._get_home_page(stream="Invalid Stream")
|
|
self.assertEqual(m.output, ["WARNING:root:Invalid narrow requested, ignoring"])
|
|
self._sanity_check(result)
|
|
|
|
def test_topic_narrow(self) -> None:
|
|
self.login("hamlet")
|
|
result = self._get_home_page(stream="Denmark", topic="lunch")
|
|
self._sanity_check(result)
|
|
html = result.content.decode("utf-8")
|
|
self.assertIn("lunch", html)
|
|
self.assertEqual(
|
|
set(result["Cache-Control"].split(", ")), {"must-revalidate", "no-store", "no-cache"}
|
|
)
|
|
|
|
def test_notifications_stream(self) -> None:
|
|
realm = get_realm("zulip")
|
|
realm.notifications_stream_id = get_stream("Denmark", realm).id
|
|
realm.save()
|
|
self.login("hamlet")
|
|
result = self._get_home_page()
|
|
page_params = self._get_page_params(result)
|
|
self.assertEqual(
|
|
page_params["realm_notifications_stream_id"], get_stream("Denmark", realm).id
|
|
)
|
|
|
|
def create_bot(self, owner: UserProfile, bot_email: str, bot_name: str) -> UserProfile:
|
|
user = do_create_user(
|
|
email=bot_email,
|
|
password="123",
|
|
realm=owner.realm,
|
|
full_name=bot_name,
|
|
bot_type=UserProfile.DEFAULT_BOT,
|
|
bot_owner=owner,
|
|
acting_user=None,
|
|
)
|
|
return user
|
|
|
|
def create_non_active_user(self, realm: Realm, email: str, name: str) -> UserProfile:
|
|
user = do_create_user(
|
|
email=email, password="123", realm=realm, full_name=name, acting_user=None
|
|
)
|
|
|
|
# Doing a full-stack deactivation would be expensive here,
|
|
# and we really only need to flip the flag to get a valid
|
|
# test.
|
|
change_user_is_active(user, False)
|
|
return user
|
|
|
|
def test_signup_notifications_stream(self) -> None:
|
|
realm = get_realm("zulip")
|
|
realm.signup_notifications_stream = get_stream("Denmark", realm)
|
|
realm.save()
|
|
self.login("hamlet")
|
|
result = self._get_home_page()
|
|
page_params = self._get_page_params(result)
|
|
self.assertEqual(
|
|
page_params["realm_signup_notifications_stream_id"], get_stream("Denmark", realm).id
|
|
)
|
|
|
|
def test_people(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
realm = get_realm("zulip")
|
|
self.login_user(hamlet)
|
|
|
|
bots = {}
|
|
for i in range(3):
|
|
bots[i] = self.create_bot(
|
|
owner=hamlet,
|
|
bot_email=f"bot-{i}@zulip.com",
|
|
bot_name=f"Bot {i}",
|
|
)
|
|
|
|
for i in range(3):
|
|
defunct_user = self.create_non_active_user(
|
|
realm=realm,
|
|
email=f"defunct-{i}@zulip.com",
|
|
name=f"Defunct User {i}",
|
|
)
|
|
|
|
result = self._get_home_page()
|
|
page_params = self._get_page_params(result)
|
|
|
|
"""
|
|
We send three lists of users. The first two below are disjoint
|
|
lists of users, and the records we send for them have identical
|
|
structure.
|
|
|
|
The realm_bots bucket is somewhat redundant, since all bots will
|
|
be in one of the first two buckets. They do include fields, however,
|
|
that normal users don't care about, such as default_sending_stream.
|
|
"""
|
|
|
|
buckets = [
|
|
"realm_users",
|
|
"realm_non_active_users",
|
|
"realm_bots",
|
|
]
|
|
|
|
for field in buckets:
|
|
users = page_params[field]
|
|
self.assertTrue(len(users) >= 3, field)
|
|
for rec in users:
|
|
self.assertEqual(rec["user_id"], get_user(rec["email"], realm).id)
|
|
if field == "realm_bots":
|
|
self.assertNotIn("is_bot", rec)
|
|
self.assertIn("is_active", rec)
|
|
self.assertIn("owner_id", rec)
|
|
else:
|
|
self.assertIn("is_bot", rec)
|
|
self.assertNotIn("is_active", rec)
|
|
|
|
active_ids = {p["user_id"] for p in page_params["realm_users"]}
|
|
non_active_ids = {p["user_id"] for p in page_params["realm_non_active_users"]}
|
|
bot_ids = {p["user_id"] for p in page_params["realm_bots"]}
|
|
|
|
self.assertIn(hamlet.id, active_ids)
|
|
self.assertIn(defunct_user.id, non_active_ids)
|
|
|
|
# Bots can show up in multiple buckets.
|
|
self.assertIn(bots[2].id, bot_ids)
|
|
self.assertIn(bots[2].id, active_ids)
|
|
|
|
# Make sure nobody got mis-bucketed.
|
|
self.assertNotIn(hamlet.id, non_active_ids)
|
|
self.assertNotIn(defunct_user.id, active_ids)
|
|
|
|
cross_bots = page_params["cross_realm_bots"]
|
|
self.assertEqual(len(cross_bots), 3)
|
|
cross_bots.sort(key=lambda d: d["email"])
|
|
for cross_bot in cross_bots:
|
|
# These are either nondeterministic or boring
|
|
del cross_bot["timezone"]
|
|
del cross_bot["avatar_url"]
|
|
del cross_bot["date_joined"]
|
|
|
|
notification_bot = self.notification_bot()
|
|
email_gateway_bot = get_system_bot(settings.EMAIL_GATEWAY_BOT)
|
|
welcome_bot = get_system_bot(settings.WELCOME_BOT)
|
|
|
|
by_email = lambda d: d["email"]
|
|
|
|
self.assertEqual(
|
|
sorted(cross_bots, key=by_email),
|
|
sorted(
|
|
[
|
|
dict(
|
|
avatar_version=email_gateway_bot.avatar_version,
|
|
bot_owner_id=None,
|
|
bot_type=1,
|
|
email=email_gateway_bot.email,
|
|
user_id=email_gateway_bot.id,
|
|
full_name=email_gateway_bot.full_name,
|
|
is_active=True,
|
|
is_bot=True,
|
|
is_admin=False,
|
|
is_owner=False,
|
|
is_cross_realm_bot=True,
|
|
is_guest=False,
|
|
),
|
|
dict(
|
|
avatar_version=notification_bot.avatar_version,
|
|
bot_owner_id=None,
|
|
bot_type=1,
|
|
email=notification_bot.email,
|
|
user_id=notification_bot.id,
|
|
full_name=notification_bot.full_name,
|
|
is_active=True,
|
|
is_bot=True,
|
|
is_admin=False,
|
|
is_owner=False,
|
|
is_cross_realm_bot=True,
|
|
is_guest=False,
|
|
),
|
|
dict(
|
|
avatar_version=welcome_bot.avatar_version,
|
|
bot_owner_id=None,
|
|
bot_type=1,
|
|
email=welcome_bot.email,
|
|
user_id=welcome_bot.id,
|
|
full_name=welcome_bot.full_name,
|
|
is_active=True,
|
|
is_bot=True,
|
|
is_admin=False,
|
|
is_owner=False,
|
|
is_cross_realm_bot=True,
|
|
is_guest=False,
|
|
),
|
|
],
|
|
key=by_email,
|
|
),
|
|
)
|
|
|
|
def test_new_stream(self) -> None:
|
|
user_profile = self.example_user("hamlet")
|
|
stream_name = "New stream"
|
|
self.subscribe(user_profile, stream_name)
|
|
self.login_user(user_profile)
|
|
result = self._get_home_page(stream=stream_name)
|
|
page_params = self._get_page_params(result)
|
|
self.assertEqual(page_params["narrow_stream"], stream_name)
|
|
self.assertEqual(page_params["narrow"], [dict(operator="stream", operand=stream_name)])
|
|
self.assertEqual(page_params["max_message_id"], -1)
|
|
|
|
def test_invites_by_admins_only(self) -> None:
|
|
user_profile = self.example_user("hamlet")
|
|
|
|
realm = user_profile.realm
|
|
realm.invite_to_realm_policy = Realm.POLICY_ADMINS_ONLY
|
|
realm.save()
|
|
|
|
self.login_user(user_profile)
|
|
self.assertFalse(user_profile.is_realm_admin)
|
|
result = self._get_home_page()
|
|
html = result.content.decode("utf-8")
|
|
self.assertNotIn("Invite more users", html)
|
|
|
|
user_profile.role = UserProfile.ROLE_REALM_ADMINISTRATOR
|
|
user_profile.save()
|
|
result = self._get_home_page()
|
|
html = result.content.decode("utf-8")
|
|
self.assertIn("Invite more users", html)
|
|
|
|
def test_show_invites_for_guest_users(self) -> None:
|
|
user_profile = self.example_user("polonius")
|
|
|
|
realm = user_profile.realm
|
|
realm.invite_to_realm_policy = Realm.POLICY_MEMBERS_ONLY
|
|
realm.save()
|
|
|
|
self.login_user(user_profile)
|
|
self.assertFalse(user_profile.is_realm_admin)
|
|
self.assertEqual(get_realm("zulip").invite_to_realm_policy, Realm.POLICY_MEMBERS_ONLY)
|
|
result = self._get_home_page()
|
|
html = result.content.decode("utf-8")
|
|
self.assertNotIn("Invite more users", html)
|
|
|
|
def test_show_billing(self) -> None:
|
|
customer = Customer.objects.create(realm=get_realm("zulip"), stripe_customer_id="cus_id")
|
|
user = self.example_user("desdemona")
|
|
|
|
# realm owner, but no CustomerPlan -> no billing link
|
|
user.role = UserProfile.ROLE_REALM_OWNER
|
|
user.save(update_fields=["role"])
|
|
self.login_user(user)
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Billing", result_html)
|
|
|
|
# realm owner, with inactive CustomerPlan -> show billing link
|
|
CustomerPlan.objects.create(
|
|
customer=customer,
|
|
billing_cycle_anchor=timezone_now(),
|
|
billing_schedule=CustomerPlan.ANNUAL,
|
|
next_invoice_date=timezone_now(),
|
|
tier=CustomerPlan.STANDARD,
|
|
status=CustomerPlan.ENDED,
|
|
)
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertIn("Billing", result_html)
|
|
|
|
# realm admin, with CustomerPlan -> no billing link
|
|
user.role = UserProfile.ROLE_REALM_ADMINISTRATOR
|
|
user.save(update_fields=["role"])
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Billing", result_html)
|
|
|
|
# billing admin, with CustomerPlan -> show billing link
|
|
user.role = UserProfile.ROLE_MEMBER
|
|
user.is_billing_admin = True
|
|
user.save(update_fields=["role", "is_billing_admin"])
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertIn("Billing", result_html)
|
|
|
|
# member, with CustomerPlan -> no billing link
|
|
user.is_billing_admin = False
|
|
user.save(update_fields=["is_billing_admin"])
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Billing", result_html)
|
|
|
|
# guest, with CustomerPlan -> no billing link
|
|
user.role = UserProfile.ROLE_GUEST
|
|
user.save(update_fields=["role"])
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Billing", result_html)
|
|
|
|
# billing admin, but no CustomerPlan -> no billing link
|
|
user.role = UserProfile.ROLE_MEMBER
|
|
user.is_billing_admin = True
|
|
user.save(update_fields=["role", "is_billing_admin"])
|
|
CustomerPlan.objects.all().delete()
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Billing", result_html)
|
|
|
|
# billing admin, with sponsorship pending -> show billing link
|
|
customer.sponsorship_pending = True
|
|
customer.save(update_fields=["sponsorship_pending"])
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertIn("Billing", result_html)
|
|
|
|
# billing admin, no customer object -> make sure it doesn't crash
|
|
customer.delete()
|
|
result = self._get_home_page()
|
|
self.check_rendered_logged_in_app(result)
|
|
|
|
def test_show_plans(self) -> None:
|
|
realm = get_realm("zulip")
|
|
|
|
# Don't show plans to guest users
|
|
self.login("polonius")
|
|
do_change_plan_type(realm, Realm.LIMITED, acting_user=None)
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Plans", result_html)
|
|
|
|
# Show plans link to all other users if plan_type is LIMITED
|
|
self.login("hamlet")
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertIn("Plans", result_html)
|
|
|
|
# Show plans link to no one, including admins, if SELF_HOSTED or STANDARD
|
|
do_change_plan_type(realm, Realm.SELF_HOSTED, acting_user=None)
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Plans", result_html)
|
|
|
|
do_change_plan_type(realm, Realm.STANDARD, acting_user=None)
|
|
result_html = self._get_home_page().content.decode("utf-8")
|
|
self.assertNotIn("Plans", result_html)
|
|
|
|
def test_desktop_home(self) -> None:
|
|
self.login("hamlet")
|
|
result = self.client_get("/desktop_home")
|
|
self.assertEqual(result.status_code, 301)
|
|
self.assertTrue(result["Location"].endswith("/desktop_home/"))
|
|
result = self.client_get("/desktop_home/")
|
|
self.assertEqual(result.status_code, 302)
|
|
path = urllib.parse.urlparse(result["Location"]).path
|
|
self.assertEqual(path, "/")
|
|
|
|
def test_compute_navbar_logo_url(self) -> None:
|
|
user_profile = self.example_user("hamlet")
|
|
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_NIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params), "/static/images/logo/zulip-org-logo.svg?version=0"
|
|
)
|
|
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_LIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params), "/static/images/logo/zulip-org-logo.svg?version=0"
|
|
)
|
|
|
|
do_change_logo_source(
|
|
user_profile.realm, Realm.LOGO_UPLOADED, night=False, acting_user=user_profile
|
|
)
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_NIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params),
|
|
f"/user_avatars/{user_profile.realm_id}/realm/logo.png?version=2",
|
|
)
|
|
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_LIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params),
|
|
f"/user_avatars/{user_profile.realm_id}/realm/logo.png?version=2",
|
|
)
|
|
|
|
do_change_logo_source(
|
|
user_profile.realm, Realm.LOGO_UPLOADED, night=True, acting_user=user_profile
|
|
)
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_NIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params),
|
|
f"/user_avatars/{user_profile.realm_id}/realm/night_logo.png?version=2",
|
|
)
|
|
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_LIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params),
|
|
f"/user_avatars/{user_profile.realm_id}/realm/logo.png?version=2",
|
|
)
|
|
|
|
# This configuration isn't super supported in the UI and is a
|
|
# weird choice, but we have a test for it anyway.
|
|
do_change_logo_source(
|
|
user_profile.realm, Realm.LOGO_DEFAULT, night=False, acting_user=user_profile
|
|
)
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_NIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params),
|
|
f"/user_avatars/{user_profile.realm_id}/realm/night_logo.png?version=2",
|
|
)
|
|
|
|
page_params = {"color_scheme": user_profile.COLOR_SCHEME_LIGHT}
|
|
add_realm_logo_fields(page_params, user_profile.realm)
|
|
self.assertEqual(
|
|
compute_navbar_logo_url(page_params), "/static/images/logo/zulip-org-logo.svg?version=0"
|
|
)
|
|
|
|
@override_settings(SERVER_UPGRADE_NAG_DEADLINE_DAYS=365)
|
|
def test_is_outdated_server(self) -> None:
|
|
# Check when server_upgrade_nag_deadline > last_server_upgrade_time
|
|
hamlet = self.example_user("hamlet")
|
|
iago = self.example_user("iago")
|
|
now = LAST_SERVER_UPGRADE_TIME.replace(tzinfo=pytz.utc)
|
|
with patch("zerver.lib.home.timezone_now", return_value=now + timedelta(days=10)):
|
|
self.assertEqual(is_outdated_server(iago), False)
|
|
self.assertEqual(is_outdated_server(hamlet), False)
|
|
self.assertEqual(is_outdated_server(None), False)
|
|
|
|
with patch("zerver.lib.home.timezone_now", return_value=now + timedelta(days=397)):
|
|
self.assertEqual(is_outdated_server(iago), True)
|
|
self.assertEqual(is_outdated_server(hamlet), True)
|
|
self.assertEqual(is_outdated_server(None), True)
|
|
|
|
with patch("zerver.lib.home.timezone_now", return_value=now + timedelta(days=380)):
|
|
self.assertEqual(is_outdated_server(iago), True)
|
|
self.assertEqual(is_outdated_server(hamlet), False)
|
|
self.assertEqual(is_outdated_server(None), False)
|
|
|
|
def test_furthest_read_time(self) -> None:
|
|
msg_id = self.send_test_message("hello!", sender_name="iago")
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
self.login_user(hamlet)
|
|
self.client_post(
|
|
"/json/messages/flags",
|
|
{"messages": orjson.dumps([msg_id]).decode(), "op": "add", "flag": "read"},
|
|
)
|
|
|
|
# Manually process the UserActivity
|
|
now = timezone_now()
|
|
activity_time = calendar.timegm(now.timetuple())
|
|
user_activity_event = {
|
|
"user_profile_id": hamlet.id,
|
|
"client_id": 1,
|
|
"query": "update_message_flags",
|
|
"time": activity_time,
|
|
}
|
|
|
|
yesterday = now - timedelta(days=1)
|
|
activity_time_2 = calendar.timegm(yesterday.timetuple())
|
|
user_activity_event_2 = {
|
|
"user_profile_id": hamlet.id,
|
|
"client_id": 2,
|
|
"query": "update_message_flags",
|
|
"time": activity_time_2,
|
|
}
|
|
UserActivityWorker().consume_batch([user_activity_event, user_activity_event_2])
|
|
|
|
# verify furthest_read_time is last activity time, irrespective of client
|
|
furthest_read_time = get_furthest_read_time(hamlet)
|
|
self.assertGreaterEqual(furthest_read_time, activity_time)
|
|
|
|
# Check when user has no activity
|
|
UserActivity.objects.filter(user_profile=hamlet).delete()
|
|
furthest_read_time = get_furthest_read_time(hamlet)
|
|
self.assertIsNone(furthest_read_time)
|
|
|
|
# Check no user profile handling
|
|
furthest_read_time = get_furthest_read_time(None)
|
|
self.assertIsNotNone(furthest_read_time)
|
|
|
|
def test_subdomain_homepage(self) -> None:
|
|
self.login("hamlet")
|
|
with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
|
|
with patch("zerver.views.home.get_subdomain", return_value=""):
|
|
result = self._get_home_page()
|
|
self.assertEqual(result.status_code, 200)
|
|
self.assert_in_response("Chat for distributed teams", result)
|
|
|
|
with patch("zerver.views.home.get_subdomain", return_value="subdomain"):
|
|
result = self._get_home_page()
|
|
self._sanity_check(result)
|
|
|
|
def send_test_message(
|
|
self,
|
|
content: str,
|
|
sender_name: str = "iago",
|
|
stream_name: str = "Denmark",
|
|
topic_name: str = "foo",
|
|
) -> int:
|
|
sender = self.example_user(sender_name)
|
|
return self.send_stream_message(sender, stream_name, content=content, topic_name=topic_name)
|
|
|
|
def soft_activate_and_get_unread_count(
|
|
self, stream: str = "Denmark", topic: str = "foo"
|
|
) -> int:
|
|
stream_narrow = self._get_home_page(stream=stream, topic=topic)
|
|
page_params = self._get_page_params(stream_narrow)
|
|
return page_params["unread_msgs"]["count"]
|
|
|
|
def test_unread_count_user_soft_deactivation(self) -> None:
|
|
# In this test we make sure if a soft deactivated user had unread
|
|
# messages before deactivation they remain same way after activation.
|
|
long_term_idle_user = self.example_user("hamlet")
|
|
self.login_user(long_term_idle_user)
|
|
message = "Test Message 1"
|
|
self.send_test_message(message)
|
|
with queries_captured() as queries:
|
|
self.assertEqual(self.soft_activate_and_get_unread_count(), 1)
|
|
query_count = len(queries)
|
|
user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertEqual(user_msg_list[-1].content, message)
|
|
self.logout()
|
|
|
|
with self.assertLogs(logger_string, level="INFO") as info_log:
|
|
do_soft_deactivate_users([long_term_idle_user])
|
|
self.assertEqual(
|
|
info_log.output,
|
|
[
|
|
f"INFO:{logger_string}:Soft deactivated user {long_term_idle_user.id}",
|
|
f"INFO:{logger_string}:Soft-deactivated batch of 1 users; 0 remain to process",
|
|
],
|
|
)
|
|
|
|
self.login_user(long_term_idle_user)
|
|
message = "Test Message 2"
|
|
self.send_test_message(message)
|
|
idle_user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertNotEqual(idle_user_msg_list[-1].content, message)
|
|
with queries_captured() as queries:
|
|
self.assertEqual(self.soft_activate_and_get_unread_count(), 2)
|
|
# Test here for query count to be at least 5 greater than previous count
|
|
# This will assure indirectly that add_missing_messages() was called.
|
|
self.assertGreaterEqual(len(queries) - query_count, 5)
|
|
idle_user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertEqual(idle_user_msg_list[-1].content, message)
|
|
|
|
def test_multiple_user_soft_deactivations(self) -> None:
|
|
long_term_idle_user = self.example_user("hamlet")
|
|
# We are sending this message to ensure that long_term_idle_user has
|
|
# at least one UserMessage row.
|
|
self.send_test_message("Testing", sender_name="hamlet")
|
|
with self.assertLogs(logger_string, level="INFO") as info_log:
|
|
do_soft_deactivate_users([long_term_idle_user])
|
|
self.assertEqual(
|
|
info_log.output,
|
|
[
|
|
f"INFO:{logger_string}:Soft deactivated user {long_term_idle_user.id}",
|
|
f"INFO:{logger_string}:Soft-deactivated batch of 1 users; 0 remain to process",
|
|
],
|
|
)
|
|
|
|
message = "Test Message 1"
|
|
self.send_test_message(message)
|
|
self.login_user(long_term_idle_user)
|
|
with queries_captured() as queries:
|
|
self.assertEqual(self.soft_activate_and_get_unread_count(), 2)
|
|
query_count = len(queries)
|
|
long_term_idle_user.refresh_from_db()
|
|
self.assertFalse(long_term_idle_user.long_term_idle)
|
|
idle_user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertEqual(idle_user_msg_list[-1].content, message)
|
|
|
|
message = "Test Message 2"
|
|
self.send_test_message(message)
|
|
with queries_captured() as queries:
|
|
self.assertEqual(self.soft_activate_and_get_unread_count(), 3)
|
|
# Test here for query count to be at least 5 less than previous count.
|
|
# This will assure add_missing_messages() isn't repeatedly called.
|
|
self.assertGreaterEqual(query_count - len(queries), 5)
|
|
idle_user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertEqual(idle_user_msg_list[-1].content, message)
|
|
self.logout()
|
|
|
|
with self.assertLogs(logger_string, level="INFO") as info_log:
|
|
do_soft_deactivate_users([long_term_idle_user])
|
|
self.assertEqual(
|
|
info_log.output,
|
|
[
|
|
f"INFO:{logger_string}:Soft deactivated user {long_term_idle_user.id}",
|
|
f"INFO:{logger_string}:Soft-deactivated batch of 1 users; 0 remain to process",
|
|
],
|
|
)
|
|
|
|
message = "Test Message 3"
|
|
self.send_test_message(message)
|
|
self.login_user(long_term_idle_user)
|
|
with queries_captured() as queries:
|
|
self.assertEqual(self.soft_activate_and_get_unread_count(), 4)
|
|
query_count = len(queries)
|
|
long_term_idle_user.refresh_from_db()
|
|
self.assertFalse(long_term_idle_user.long_term_idle)
|
|
idle_user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertEqual(idle_user_msg_list[-1].content, message)
|
|
|
|
message = "Test Message 4"
|
|
self.send_test_message(message)
|
|
with queries_captured() as queries:
|
|
self.assertEqual(self.soft_activate_and_get_unread_count(), 5)
|
|
self.assertGreaterEqual(query_count - len(queries), 5)
|
|
idle_user_msg_list = get_user_messages(long_term_idle_user)
|
|
self.assertEqual(idle_user_msg_list[-1].content, message)
|
|
self.logout()
|
|
|
|
def test_url_language(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
user.default_language = "es"
|
|
user.save()
|
|
self.login_user(user)
|
|
result = self._get_home_page()
|
|
self.check_rendered_logged_in_app(result)
|
|
with patch("zerver.lib.events.request_event_queue", return_value=42), patch(
|
|
"zerver.lib.events.get_user_events", return_value=[]
|
|
):
|
|
result = self.client_get("/de/")
|
|
page_params = self._get_page_params(result)
|
|
self.assertEqual(page_params["default_language"], "es")
|
|
# TODO: Verify that the actual language we're using in the
|
|
# translation data is German.
|
|
|
|
def test_translation_data(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
user.default_language = "es"
|
|
user.save()
|
|
self.login_user(user)
|
|
result = self._get_home_page()
|
|
self.check_rendered_logged_in_app(result)
|
|
|
|
page_params = self._get_page_params(result)
|
|
self.assertEqual(page_params["default_language"], "es")
|
|
|
|
def test_compute_show_invites_and_add_streams_admin(self) -> None:
|
|
user = self.example_user("iago")
|
|
|
|
realm = user.realm
|
|
realm.invite_to_realm_policy = Realm.POLICY_ADMINS_ONLY
|
|
realm.save()
|
|
|
|
show_invites, show_add_streams = compute_show_invites_and_add_streams(user)
|
|
self.assertEqual(show_invites, True)
|
|
self.assertEqual(show_add_streams, True)
|
|
|
|
def test_compute_show_invites_and_add_streams_require_admin(self) -> None:
|
|
user = self.example_user("hamlet")
|
|
|
|
realm = user.realm
|
|
realm.invite_to_realm_policy = Realm.POLICY_ADMINS_ONLY
|
|
realm.save()
|
|
|
|
show_invites, show_add_streams = compute_show_invites_and_add_streams(user)
|
|
self.assertEqual(show_invites, False)
|
|
self.assertEqual(show_add_streams, True)
|
|
|
|
def test_compute_show_invites_and_add_streams_guest(self) -> None:
|
|
user = self.example_user("polonius")
|
|
|
|
show_invites, show_add_streams = compute_show_invites_and_add_streams(user)
|
|
self.assertEqual(show_invites, False)
|
|
self.assertEqual(show_add_streams, False)
|
|
|
|
def test_compute_show_invites_and_add_streams_unauthenticated(self) -> None:
|
|
show_invites, show_add_streams = compute_show_invites_and_add_streams(None)
|
|
self.assertEqual(show_invites, False)
|
|
self.assertEqual(show_add_streams, False)
|