from datetime import datetime, timedelta, timezone
from typing import List, Optional
from unittest import mock
import orjson
from django.http import HttpResponse
from django.utils.timezone import now as timezone_now
from analytics.lib.counts import COUNT_STATS, CountStat
from analytics.lib.time_utils import time_range
from analytics.models import FillState, RealmCount, UserCount
from analytics.views import rewrite_client_arrays, sort_by_totals, sort_client_labels
from corporate.lib.stripe import add_months, update_sponsorship_status
from corporate.models import Customer, CustomerPlan, LicenseLedger, get_customer_by_realm
from zerver.lib.actions import (
    do_create_multiuse_invite_link,
    do_send_realm_reactivation_email,
    do_set_realm_property,
)
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import reset_emails_in_zulip_realm
from zerver.lib.timestamp import ceiling_to_day, ceiling_to_hour, datetime_to_timestamp
from zerver.models import (
    Client,
    MultiuseInvite,
    PreregistrationUser,
    Realm,
    UserMessage,
    UserProfile,
    get_realm,
)
class TestStatsEndpoint(ZulipTestCase):
    """Smoke and permission tests for the HTML ``/stats`` pages."""

    def _promote_hamlet_to_staff(self) -> None:
        """Re-fetch hamlet and persist ``is_staff=True`` on that user."""
        hamlet = self.example_user("hamlet")
        hamlet.is_staff = True
        hamlet.save(update_fields=["is_staff"])

    def test_stats(self) -> None:
        """A logged-in hamlet gets a 200 from ``/stats`` with the page heading."""
        self.user = self.example_user("hamlet")
        self.login_user(self.user)
        response = self.client_get("/stats")
        self.assertEqual(response.status_code, 200)
        # Only verify that the page rendered something recognizable.
        self.assert_in_response("Zulip analytics for", response)

    def test_guest_user_cant_access_stats(self) -> None:
        """The guest user polonius is rejected by both the page and the JSON endpoint."""
        self.user = self.example_user("polonius")
        self.login_user(self.user)
        for url in ("/stats", "/json/analytics/chart_data"):
            response = self.client_get(url)
            self.assert_json_error(response, "Not allowed for guest users", 400)

    def test_stats_for_realm(self) -> None:
        """Per-realm stats pages return 302 for non-staff and resolve for staff."""
        self.login_user(self.example_user("hamlet"))

        # Without is_staff, both an existing and a missing realm answer 302.
        for url in ("/stats/realm/zulip/", "/stats/realm/not_existing_realm/"):
            response = self.client_get(url)
            self.assertEqual(response.status_code, 302)

        self._promote_hamlet_to_staff()

        # With is_staff, an unknown realm is a 404 and a known one renders.
        response = self.client_get("/stats/realm/not_existing_realm/")
        self.assertEqual(response.status_code, 404)
        response = self.client_get("/stats/realm/zulip/")
        self.assertEqual(response.status_code, 200)
        self.assert_in_response("Zulip analytics for", response)

    def test_stats_for_installation(self) -> None:
        """The installation-wide stats page returns 302 for non-staff, 200 for staff."""
        self.login_user(self.example_user("hamlet"))
        response = self.client_get("/stats/installation")
        self.assertEqual(response.status_code, 302)

        self._promote_hamlet_to_staff()

        response = self.client_get("/stats/installation")
        self.assertEqual(response.status_code, 200)
        self.assert_in_response("Zulip analytics for", response)
