mirror of
https://github.com/zulip/zulip.git
synced 2025-10-23 04:52:12 +00:00
analytics: Improve do_increment_logging_stat performance.
The previous implementation of `do_increment_logging_stat` used Django's `get_or_create`, which involved two separate database queries and could lead to race conditions. Use an `ON CONFLICT ... DO UPDATE` (aka "upsert") query instead, which eliminates the race conditions and improves performance. This is mildly complicated by the different unique indexes across the various tables, and by the need for bug-for-bug compatibility with the previous implementation. Fixes #28947. Co-authored-by: Alex Vandiver <alexmv@zulip.com>
This commit is contained in:
@@ -2,11 +2,10 @@ import logging
|
||||
import time
|
||||
from collections import OrderedDict, defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Callable, Dict, Optional, Sequence, Tuple, Type, Union
|
||||
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, Union
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import connection, models
|
||||
from django.db.models import F
|
||||
from psycopg2.sql import SQL, Composable, Identifier, Literal
|
||||
from typing_extensions import TypeAlias, override
|
||||
|
||||
@@ -312,27 +311,47 @@ def do_increment_logging_stat(
|
||||
return
|
||||
|
||||
table = stat.data_collector.output_table
|
||||
id_args: Dict[str, Union[int, None]] = {}
|
||||
conflict_args: List[str] = []
|
||||
if table == RealmCount:
|
||||
assert isinstance(model_object_for_bucket, Realm)
|
||||
id_args: Dict[
|
||||
str, Optional[Union[Realm, UserProfile, Stream, "RemoteRealm", "RemoteZulipServer"]]
|
||||
] = {"realm": model_object_for_bucket}
|
||||
id_args = {"realm_id": model_object_for_bucket.id}
|
||||
conflict_args = ["realm_id"]
|
||||
elif table == UserCount:
|
||||
assert isinstance(model_object_for_bucket, UserProfile)
|
||||
id_args = {"realm": model_object_for_bucket.realm, "user": model_object_for_bucket}
|
||||
id_args = {
|
||||
"realm_id": model_object_for_bucket.realm_id,
|
||||
"user_id": model_object_for_bucket.id,
|
||||
}
|
||||
conflict_args = ["user_id"]
|
||||
elif table == StreamCount:
|
||||
assert isinstance(model_object_for_bucket, Stream)
|
||||
id_args = {"realm": model_object_for_bucket.realm, "stream": model_object_for_bucket}
|
||||
id_args = {
|
||||
"realm_id": model_object_for_bucket.realm_id,
|
||||
"stream_id": model_object_for_bucket.id,
|
||||
}
|
||||
conflict_args = ["stream_id"]
|
||||
elif table == RemoteInstallationCount:
|
||||
assert isinstance(model_object_for_bucket, RemoteZulipServer)
|
||||
id_args = {"server": model_object_for_bucket, "remote_id": None}
|
||||
id_args = {"server_id": model_object_for_bucket.id, "remote_id": None}
|
||||
conflict_args = ["server_id"]
|
||||
elif table == RemoteRealmCount:
|
||||
assert isinstance(model_object_for_bucket, RemoteRealm)
|
||||
# For RemoteRealmCount (e.g. `mobile_pushes_forwarded::day`),
|
||||
# we have no `remote_id` nor `realm_id`, since they are not
|
||||
# imported from the remote server, which is the source of
|
||||
# truth of those two columns. The "ON CONFLICT" target is thus the
|
||||
# only unique key we have, which is `remote_realm_id`, and not
|
||||
# `server_id` / `realm_id`.
|
||||
id_args = {
|
||||
"server": model_object_for_bucket.server,
|
||||
"remote_realm": model_object_for_bucket,
|
||||
"server_id": model_object_for_bucket.server_id,
|
||||
"remote_realm_id": model_object_for_bucket.id,
|
||||
"remote_id": None,
|
||||
"realm_id": None,
|
||||
}
|
||||
conflict_args = [
|
||||
"remote_realm_id",
|
||||
]
|
||||
else:
|
||||
raise AssertionError("Unsupported CountStat output_table")
|
||||
|
||||
@@ -343,16 +362,49 @@ def do_increment_logging_stat(
|
||||
else:
|
||||
raise AssertionError("Unsupported CountStat frequency")
|
||||
|
||||
row, created = table._default_manager.get_or_create(
|
||||
property=stat.property,
|
||||
subgroup=subgroup,
|
||||
end_time=end_time,
|
||||
defaults={"value": increment},
|
||||
**id_args,
|
||||
is_subgroup: SQL = SQL("NULL")
|
||||
if subgroup is not None:
|
||||
is_subgroup = SQL("NOT NULL")
|
||||
# For backwards consistency, we cast the subgroup to a string
|
||||
# in Python; this emulates the behaviour of `get_or_create`,
|
||||
# which was previously used in this function, and performed
|
||||
# this cast because the `subgroup` column is defined as a
|
||||
# `CharField`. Omitting this explicit cast causes a subgroup
|
||||
# of the boolean False to be passed as the PostgreSQL false,
|
||||
# which it stringifies as the lower-case `'false'`, not the
|
||||
# initial-case `'False'` that Python's `str()` produces.
|
||||
#
|
||||
# Other parts of the system (e.g. count_message_by_user_query)
|
||||
# already use PostgreSQL to cast bools to strings, resulting
|
||||
# in `subgroup` values of lower-case `'false'` -- for example
|
||||
# in `messages_sent:is_bot:hour`. Fixing this inconsistency
|
||||
# via a migration is complicated by these records being
|
||||
# exchanged over the wire from remote servers.
|
||||
subgroup = str(subgroup)
|
||||
conflict_args.append("subgroup")
|
||||
|
||||
id_column_names = SQL(", ").join(map(Identifier, id_args.keys()))
|
||||
id_values = SQL(", ").join(map(Literal, id_args.values()))
|
||||
conflict_columns = SQL(", ").join(map(Identifier, conflict_args))
|
||||
|
||||
sql_query = SQL(
|
||||
"""
|
||||
INSERT INTO {table_name}(property, subgroup, end_time, value, {id_column_names})
|
||||
VALUES (%s, %s, %s, %s, {id_values})
|
||||
ON CONFLICT (property, end_time, {conflict_columns})
|
||||
WHERE subgroup IS {is_subgroup}
|
||||
DO UPDATE SET
|
||||
value = {table_name}.value + EXCLUDED.value
|
||||
"""
|
||||
).format(
|
||||
table_name=Identifier(table._meta.db_table),
|
||||
id_column_names=id_column_names,
|
||||
id_values=id_values,
|
||||
conflict_columns=conflict_columns,
|
||||
is_subgroup=is_subgroup,
|
||||
)
|
||||
if not created:
|
||||
row.value = F("value") + increment
|
||||
row.save(update_fields=["value"])
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(sql_query, [stat.property, subgroup, end_time, increment])
|
||||
|
||||
|
||||
def do_drop_all_analytics_tables() -> None:
|
||||
|
@@ -1326,7 +1326,7 @@ class TestDoIncrementLoggingStat(AnalyticsTestCase):
|
||||
|
||||
def test_do_increment_logging_start_query_count(self) -> None:
|
||||
stat = LoggingCountStat("test", RealmCount, CountStat.DAY)
|
||||
with self.assert_database_query_count(2):
|
||||
with self.assert_database_query_count(1):
|
||||
do_increment_logging_stat(self.default_realm, stat, None, self.TIME_ZERO)
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user