analytics: Add realm argument to analytics.

This changeset is prepartory work for doing something reasonable with
analytics data during the zulip -> zulip data import process (and
potentially e.g. slack -> Zulip as well).

To support that, we need to make it possible to do our analytics
calculations for a single realm.

We do this while maintaining backwards compatibility and avoiding
massive duplicated code by adding an optional `realm` argument to the
entrypoints to the analytics system, especially process_count_stat.

More work involving restructuring FillState will be required for this
to be actually usable for its intented purpose, but this commit is a
nice checkpoint along the way.

Tweaked by tabbott to adjust comments and disable InstallationCount
updates when a realm argument is specified.
This commit is contained in:
arpit551
2020-01-16 07:13:51 +05:30
committed by Tim Abbott
parent 3f99985df3
commit b23a5431cd
2 changed files with 443 additions and 103 deletions

View File

@@ -63,13 +63,23 @@ class DependentCountStat(CountStat):
class DataCollector: class DataCollector:
def __init__(self, output_table: Type[BaseCount], def __init__(self, output_table: Type[BaseCount],
pull_function: Optional[Callable[[str, datetime, datetime], int]]) -> None: pull_function: Optional[Callable[[str, datetime, datetime, Optional[Realm]], int]]) -> None:
self.output_table = output_table self.output_table = output_table
self.pull_function = pull_function self.pull_function = pull_function
## CountStat-level operations ## ## CountStat-level operations ##
def process_count_stat(stat: CountStat, fill_to_time: datetime) -> None: def process_count_stat(stat: CountStat, fill_to_time: datetime,
realm: Optional[Realm]=None) -> None:
# TODO: The realm argument is not yet supported, in that we don't
# have a solution for how to update FillState if it is passed. It
# exists solely as partial plumbing for when we do fully implement
# doing single-realm analytics runs for use cases like data import.
#
# Also, note that for the realm argument to be properly supported,
# the CountStat object passed in needs to have come from
# E.g. get_count_stats(realm), i.e. have the realm_id already
# entered into the SQL query defined by the CountState object.
if stat.frequency == CountStat.HOUR: if stat.frequency == CountStat.HOUR:
time_increment = timedelta(hours=1) time_increment = timedelta(hours=1)
elif stat.frequency == CountStat.DAY: elif stat.frequency == CountStat.DAY:
@@ -113,7 +123,7 @@ def process_count_stat(stat: CountStat, fill_to_time: datetime) -> None:
logger.info("START %s %s" % (stat.property, currently_filled)) logger.info("START %s %s" % (stat.property, currently_filled))
start = time.time() start = time.time()
do_update_fill_state(fill_state, currently_filled, FillState.STARTED) do_update_fill_state(fill_state, currently_filled, FillState.STARTED)
do_fill_count_stat_at_hour(stat, currently_filled) do_fill_count_stat_at_hour(stat, currently_filled, realm)
do_update_fill_state(fill_state, currently_filled, FillState.DONE) do_update_fill_state(fill_state, currently_filled, FillState.DONE)
end = time.time() end = time.time()
currently_filled = currently_filled + time_increment currently_filled = currently_filled + time_increment
@@ -126,15 +136,15 @@ def do_update_fill_state(fill_state: FillState, end_time: datetime, state: int)
# We assume end_time is valid (e.g. is on a day or hour boundary as appropriate) # We assume end_time is valid (e.g. is on a day or hour boundary as appropriate)
# and is timezone aware. It is the caller's responsibility to enforce this! # and is timezone aware. It is the caller's responsibility to enforce this!
def do_fill_count_stat_at_hour(stat: CountStat, end_time: datetime) -> None: def do_fill_count_stat_at_hour(stat: CountStat, end_time: datetime, realm: Optional[Realm]=None) -> None:
start_time = end_time - stat.interval start_time = end_time - stat.interval
if not isinstance(stat, LoggingCountStat): if not isinstance(stat, LoggingCountStat):
timer = time.time() timer = time.time()
assert(stat.data_collector.pull_function is not None) assert(stat.data_collector.pull_function is not None)
rows_added = stat.data_collector.pull_function(stat.property, start_time, end_time) rows_added = stat.data_collector.pull_function(stat.property, start_time, end_time, realm)
logger.info("%s run pull_function (%dms/%sr)" % logger.info("%s run pull_function (%dms/%sr)" %
(stat.property, (time.time()-timer)*1000, rows_added)) (stat.property, (time.time()-timer)*1000, rows_added))
do_aggregate_to_summary_table(stat, end_time) do_aggregate_to_summary_table(stat, end_time, realm)
def do_delete_counts_at_hour(stat: CountStat, end_time: datetime) -> None: def do_delete_counts_at_hour(stat: CountStat, end_time: datetime) -> None:
if isinstance(stat, LoggingCountStat): if isinstance(stat, LoggingCountStat):
@@ -147,11 +157,17 @@ def do_delete_counts_at_hour(stat: CountStat, end_time: datetime) -> None:
RealmCount.objects.filter(property=stat.property, end_time=end_time).delete() RealmCount.objects.filter(property=stat.property, end_time=end_time).delete()
InstallationCount.objects.filter(property=stat.property, end_time=end_time).delete() InstallationCount.objects.filter(property=stat.property, end_time=end_time).delete()
def do_aggregate_to_summary_table(stat: CountStat, end_time: datetime) -> None: def do_aggregate_to_summary_table(stat: CountStat, end_time: datetime,
realm: Optional[Realm]=None) -> None:
cursor = connection.cursor() cursor = connection.cursor()
# Aggregate into RealmCount # Aggregate into RealmCount
output_table = stat.data_collector.output_table output_table = stat.data_collector.output_table
if realm is not None:
realm_clause = "AND zerver_realm.id = %s" % (realm.id,)
else:
realm_clause = ""
if output_table in (UserCount, StreamCount): if output_table in (UserCount, StreamCount):
realmcount_query = """ realmcount_query = """
INSERT INTO analytics_realmcount INSERT INTO analytics_realmcount
@@ -166,32 +182,40 @@ def do_aggregate_to_summary_table(stat: CountStat, end_time: datetime) -> None:
WHERE WHERE
%(output_table)s.property = '%(property)s' AND %(output_table)s.property = '%(property)s' AND
%(output_table)s.end_time = %%(end_time)s %(output_table)s.end_time = %%(end_time)s
%(realm_clause)s
GROUP BY zerver_realm.id, %(output_table)s.subgroup GROUP BY zerver_realm.id, %(output_table)s.subgroup
""" % {'output_table': output_table._meta.db_table, """ % {'output_table': output_table._meta.db_table,
'property': stat.property} 'property': stat.property,
'realm_clause': realm_clause}
start = time.time() start = time.time()
cursor.execute(realmcount_query, {'end_time': end_time}) cursor.execute(realmcount_query, {'end_time': end_time})
end = time.time() end = time.time()
logger.info("%s RealmCount aggregation (%dms/%sr)" % ( logger.info("%s RealmCount aggregation (%dms/%sr)" % (
stat.property, (end - start) * 1000, cursor.rowcount)) stat.property, (end - start) * 1000, cursor.rowcount))
# Aggregate into InstallationCount if realm is None:
installationcount_query = """ # Aggregate into InstallationCount. Only run if we just
INSERT INTO analytics_installationcount # processed counts for all realms.
(value, property, subgroup, end_time) #
SELECT # TODO: Add support for updating installation data after
sum(value), '%(property)s', analytics_realmcount.subgroup, %%(end_time)s # changing an individual realm's values.
FROM analytics_realmcount installationcount_query = """
WHERE INSERT INTO analytics_installationcount
property = '%(property)s' AND (value, property, subgroup, end_time)
end_time = %%(end_time)s SELECT
GROUP BY analytics_realmcount.subgroup sum(value), '%(property)s', analytics_realmcount.subgroup, %%(end_time)s
""" % {'property': stat.property} FROM analytics_realmcount
start = time.time() WHERE
cursor.execute(installationcount_query, {'end_time': end_time}) property = '%(property)s' AND
end = time.time() end_time = %%(end_time)s
logger.info("%s InstallationCount aggregation (%dms/%sr)" % ( GROUP BY analytics_realmcount.subgroup
stat.property, (end - start) * 1000, cursor.rowcount)) """ % {'property': stat.property}
start = time.time()
cursor.execute(installationcount_query, {'end_time': end_time})
end = time.time()
logger.info("%s InstallationCount aggregation (%dms/%sr)" % (
stat.property, (end - start) * 1000, cursor.rowcount))
cursor.close() cursor.close()
## Utility functions called from outside counts.py ## ## Utility functions called from outside counts.py ##
@@ -240,7 +264,7 @@ def do_pull_by_sql_query(property: str, start_time: datetime, end_time: datetime
group_by: Optional[Tuple[models.Model, str]]) -> int: group_by: Optional[Tuple[models.Model, str]]) -> int:
if group_by is None: if group_by is None:
subgroup = 'NULL' subgroup = 'NULL'
group_by_clause = '' group_by_clause = ''
else: else:
subgroup = '%s.%s' % (group_by[0]._meta.db_table, group_by[1]) subgroup = '%s.%s' % (group_by[0]._meta.db_table, group_by[1])
group_by_clause = ', ' + subgroup group_by_clause = ', ' + subgroup
@@ -259,11 +283,13 @@ def do_pull_by_sql_query(property: str, start_time: datetime, end_time: datetime
def sql_data_collector(output_table: Type[BaseCount], query: str, def sql_data_collector(output_table: Type[BaseCount], query: str,
group_by: Optional[Tuple[models.Model, str]]) -> DataCollector: group_by: Optional[Tuple[models.Model, str]]) -> DataCollector:
def pull_function(property: str, start_time: datetime, end_time: datetime) -> int: def pull_function(property: str, start_time: datetime, end_time: datetime,
realm: Optional[Realm] = None) -> int:
return do_pull_by_sql_query(property, start_time, end_time, query, group_by) return do_pull_by_sql_query(property, start_time, end_time, query, group_by)
return DataCollector(output_table, pull_function) return DataCollector(output_table, pull_function)
def do_pull_minutes_active(property: str, start_time: datetime, end_time: datetime) -> int: def do_pull_minutes_active(property: str, start_time: datetime, end_time: datetime,
realm: Optional[Realm] = None) -> int:
user_activity_intervals = UserActivityInterval.objects.filter( user_activity_intervals = UserActivityInterval.objects.filter(
end__gt=start_time, start__lt=end_time end__gt=start_time, start__lt=end_time
).select_related( ).select_related(
@@ -273,9 +299,10 @@ def do_pull_minutes_active(property: str, start_time: datetime, end_time: dateti
seconds_active = defaultdict(float) # type: Dict[Tuple[int, int], float] seconds_active = defaultdict(float) # type: Dict[Tuple[int, int], float]
for user_id, realm_id, interval_start, interval_end in user_activity_intervals: for user_id, realm_id, interval_start, interval_end in user_activity_intervals:
start = max(start_time, interval_start) if realm is None or realm.id == realm_id:
end = min(end_time, interval_end) start = max(start_time, interval_start)
seconds_active[(user_id, realm_id)] += (end - start).total_seconds() end = min(end_time, interval_end)
seconds_active[(user_id, realm_id)] += (end - start).total_seconds()
rows = [UserCount(user_id=ids[0], realm_id=ids[1], property=property, rows = [UserCount(user_id=ids[0], realm_id=ids[1], property=property,
end_time=end_time, value=int(seconds // 60)) end_time=end_time, value=int(seconds // 60))
@@ -283,7 +310,12 @@ def do_pull_minutes_active(property: str, start_time: datetime, end_time: dateti
UserCount.objects.bulk_create(rows) UserCount.objects.bulk_create(rows)
return len(rows) return len(rows)
count_message_by_user_query = """ def count_message_by_user_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "zerver_userprofile.realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_usercount INSERT INTO analytics_usercount
(user_id, realm_id, value, property, subgroup, end_time) (user_id, realm_id, value, property, subgroup, end_time)
SELECT SELECT
@@ -296,12 +328,18 @@ count_message_by_user_query = """
WHERE WHERE
zerver_userprofile.date_joined < %%(time_end)s AND zerver_userprofile.date_joined < %%(time_end)s AND
zerver_message.date_sent >= %%(time_start)s AND zerver_message.date_sent >= %%(time_start)s AND
{realm_clause}
zerver_message.date_sent < %%(time_end)s zerver_message.date_sent < %%(time_end)s
GROUP BY zerver_userprofile.id %(group_by_clause)s GROUP BY zerver_userprofile.id %(group_by_clause)s
""" """.format(realm_clause=realm_clause)
# Note: ignores the group_by / group_by_clause. # Note: ignores the group_by / group_by_clause.
count_message_type_by_user_query = """ def count_message_type_by_user_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "zerver_userprofile.realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_usercount INSERT INTO analytics_usercount
(realm_id, user_id, value, property, subgroup, end_time) (realm_id, user_id, value, property, subgroup, end_time)
SELECT realm_id, id, SUM(count) AS value, '%(property)s', message_type, %%(time_end)s SELECT realm_id, id, SUM(count) AS value, '%(property)s', message_type, %%(time_end)s
@@ -323,6 +361,7 @@ count_message_type_by_user_query = """
ON ON
zerver_userprofile.id = zerver_message.sender_id AND zerver_userprofile.id = zerver_message.sender_id AND
zerver_message.date_sent >= %%(time_start)s AND zerver_message.date_sent >= %%(time_start)s AND
{realm_clause}
zerver_message.date_sent < %%(time_end)s zerver_message.date_sent < %%(time_end)s
JOIN zerver_recipient JOIN zerver_recipient
ON ON
@@ -335,13 +374,18 @@ count_message_type_by_user_query = """
zerver_recipient.type, zerver_stream.invite_only zerver_recipient.type, zerver_stream.invite_only
) AS subquery ) AS subquery
GROUP BY realm_id, id, message_type GROUP BY realm_id, id, message_type
""" """.format(realm_clause=realm_clause)
# This query joins to the UserProfile table since all current queries that # This query joins to the UserProfile table since all current queries that
# use this also subgroup on UserProfile.is_bot. If in the future there is a # use this also subgroup on UserProfile.is_bot. If in the future there is a
# stat that counts messages by stream and doesn't need the UserProfile # stat that counts messages by stream and doesn't need the UserProfile
# table, consider writing a new query for efficiency. # table, consider writing a new query for efficiency.
count_message_by_stream_query = """ def count_message_by_stream_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "zerver_stream.realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_streamcount INSERT INTO analytics_streamcount
(stream_id, realm_id, value, property, subgroup, end_time) (stream_id, realm_id, value, property, subgroup, end_time)
SELECT SELECT
@@ -360,13 +404,19 @@ count_message_by_stream_query = """
zerver_stream.date_created < %%(time_end)s AND zerver_stream.date_created < %%(time_end)s AND
zerver_recipient.type = 2 AND zerver_recipient.type = 2 AND
zerver_message.date_sent >= %%(time_start)s AND zerver_message.date_sent >= %%(time_start)s AND
{realm_clause}
zerver_message.date_sent < %%(time_end)s zerver_message.date_sent < %%(time_end)s
GROUP BY zerver_stream.id %(group_by_clause)s GROUP BY zerver_stream.id %(group_by_clause)s
""" """.format(realm_clause=realm_clause)
# Hardcodes the query needed by active_users:is_bot:day, since that is # Hardcodes the query needed by active_users:is_bot:day, since that is
# currently the only stat that uses this. # currently the only stat that uses this.
count_user_by_realm_query = """ def count_user_by_realm_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "zerver_userprofile.realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_realmcount INSERT INTO analytics_realmcount
(realm_id, value, property, subgroup, end_time) (realm_id, value, property, subgroup, end_time)
SELECT SELECT
@@ -379,15 +429,21 @@ count_user_by_realm_query = """
zerver_realm.date_created < %%(time_end)s AND zerver_realm.date_created < %%(time_end)s AND
zerver_userprofile.date_joined >= %%(time_start)s AND zerver_userprofile.date_joined >= %%(time_start)s AND
zerver_userprofile.date_joined < %%(time_end)s AND zerver_userprofile.date_joined < %%(time_end)s AND
{realm_clause}
zerver_userprofile.is_active = TRUE zerver_userprofile.is_active = TRUE
GROUP BY zerver_realm.id %(group_by_clause)s GROUP BY zerver_realm.id %(group_by_clause)s
""" """.format(realm_clause=realm_clause)
# Currently hardcodes the query needed for active_users_audit:is_bot:day. # Currently hardcodes the query needed for active_users_audit:is_bot:day.
# Assumes that a user cannot have two RealmAuditLog entries with the same event_time and # Assumes that a user cannot have two RealmAuditLog entries with the same event_time and
# event_type in [RealmAuditLog.USER_CREATED, USER_DEACTIVATED, etc]. # event_type in [RealmAuditLog.USER_CREATED, USER_DEACTIVATED, etc].
# In particular, it's important to ensure that migrations don't cause that to happen. # In particular, it's important to ensure that migrations don't cause that to happen.
check_realmauditlog_by_user_query = """ def check_realmauditlog_by_user_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_usercount INSERT INTO analytics_usercount
(user_id, realm_id, value, property, subgroup, end_time) (user_id, realm_id, value, property, subgroup, end_time)
SELECT SELECT
@@ -398,6 +454,7 @@ check_realmauditlog_by_user_query = """
FROM zerver_realmauditlog FROM zerver_realmauditlog
WHERE WHERE
event_type in ({user_created}, {user_activated}, {user_deactivated}, {user_reactivated}) AND event_type in ({user_created}, {user_activated}, {user_deactivated}, {user_reactivated}) AND
{realm_clause}
event_time < %%(time_end)s event_time < %%(time_end)s
GROUP BY modified_user_id GROUP BY modified_user_id
) ral2 ) ral2
@@ -412,9 +469,15 @@ check_realmauditlog_by_user_query = """
""".format(user_created=RealmAuditLog.USER_CREATED, """.format(user_created=RealmAuditLog.USER_CREATED,
user_activated=RealmAuditLog.USER_ACTIVATED, user_activated=RealmAuditLog.USER_ACTIVATED,
user_deactivated=RealmAuditLog.USER_DEACTIVATED, user_deactivated=RealmAuditLog.USER_DEACTIVATED,
user_reactivated=RealmAuditLog.USER_REACTIVATED) user_reactivated=RealmAuditLog.USER_REACTIVATED,
realm_clause=realm_clause)
check_useractivityinterval_by_user_query = """ def check_useractivityinterval_by_user_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "zerver_userprofile.realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_usercount INSERT INTO analytics_usercount
(user_id, realm_id, value, property, subgroup, end_time) (user_id, realm_id, value, property, subgroup, end_time)
SELECT SELECT
@@ -425,11 +488,17 @@ check_useractivityinterval_by_user_query = """
zerver_userprofile.id = zerver_useractivityinterval.user_profile_id zerver_userprofile.id = zerver_useractivityinterval.user_profile_id
WHERE WHERE
zerver_useractivityinterval.end >= %%(time_start)s AND zerver_useractivityinterval.end >= %%(time_start)s AND
{realm_clause}
zerver_useractivityinterval.start < %%(time_end)s zerver_useractivityinterval.start < %%(time_end)s
GROUP BY zerver_userprofile.id %(group_by_clause)s GROUP BY zerver_userprofile.id %(group_by_clause)s
""" """.format(realm_clause=realm_clause)
count_realm_active_humans_query = """ def count_realm_active_humans_query(realm: Optional[Realm]) -> str:
if realm is None:
realm_clause = ""
else:
realm_clause = "realm_id = %s AND" % (realm.id,)
return """
INSERT INTO analytics_realmcount INSERT INTO analytics_realmcount
(realm_id, value, property, subgroup, end_time) (realm_id, value, property, subgroup, end_time)
SELECT SELECT
@@ -440,6 +509,7 @@ count_realm_active_humans_query = """
WHERE WHERE
property = 'active_users_audit:is_bot:day' AND property = 'active_users_audit:is_bot:day' AND
subgroup = 'false' AND subgroup = 'false' AND
{realm_clause}
end_time = %%(time_end)s end_time = %%(time_end)s
) usercount1 ) usercount1
JOIN ( JOIN (
@@ -447,12 +517,13 @@ count_realm_active_humans_query = """
FROM analytics_usercount FROM analytics_usercount
WHERE WHERE
property = '15day_actives::day' AND property = '15day_actives::day' AND
{realm_clause}
end_time = %%(time_end)s end_time = %%(time_end)s
) usercount2 ) usercount2
ON ON
usercount1.user_id = usercount2.user_id usercount1.user_id = usercount2.user_id
GROUP BY usercount1.realm_id GROUP BY usercount1.realm_id
""" """.format(realm_clause=realm_clause)
# Currently unused and untested # Currently unused and untested
count_stream_by_realm_query = """ count_stream_by_realm_query = """
@@ -471,71 +542,84 @@ count_stream_by_realm_query = """
GROUP BY zerver_realm.id %(group_by_clause)s GROUP BY zerver_realm.id %(group_by_clause)s
""" """
## CountStat declarations ## def get_count_stats(realm: Optional[Realm]=None) -> Dict[str, CountStat]:
## CountStat declarations ##
count_stats_ = [ count_stats_ = [
# Messages Sent stats # Messages Sent stats
# Stats that count the number of messages sent in various ways. # Stats that count the number of messages sent in various ways.
# These are also the set of stats that read from the Message table. # These are also the set of stats that read from the Message table.
CountStat('messages_sent:is_bot:hour', CountStat('messages_sent:is_bot:hour',
sql_data_collector(UserCount, count_message_by_user_query, (UserProfile, 'is_bot')), sql_data_collector(UserCount, count_message_by_user_query(
CountStat.HOUR), realm), (UserProfile, 'is_bot')),
CountStat('messages_sent:message_type:day', CountStat.HOUR),
sql_data_collector(UserCount, count_message_type_by_user_query, None), CountStat.DAY), CountStat('messages_sent:message_type:day',
CountStat('messages_sent:client:day', sql_data_collector(
sql_data_collector(UserCount, count_message_by_user_query, (Message, 'sending_client_id')), UserCount, count_message_type_by_user_query(realm), None),
CountStat.DAY), CountStat.DAY),
CountStat('messages_in_stream:is_bot:day', CountStat('messages_sent:client:day',
sql_data_collector(StreamCount, count_message_by_stream_query, (UserProfile, 'is_bot')), sql_data_collector(UserCount, count_message_by_user_query(realm),
CountStat.DAY), (Message, 'sending_client_id')), CountStat.DAY),
CountStat('messages_in_stream:is_bot:day',
sql_data_collector(StreamCount, count_message_by_stream_query(realm),
(UserProfile, 'is_bot')), CountStat.DAY),
# Number of Users stats # Number of Users stats
# Stats that count the number of active users in the UserProfile.is_active sense. # Stats that count the number of active users in the UserProfile.is_active sense.
# 'active_users_audit:is_bot:day' is the canonical record of which users were # 'active_users_audit:is_bot:day' is the canonical record of which users were
# active on which days (in the UserProfile.is_active sense). # active on which days (in the UserProfile.is_active sense).
# Important that this stay a daily stat, so that 'realm_active_humans::day' works as expected. # Important that this stay a daily stat, so that 'realm_active_humans::day' works as expected.
CountStat('active_users_audit:is_bot:day', CountStat('active_users_audit:is_bot:day',
sql_data_collector(UserCount, check_realmauditlog_by_user_query, (UserProfile, 'is_bot')), sql_data_collector(UserCount, check_realmauditlog_by_user_query(
CountStat.DAY), realm), (UserProfile, 'is_bot')),
# Sanity check on 'active_users_audit:is_bot:day', and a archetype for future LoggingCountStats. CountStat.DAY),
# In RealmCount, 'active_users_audit:is_bot:day' should be the partial # Sanity check on 'active_users_audit:is_bot:day', and a archetype for future LoggingCountStats.
# sum sequence of 'active_users_log:is_bot:day', for any realm that # In RealmCount, 'active_users_audit:is_bot:day' should be the partial
# started after the latter stat was introduced. # sum sequence of 'active_users_log:is_bot:day', for any realm that
LoggingCountStat('active_users_log:is_bot:day', RealmCount, CountStat.DAY), # started after the latter stat was introduced.
# Another sanity check on 'active_users_audit:is_bot:day'. Is only an LoggingCountStat('active_users_log:is_bot:day',
# approximation, e.g. if a user is deactivated between the end of the RealmCount, CountStat.DAY),
# day and when this stat is run, they won't be counted. However, is the # Another sanity check on 'active_users_audit:is_bot:day'. Is only an
# simplest of the three to inspect by hand. # approximation, e.g. if a user is deactivated between the end of the
CountStat('active_users:is_bot:day', # day and when this stat is run, they won't be counted. However, is the
sql_data_collector(RealmCount, count_user_by_realm_query, (UserProfile, 'is_bot')), # simplest of the three to inspect by hand.
CountStat.DAY, interval=TIMEDELTA_MAX), CountStat('active_users:is_bot:day',
sql_data_collector(RealmCount, count_user_by_realm_query(realm), (UserProfile, 'is_bot')),
CountStat.DAY, interval=TIMEDELTA_MAX),
# User Activity stats # User Activity stats
# Stats that measure user activity in the UserActivityInterval sense. # Stats that measure user activity in the UserActivityInterval sense.
CountStat('1day_actives::day', CountStat('1day_actives::day',
sql_data_collector(UserCount, check_useractivityinterval_by_user_query, None), sql_data_collector(
CountStat.DAY, interval=timedelta(days=1)-UserActivityInterval.MIN_INTERVAL_LENGTH), UserCount, check_useractivityinterval_by_user_query(realm), None),
CountStat('15day_actives::day', CountStat.DAY, interval=timedelta(days=1)-UserActivityInterval.MIN_INTERVAL_LENGTH),
sql_data_collector(UserCount, check_useractivityinterval_by_user_query, None), CountStat('15day_actives::day',
CountStat.DAY, interval=timedelta(days=15)-UserActivityInterval.MIN_INTERVAL_LENGTH), sql_data_collector(
CountStat('minutes_active::day', DataCollector(UserCount, do_pull_minutes_active), CountStat.DAY), UserCount, check_useractivityinterval_by_user_query(realm), None),
CountStat.DAY, interval=timedelta(days=15)-UserActivityInterval.MIN_INTERVAL_LENGTH),
CountStat('minutes_active::day', DataCollector(
UserCount, do_pull_minutes_active), CountStat.DAY),
# Rate limiting stats # Rate limiting stats
# Used to limit the number of invitation emails sent by a realm # Used to limit the number of invitation emails sent by a realm
LoggingCountStat('invites_sent::day', RealmCount, CountStat.DAY), LoggingCountStat('invites_sent::day', RealmCount, CountStat.DAY),
# Dependent stats # Dependent stats
# Must come after their dependencies. # Must come after their dependencies.
# Canonical account of the number of active humans in a realm on each day. # Canonical account of the number of active humans in a realm on each day.
DependentCountStat('realm_active_humans::day', DependentCountStat('realm_active_humans::day',
sql_data_collector(RealmCount, count_realm_active_humans_query, None), sql_data_collector(
CountStat.DAY, RealmCount, count_realm_active_humans_query(realm), None),
dependencies=['active_users_audit:is_bot:day', '15day_actives::day']) CountStat.DAY,
] dependencies=['active_users_audit:is_bot:day', '15day_actives::day'])
]
COUNT_STATS = OrderedDict([(stat.property, stat) for stat in count_stats_]) return OrderedDict([(stat.property, stat) for stat in count_stats_])
# To avoid refactoring for now COUNT_STATS can be used as before
COUNT_STATS = get_count_stats()

View File

@@ -10,7 +10,7 @@ from django.test import TestCase
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
from django.utils.timezone import utc as timezone_utc from django.utils.timezone import utc as timezone_utc
from analytics.lib.counts import COUNT_STATS, CountStat, \ from analytics.lib.counts import COUNT_STATS, CountStat, get_count_stats, \
DependentCountStat, LoggingCountStat, do_aggregate_to_summary_table, \ DependentCountStat, LoggingCountStat, do_aggregate_to_summary_table, \
do_drop_all_analytics_tables, do_drop_single_stat, \ do_drop_all_analytics_tables, do_drop_single_stat, \
do_fill_count_stat_at_hour, do_increment_logging_stat, \ do_fill_count_stat_at_hour, do_increment_logging_stat, \
@@ -363,6 +363,29 @@ class TestCountStats(AnalyticsTestCase):
self.assertTableState(UserCount, [], []) self.assertTableState(UserCount, [], [])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_active_users_by_is_bot_for_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['active_users:is_bot:day']
self.current_property = stat.property
# To be included
self.create_user(is_bot=True, date_joined=self.TIME_ZERO-25*self.HOUR)
self.create_user(is_bot=False)
# To be excluded
self.create_user(email='test@second.analytics',
realm=self.second_realm, date_joined=self.TIME_ZERO-2*self.DAY)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
self.assertTableState(RealmCount, ['value', 'subgroup'],
[[1, 'true'], [1, 'false']])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value', 'subgroup'], [])
self.assertTableState(UserCount, [], [])
self.assertTableState(StreamCount, [], [])
def test_messages_sent_by_is_bot(self) -> None: def test_messages_sent_by_is_bot(self) -> None:
stat = COUNT_STATS['messages_sent:is_bot:hour'] stat = COUNT_STATS['messages_sent:is_bot:hour']
self.current_property = stat.property self.current_property = stat.property
@@ -392,6 +415,46 @@ class TestCountStats(AnalyticsTestCase):
self.assertTableState(InstallationCount, ['value', 'subgroup'], [[3, 'false'], [3, 'true']]) self.assertTableState(InstallationCount, ['value', 'subgroup'], [[3, 'false'], [3, 'true']])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_messages_sent_by_is_bot_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['messages_sent:is_bot:hour']
self.current_property = stat.property
bot = self.create_user(is_bot=True)
human1 = self.create_user()
human2 = self.create_user()
recipient_human1 = Recipient.objects.get(type_id=human1.id,
type=Recipient.PERSONAL)
recipient_stream = self.create_stream_with_recipient()[1]
recipient_huddle = self.create_huddle_with_recipient()[1]
# To be included
self.create_message(bot, recipient_human1)
self.create_message(bot, recipient_stream)
self.create_message(bot, recipient_huddle)
self.create_message(human1, recipient_human1)
self.create_message(human2, recipient_human1)
# To be excluded
self.create_message(self.hourly_user, recipient_human1)
self.create_message(self.hourly_user, recipient_stream)
self.create_message(self.hourly_user, recipient_huddle)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
self.assertTableState(UserCount, ['value', 'subgroup', 'user'],
[[1, 'false', human1], [1, 'false', human2],
[3, 'true', bot]])
self.assertTableState(RealmCount, ['value', 'subgroup', 'realm'],
[[2, 'false', self.default_realm],
[3, 'true', self.default_realm]])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value', 'subgroup'], [])
self.assertTableState(StreamCount, [], [])
def test_messages_sent_by_message_type(self) -> None: def test_messages_sent_by_message_type(self) -> None:
stat = COUNT_STATS['messages_sent:message_type:day'] stat = COUNT_STATS['messages_sent:message_type:day']
self.current_property = stat.property self.current_property = stat.property
@@ -454,6 +517,43 @@ class TestCountStats(AnalyticsTestCase):
[2, 'huddle_message']]) [2, 'huddle_message']])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_messages_sent_by_message_type_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['messages_sent:message_type:day']
self.current_property = stat.property
user = self.create_user()
user_recipient = Recipient.objects.get(type_id=user.id, type=Recipient.PERSONAL)
private_stream_recipient = self.create_stream_with_recipient(invite_only=True)[1]
stream_recipient = self.create_stream_with_recipient()[1]
huddle_recipient = self.create_huddle_with_recipient()[1]
# To be included
self.create_message(user, user_recipient)
self.create_message(user, private_stream_recipient)
self.create_message(user, stream_recipient)
self.create_message(user, huddle_recipient)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
# To be excluded
self.create_message(self.hourly_user, user_recipient)
self.create_message(self.hourly_user, private_stream_recipient)
self.create_message(self.hourly_user, stream_recipient)
self.create_message(self.hourly_user, huddle_recipient)
self.assertTableState(UserCount, ['value', 'subgroup', 'user'],
[[1, 'private_message', user], [1, 'private_stream', user],
[1, 'huddle_message', user], [1, 'public_stream', user]])
self.assertTableState(RealmCount, ['value', 'subgroup'],
[[1, 'private_message'], [1, 'private_stream'],
[1, 'public_stream'], [1, 'huddle_message']])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value', 'subgroup'], [])
self.assertTableState(StreamCount, [], [])
def test_messages_sent_to_recipients_with_same_id(self) -> None: def test_messages_sent_to_recipients_with_same_id(self) -> None:
stat = COUNT_STATS['messages_sent:message_type:day'] stat = COUNT_STATS['messages_sent:message_type:day']
self.current_property = stat.property self.current_property = stat.property
@@ -508,6 +608,42 @@ class TestCountStats(AnalyticsTestCase):
[[4, website_client_id], [3, client2_id]]) [[4, website_client_id], [3, client2_id]])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_messages_sent_by_client_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['messages_sent:client:day']
self.current_property = stat.property
user1 = self.create_user(is_bot=True)
user2 = self.create_user()
recipient_user2 = Recipient.objects.get(type_id=user2.id, type=Recipient.PERSONAL)
client2 = Client.objects.create(name='client2')
# TO be included
self.create_message(user1, recipient_user2, sending_client=client2)
self.create_message(user2, recipient_user2, sending_client=client2)
self.create_message(user2, recipient_user2)
# To be excluded
self.create_message(self.hourly_user, recipient_user2, sending_client=client2)
self.create_message(self.hourly_user, recipient_user2, sending_client=client2)
self.create_message(self.hourly_user, recipient_user2)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
client2_id = str(client2.id)
website_client_id = str(get_client('website').id) # default for self.create_message
self.assertTableState(UserCount, ['value', 'subgroup', 'user'],
[[1, client2_id, user1], [1, client2_id, user2],
[1, website_client_id, user2]])
self.assertTableState(RealmCount, ['value', 'subgroup'],
[[1, website_client_id], [2, client2_id]])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value', 'subgroup'], [])
self.assertTableState(StreamCount, [], [])
def test_messages_sent_to_stream_by_is_bot(self) -> None: def test_messages_sent_to_stream_by_is_bot(self) -> None:
stat = COUNT_STATS['messages_in_stream:is_bot:day'] stat = COUNT_STATS['messages_in_stream:is_bot:day']
self.current_property = stat.property self.current_property = stat.property
@@ -545,6 +681,39 @@ class TestCountStats(AnalyticsTestCase):
self.assertTableState(InstallationCount, ['value', 'subgroup'], [[5, 'false'], [2, 'true']]) self.assertTableState(InstallationCount, ['value', 'subgroup'], [[5, 'false'], [2, 'true']])
self.assertTableState(UserCount, [], []) self.assertTableState(UserCount, [], [])
def test_messages_sent_to_stream_by_is_bot_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['messages_in_stream:is_bot:day']
self.current_property = stat.property
human1 = self.create_user()
bot = self.create_user(is_bot=True)
realm = {'realm': self.second_realm}
stream1, recipient_stream1 = self.create_stream_with_recipient()
stream2, recipient_stream2 = self.create_stream_with_recipient(**realm)
# To be included
self.create_message(human1, recipient_stream1)
self.create_message(bot, recipient_stream1)
# To be excluded
self.create_message(self.hourly_user, recipient_stream2)
self.create_message(self.daily_user, recipient_stream2)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
self.assertTableState(StreamCount, ['value', 'subgroup', 'stream'],
[[1, 'false', stream1],
[1, 'true', stream1]])
self.assertTableState(RealmCount, ['value', 'subgroup', 'realm'],
[[1, 'false'], [1, 'true']])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value', 'subgroup'], [])
self.assertTableState(UserCount, [], [])
def create_interval(self, user: UserProfile, start_offset: timedelta, def create_interval(self, user: UserProfile, start_offset: timedelta,
end_offset: timedelta) -> None: end_offset: timedelta) -> None:
UserActivityInterval.objects.create( UserActivityInterval.objects.create(
@@ -594,6 +763,34 @@ class TestCountStats(AnalyticsTestCase):
self.assertTableState(InstallationCount, ['value'], [[6]]) self.assertTableState(InstallationCount, ['value'], [[6]])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_1day_actives_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['1day_actives::day']
self.current_property = stat.property
_1day = 1*self.DAY - UserActivityInterval.MIN_INTERVAL_LENGTH
user1 = self.create_user()
user2 = self.create_user()
# To be included
self.create_interval(user1, 20*self.HOUR, 19*self.HOUR)
self.create_interval(user2, _1day + self.DAY, _1day)
# To be excluded
user3 = self.create_user(realm=self.second_realm)
self.create_interval(user3, 20*self.MINUTE, 19*self.MINUTE)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
self.assertTableState(UserCount, ['value', 'user'],
[[1, user2], [1, user2]])
self.assertTableState(RealmCount, ['value', 'realm'],
[[2, self.default_realm]])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value'], [])
self.assertTableState(StreamCount, [], [])
def test_15day_actives(self) -> None: def test_15day_actives(self) -> None:
stat = COUNT_STATS['15day_actives::day'] stat = COUNT_STATS['15day_actives::day']
self.current_property = stat.property self.current_property = stat.property
@@ -637,6 +834,36 @@ class TestCountStats(AnalyticsTestCase):
self.assertTableState(InstallationCount, ['value'], [[6]]) self.assertTableState(InstallationCount, ['value'], [[6]])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_15day_actives_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['15day_actives::day']
self.current_property = stat.property
_15day = 15*self.DAY - UserActivityInterval.MIN_INTERVAL_LENGTH
user1 = self.create_user()
user2 = self.create_user()
user3 = self.create_user(realm=self.second_realm)
# To be included
self.create_interval(user1, _15day + self.DAY, _15day)
self.create_interval(user2, 20*self.HOUR, 19*self.HOUR)
# To be excluded
self.create_interval(user3, 20*self.HOUR, 19*self.HOUR)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
self.assertTableState(UserCount, ['value', 'user'],
[[1, user1], [1, user2]])
self.assertTableState(RealmCount, ['value', 'realm'],
[[2, self.default_realm]])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value'], [])
self.assertTableState(StreamCount, [], [])
def test_minutes_active(self) -> None: def test_minutes_active(self) -> None:
stat = COUNT_STATS['minutes_active::day'] stat = COUNT_STATS['minutes_active::day']
self.current_property = stat.property self.current_property = stat.property
@@ -679,6 +906,35 @@ class TestCountStats(AnalyticsTestCase):
self.assertTableState(InstallationCount, ['value'], [[61 + 121 + 24*60 + 1]]) self.assertTableState(InstallationCount, ['value'], [[61 + 121 + 24*60 + 1]])
self.assertTableState(StreamCount, [], []) self.assertTableState(StreamCount, [], [])
def test_minutes_active_realm_constraint(self) -> None:
# For single Realm
COUNT_STATS = get_count_stats(self.default_realm)
stat = COUNT_STATS['minutes_active::day']
self.current_property = stat.property
# Outside time range, should not appear. Also testing for intervals
# starting and ending on boundary
user1 = self.create_user()
user2 = self.create_user()
user3 = self.create_user(realm=self.second_realm)
# To be included
self.create_interval(user1, 20*self.HOUR, 19*self.HOUR)
self.create_interval(user2, 20*self.MINUTE, 19*self.MINUTE)
# To be excluded
self.create_interval(user3, 20*self.MINUTE, 19*self.MINUTE)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO, self.default_realm)
self.assertTableState(UserCount, ['value', 'user'],
[[60, user1], [1, user2]])
self.assertTableState(RealmCount, ['value', 'realm'],
[[60 + 1, self.default_realm]])
# No aggregation to InstallationCount with realm constraint
self.assertTableState(InstallationCount, ['value'], [])
self.assertTableState(StreamCount, [], [])
class TestDoAggregateToSummaryTable(AnalyticsTestCase): class TestDoAggregateToSummaryTable(AnalyticsTestCase):
# do_aggregate_to_summary_table is mostly tested by the end to end # do_aggregate_to_summary_table is mostly tested by the end to end
# nature of the tests in TestCountStats. But want to highlight one # nature of the tests in TestCountStats. But want to highlight one