class TestGetChartData(ZulipTestCase):
    """Tests for the ``/json/analytics/chart_data`` family of endpoints.

    Each test seeds ``RealmCount``/``UserCount``/``FillState`` rows (usually via
    ``insert_data``) and then compares the JSON payload returned for a given
    ``chart_name`` against the expected series.
    """

    def setUp(self) -> None:
        """Log in as hamlet and precompute four hourly and four daily end times.

        The end times start at the realm's creation time rounded up to the next
        hour/day boundary and advance one hour/day at a time.
        """
        super().setUp()
        self.realm = get_realm("zulip")
        self.user = self.example_user("hamlet")
        self.login_user(self.user)
        self.end_times_hour = [
            ceiling_to_hour(self.realm.date_created) + timedelta(hours=i) for i in range(4)
        ]
        self.end_times_day = [
            ceiling_to_day(self.realm.date_created) + timedelta(days=i) for i in range(4)
        ]

    def data(self, i: int) -> List[int]:
        """Return the expected 4-point series: ``i`` at index 2, zero elsewhere.

        Index 2 matches the end time at which ``insert_data`` writes its rows.
        """
        return [0, 0, i, 0]

    def insert_data(
        self, stat: CountStat, realm_subgroups: List[Optional[str]], user_subgroups: List[str]
    ) -> None:
        """Seed count rows and a DONE FillState for ``stat``.

        One ``RealmCount`` row is created per entry of ``realm_subgroups`` with
        values 100, 101, ... and one ``UserCount`` row (for ``self.user``) per
        entry of ``user_subgroups`` with values 200, 201, ...  All rows use the
        third precomputed end time; the FillState uses the last one.

        Raises:
            AssertionError: if ``stat.frequency`` is neither ``CountStat.HOUR``
                nor ``CountStat.DAY``.
        """
        if stat.frequency == CountStat.HOUR:
            end_times = self.end_times_hour
        elif stat.frequency == CountStat.DAY:
            end_times = self.end_times_day
        else:
            # Fail loudly here instead of hitting an unbound local further down.
            raise AssertionError(f"Unsupported CountStat frequency: {stat.frequency!r}")
        insert_time = end_times[2]
        fill_time = end_times[-1]

        RealmCount.objects.bulk_create(
            RealmCount(
                property=stat.property,
                subgroup=subgroup,
                end_time=insert_time,
                value=100 + i,
                realm=self.realm,
            )
            for i, subgroup in enumerate(realm_subgroups)
        )
        UserCount.objects.bulk_create(
            UserCount(
                property=stat.property,
                subgroup=subgroup,
                end_time=insert_time,
                value=200 + i,
                realm=self.realm,
                user=self.user,
            )
            for i, subgroup in enumerate(user_subgroups)
        )
        FillState.objects.create(property=stat.property, end_time=fill_time, state=FillState.DONE)

    def _insert_number_of_humans_data(self) -> None:
        """Seed the three stats that the ``number_of_humans`` tests rely on."""
        self.insert_data(COUNT_STATS["realm_active_humans::day"], [None], [])
        self.insert_data(COUNT_STATS["1day_actives::day"], [None], [])
        self.insert_data(COUNT_STATS["active_users_audit:is_bot:day"], ["false"], [])

    def _set_realm_age(self, realm: Realm, age: timedelta) -> None:
        """Persist ``realm.date_created`` as ``age`` before the current time."""
        realm.date_created = timezone_now() - age
        realm.save(update_fields=["date_created"])

    def _assert_messages_sent_chart_ok(self) -> None:
        """Request ``messages_sent_over_time`` and require a JSON success."""
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_over_time"}
        )
        self.assert_json_success(result)

    def _assert_messages_sent_chart_unavailable(self, realm: Realm, end_time: object) -> None:
        """Request ``messages_sent_over_time`` and require the "not running" failure.

        Checks both the single WARNING log line (which embeds
        ``realm.date_created`` and ``end_time``) and the JSON error text.
        """
        with self.assertLogs(level="WARNING") as m:
            result = self.client_get(
                "/json/analytics/chart_data", {"chart_name": "messages_sent_over_time"}
            )
        self.assertEqual(
            m.output,
            [
                f"WARNING:root:User from realm zulip attempted to access /stats, but the computed start time: {realm.date_created} (creation of realm or installation) is later than the computed end time: {end_time} (last successful analytics update). Is the analytics cron job running?"
            ],
        )
        self.assert_json_error_contains(result, "No analytics data available")

    def test_number_of_humans(self) -> None:
        """``number_of_humans`` returns the three daily series under "everyone"."""
        self._insert_number_of_humans_data()
        result = self.client_get("/json/analytics/chart_data", {"chart_name": "number_of_humans"})
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data,
            {
                "msg": "",
                "end_times": [datetime_to_timestamp(dt) for dt in self.end_times_day],
                "frequency": CountStat.DAY,
                "everyone": {
                    "_1day": self.data(100),
                    "_15day": self.data(100),
                    "all_time": self.data(100),
                },
                "display_order": None,
                "result": "success",
            },
        )

    def test_messages_sent_over_time(self) -> None:
        """``messages_sent_over_time`` splits hourly counts into bot and human."""
        stat = COUNT_STATS["messages_sent:is_bot:hour"]
        self.insert_data(stat, ["true", "false"], ["false"])
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_over_time"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data,
            {
                "msg": "",
                "end_times": [datetime_to_timestamp(dt) for dt in self.end_times_hour],
                "frequency": CountStat.HOUR,
                "everyone": {"bot": self.data(100), "human": self.data(101)},
                "user": {"bot": self.data(0), "human": self.data(200)},
                "display_order": None,
                "result": "success",
            },
        )

    def test_messages_sent_by_message_type(self) -> None:
        """``messages_sent_by_message_type`` labels subgroups and zero-fills absent ones."""
        stat = COUNT_STATS["messages_sent:message_type:day"]
        self.insert_data(
            stat, ["public_stream", "private_message"], ["public_stream", "private_stream"]
        )
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_by_message_type"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data,
            {
                "msg": "",
                "end_times": [datetime_to_timestamp(dt) for dt in self.end_times_day],
                "frequency": CountStat.DAY,
                "everyone": {
                    "Public streams": self.data(100),
                    "Private streams": self.data(0),
                    "Private messages": self.data(101),
                    "Group private messages": self.data(0),
                },
                "user": {
                    "Public streams": self.data(200),
                    "Private streams": self.data(201),
                    "Private messages": self.data(0),
                    "Group private messages": self.data(0),
                },
                "display_order": [
                    "Private messages",
                    "Public streams",
                    "Private streams",
                    "Group private messages",
                ],
                "result": "success",
            },
        )

    def test_messages_sent_by_client(self) -> None:
        """``messages_sent_by_client`` keys series by client name."""
        stat = COUNT_STATS["messages_sent:client:day"]
        client1 = Client.objects.create(name="client 1")
        client2 = Client.objects.create(name="client 2")
        client3 = Client.objects.create(name="client 3")
        client4 = Client.objects.create(name="client 4")
        # Subgroups for this stat are client ids.
        self.insert_data(stat, [client4.id, client3.id, client2.id], [client3.id, client1.id])
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_by_client"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data,
            {
                "msg": "",
                "end_times": [datetime_to_timestamp(dt) for dt in self.end_times_day],
                "frequency": CountStat.DAY,
                "everyone": {
                    "client 4": self.data(100),
                    "client 3": self.data(101),
                    "client 2": self.data(102),
                },
                "user": {"client 3": self.data(200), "client 1": self.data(201)},
                "display_order": ["client 1", "client 2", "client 3", "client 4"],
                "result": "success",
            },
        )

    def test_messages_read_over_time(self) -> None:
        """``messages_read_over_time`` returns a single hourly "read" series."""
        stat = COUNT_STATS["messages_read::hour"]
        self.insert_data(stat, [None], [])
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_read_over_time"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data,
            {
                "msg": "",
                "end_times": [datetime_to_timestamp(dt) for dt in self.end_times_hour],
                "frequency": CountStat.HOUR,
                "everyone": {"read": self.data(100)},
                "user": {"read": self.data(0)},
                "display_order": None,
                "result": "success",
            },
        )

    def test_include_empty_subgroups(self) -> None:
        """With only a FillState and no count rows, check which subgroups still appear.

        Three charts are expected to return zero-valued series for their
        subgroups; ``messages_sent_by_client`` is expected to return empty
        mappings.
        """
        FillState.objects.create(
            property="realm_active_humans::day",
            end_time=self.end_times_day[0],
            state=FillState.DONE,
        )
        result = self.client_get("/json/analytics/chart_data", {"chart_name": "number_of_humans"})
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(data["everyone"], {"_1day": [0], "_15day": [0], "all_time": [0]})
        # assertNotIn reports the offending payload if this ever fails.
        self.assertNotIn("user", data)

        FillState.objects.create(
            property="messages_sent:is_bot:hour",
            end_time=self.end_times_hour[0],
            state=FillState.DONE,
        )
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_over_time"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(data["everyone"], {"human": [0], "bot": [0]})
        self.assertEqual(data["user"], {"human": [0], "bot": [0]})

        FillState.objects.create(
            property="messages_sent:message_type:day",
            end_time=self.end_times_day[0],
            state=FillState.DONE,
        )
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_by_message_type"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data["everyone"],
            {
                "Public streams": [0],
                "Private streams": [0],
                "Private messages": [0],
                "Group private messages": [0],
            },
        )
        self.assertEqual(
            data["user"],
            {
                "Public streams": [0],
                "Private streams": [0],
                "Private messages": [0],
                "Group private messages": [0],
            },
        )

        FillState.objects.create(
            property="messages_sent:client:day",
            end_time=self.end_times_day[0],
            state=FillState.DONE,
        )
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "messages_sent_by_client"}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(data["everyone"], {})
        self.assertEqual(data["user"], {})

    def test_start_and_end(self) -> None:
        """``start``/``end`` restrict the returned window; an inverted range is an error."""
        self._insert_number_of_humans_data()
        end_time_timestamps = [datetime_to_timestamp(dt) for dt in self.end_times_day]

        # valid start and end
        result = self.client_get(
            "/json/analytics/chart_data",
            {
                "chart_name": "number_of_humans",
                "start": end_time_timestamps[1],
                "end": end_time_timestamps[2],
            },
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(data["end_times"], end_time_timestamps[1:3])
        self.assertEqual(
            data["everyone"], {"_1day": [0, 100], "_15day": [0, 100], "all_time": [0, 100]}
        )

        # start later than end
        result = self.client_get(
            "/json/analytics/chart_data",
            {
                "chart_name": "number_of_humans",
                "start": end_time_timestamps[2],
                "end": end_time_timestamps[1],
            },
        )
        self.assert_json_error_contains(result, "Start time is later than")

    def test_min_length(self) -> None:
        """``min_length`` leaves long-enough data alone and front-pads shorter data."""
        self._insert_number_of_humans_data()

        # min_length is smaller than the four filled points, so nothing changes
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "number_of_humans", "min_length": 2}
        )
        self.assert_json_success(result)
        data = result.json()
        self.assertEqual(
            data["end_times"], [datetime_to_timestamp(dt) for dt in self.end_times_day]
        )
        self.assertEqual(
            data["everyone"],
            {"_1day": self.data(100), "_15day": self.data(100), "all_time": self.data(100)},
        )

        # min_length larger than filled data: one extra, earlier end time with a
        # zero value is expected at the front of every series
        result = self.client_get(
            "/json/analytics/chart_data", {"chart_name": "number_of_humans", "min_length": 5}
        )
        self.assert_json_success(result)
        data = result.json()
        end_times = [
            ceiling_to_day(self.realm.date_created) + timedelta(days=i) for i in range(-1, 4)
        ]
        self.assertEqual(data["end_times"], [datetime_to_timestamp(dt) for dt in end_times])
        self.assertEqual(
            data["everyone"],
            {
                "_1day": [0, *self.data(100)],
                "_15day": [0, *self.data(100)],
                "all_time": [0, *self.data(100)],
            },
        )

    def test_non_existent_chart(self) -> None:
        """An unrecognized ``chart_name`` produces an "Unknown chart name" error."""
        result = self.client_get("/json/analytics/chart_data", {"chart_name": "does_not_exist"})
        self.assert_json_error_contains(result, "Unknown chart name")

    def test_analytics_not_running(self) -> None:
        """Exercise the "analytics cron job not running" detection.

        The realm's creation time and the FillState end time are moved around,
        and for each combination the endpoint is expected either to succeed or
        to log a warning and return "No analytics data available".
        """
        realm = get_realm("zulip")
        self.assertEqual(FillState.objects.count(), 0)
        # End time that the warning is expected to report while no FillState exists.
        never_filled = "0001-01-01 00:00:00+00:00"

        # Phase 1: no FillState rows at all.
        self._set_realm_age(realm, timedelta(days=3))
        self._assert_messages_sent_chart_unavailable(realm, never_filled)

        self._set_realm_age(realm, timedelta(days=1, hours=2))
        self._assert_messages_sent_chart_unavailable(realm, never_filled)

        self._set_realm_age(realm, timedelta(days=1, minutes=10))
        self._assert_messages_sent_chart_ok()

        self._set_realm_age(realm, timedelta(hours=10))
        self._assert_messages_sent_chart_ok()

        # Phase 2: last successful fill was five days ago.
        end_time = timezone_now() - timedelta(days=5)
        fill_state = FillState.objects.create(
            property="messages_sent:is_bot:hour", end_time=end_time, state=FillState.DONE
        )

        self._set_realm_age(realm, timedelta(days=3))
        self._assert_messages_sent_chart_unavailable(realm, end_time)

        self._set_realm_age(realm, timedelta(days=1, minutes=10))
        self._assert_messages_sent_chart_ok()

        # Phase 3: last successful fill was two days ago.
        end_time = timezone_now() - timedelta(days=2)
        fill_state.end_time = end_time
        fill_state.save(update_fields=["end_time"])

        self._set_realm_age(realm, timedelta(days=3))
        self._assert_messages_sent_chart_ok()

        self._set_realm_age(realm, timedelta(days=1, hours=2))
        self._assert_messages_sent_chart_unavailable(realm, end_time)

        self._set_realm_age(realm, timedelta(days=1, minutes=10))
        self._assert_messages_sent_chart_ok()

    def test_get_chart_data_for_realm(self) -> None:
        """The per-realm chart endpoint requires ``is_staff`` and a valid realm."""
        user = self.example_user("hamlet")
        self.login_user(user)
        result = self.client_get(
            "/json/analytics/chart_data/realm/zulip", {"chart_name": "number_of_humans"}
        )
        self.assert_json_error(result, "Must be an server administrator", 400)

        user = self.example_user("hamlet")
        user.is_staff = True
        user.save(update_fields=["is_staff"])
        stat = COUNT_STATS["realm_active_humans::day"]
        self.insert_data(stat, [None], [])

        result = self.client_get(
            "/json/analytics/chart_data/realm/not_existing_realm",
            {"chart_name": "number_of_humans"},
        )
        self.assert_json_error(result, "Invalid organization", 400)

        result = self.client_get(
            "/json/analytics/chart_data/realm/zulip", {"chart_name": "number_of_humans"}
        )
        self.assert_json_success(result)

    def test_get_chart_data_for_installation(self) -> None:
        """The installation-wide chart endpoint requires ``is_staff``."""
        user = self.example_user("hamlet")
        self.login_user(user)
        result = self.client_get(
            "/json/analytics/chart_data/installation", {"chart_name": "number_of_humans"}
        )
        self.assert_json_error(result, "Must be an server administrator", 400)

        user = self.example_user("hamlet")
        user.is_staff = True
        user.save(update_fields=["is_staff"])
        stat = COUNT_STATS["realm_active_humans::day"]
        self.insert_data(stat, [None], [])

        result = self.client_get(
            "/json/analytics/chart_data/installation", {"chart_name": "number_of_humans"}
        )
        self.assert_json_success(result)
class TestSupportEndpoint(ZulipTestCase):
    def test_search(self) -> None:
        reset_emails_in_zulip_realm()
        def assert_user_details_in_html_response(
            html_response: str, full_name: str, email: str, role: str
        ) -> None:
            self.assert_in_success_response(
                [
                    'user\n',
                    f"
{full_name}
",
                    f"Email: {email}",
                    "Is active: True
",
                    f"Role: {role}
",
                ],
                html_response,
            )
        def check_hamlet_user_query_result(result: HttpResponse) -> None:
            assert_user_details_in_html_response(
                result, "King Hamlet", self.example_email("hamlet"), "Member"
            )
            self.assert_in_success_response(
                [
                    f"Admins: {self.example_email('iago')}\n",
                    f"Owners: {self.example_email('desdemona')}\n",
                    'class="copy-button" data-copytext="{}">'.format(self.example_email("iago")),
                    'class="copy-button" data-copytext="{}">'.format(
                        self.example_email("desdemona")
                    ),
                ],
                result,
            )
        def check_othello_user_query_result(result: HttpResponse) -> None:
            assert_user_details_in_html_response(
                result, "Othello, the Moor of Venice", self.example_email("othello"), "Member"
            )
        def check_polonius_user_query_result(result: HttpResponse) -> None:
            assert_user_details_in_html_response(
                result, "Polonius", self.example_email("polonius"), "Guest"
            )
        def check_zulip_realm_query_result(result: HttpResponse) -> None:
            zulip_realm = get_realm("zulip")
            first_human_user = zulip_realm.get_first_human_user()
            assert first_human_user is not None
            self.assert_in_success_response(
                [
                    f"First human user: {first_human_user.delivery_email}\n",
                    f'",
                    '',
                    '',
                    'input type="number" name="discount" value="None"',
                    '',
                    '',
                    'scrub-realm-button">',
                    'data-string-id="zulip"',
                ],
                result,
            )
        def check_lear_realm_query_result(result: HttpResponse) -> None:
            lear_realm = get_realm("lear")
            self.assert_in_success_response(
                [
                    f'",
                    '',
                    '',
                    'input type="number" name="discount" value="None"',
                    '',
                    '',
                    'scrub-realm-button">',
                    'data-string-id="lear"',
                    "Name: Zulip Standard",
                    "Status: Active",
                    "Billing schedule: Annual",
                    "Licenses: 2/10 (Manual)",
                    "Price per license: $80.0",
                    "Next invoice date: 02 January 2017",
                    '