From d260a2263790e11532d7ec20b18d0c9617856235 Mon Sep 17 00:00:00 2001 From: umkay Date: Fri, 29 Jul 2016 12:52:45 -0700 Subject: [PATCH] Add a new statistics/analytics framework. This is a first pass at building a framework for collecting various stats about realms, users, streams, etc. Includes: * New analytics tables for storing counts data * Raw SQL queries for pulling data from zerver/models.py tables * Aggregation functions for aggregating hourly stats into daily stats, and aggregating user/stream level stats into realm level stats * A management command for pulling the data Note that counts.py was added to the linter exclude list due to errors around %%s. --- analytics/lib/__init__.py | 0 analytics/lib/counts.py | 256 +++++++++++++ analytics/lib/interval.py | 70 ++++ .../commands/update_analytics_counts.py | 60 +++ analytics/migrations/0001_initial.py | 112 ++++++ analytics/migrations/__init__.py | 0 analytics/models.py | 140 +++++++ analytics/tests/__init__.py | 0 analytics/tests/test_counts.py | 346 ++++++++++++++++++ analytics/tests/test_interval.py | 28 ++ scripts/lib/pythonrc.py | 1 + tools/lint-all | 4 +- tools/test-backend | 3 +- zerver/lib/timestamp.py | 10 + 14 files changed, 1028 insertions(+), 2 deletions(-) create mode 100644 analytics/lib/__init__.py create mode 100644 analytics/lib/counts.py create mode 100644 analytics/lib/interval.py create mode 100644 analytics/management/commands/update_analytics_counts.py create mode 100644 analytics/migrations/0001_initial.py create mode 100644 analytics/migrations/__init__.py create mode 100644 analytics/models.py create mode 100644 analytics/tests/__init__.py create mode 100644 analytics/tests/test_counts.py create mode 100644 analytics/tests/test_interval.py diff --git a/analytics/lib/__init__.py b/analytics/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/analytics/lib/counts.py b/analytics/lib/counts.py new file mode 100644 index 0000000000..8ce80b3233 --- /dev/null +++ 
from django.db import connection, models
from datetime import timedelta, datetime

from analytics.models import InstallationCount, RealmCount, \
    UserCount, StreamCount, HuddleCount, BaseCount
from analytics.lib.interval import TimeInterval, timeinterval_range
from zerver.models import Realm, UserProfile, Message, Stream, models

from typing import Any, Dict, Optional, Type
from six import text_type

# Explicit ranking of interval granularities, smallest first. Comparing the
# interval *names* as strings would be wrong: lexicographically
# 'hour' > 'day', so hour-granularity stats would never be aggregated into
# day-level summary rows.
INTERVAL_RANK = {'hour': 0, 'day': 1, 'gauge': 2}

class CountStat(object):
    """Descriptor for a single statistic we collect.

    property -- name of the stat; stored in the `property` column of the
        *Count tables.
    zerver_count_query -- the raw-SQL pull query plus its source and
        destination tables.
    filter_args -- extra column filters spliced into the pull query's JOIN.
    smallest_interval -- finest granularity collected: 'hour', 'day', or
        'gauge'.
    frequency -- how often the stat is computed: 'hour' or 'day'.
    """
    def __init__(self, property, zerver_count_query, filter_args, smallest_interval, frequency):
        # type: (text_type, ZerverCountQuery, Dict[str, bool], str, str) -> None
        self.property = property
        self.zerver_count_query = zerver_count_query
        # might have to do something different for bitfields
        self.filter_args = filter_args
        self.smallest_interval = smallest_interval
        self.frequency = frequency

class ZerverCountQuery(object):
    """Pairs a raw SQL pull query with its source and destination tables."""
    def __init__(self, zerver_table, analytics_table, query):
        # type: (Type[models.Model], Type[BaseCount], text_type) -> None
        self.zerver_table = zerver_table
        self.analytics_table = analytics_table
        self.query = query

def process_count_stat(stat, range_start, range_end):
    # type: (CountStat, datetime, datetime) -> None
    """Fill and aggregate the analytics tables for `stat` over the
    half-open backfill window ending at `range_end`."""
    # stats that hit the prod database
    for time_interval in timeinterval_range(range_start, range_end, stat.smallest_interval, stat.frequency):
        do_pull_from_zerver(stat, time_interval)

    # aggregate hour to day
    for time_interval in timeinterval_range(range_start, range_end, 'day', stat.frequency):
        if stat.smallest_interval == 'hour':
            do_aggregate_hour_to_day(stat, time_interval)

    # aggregate to summary tables
    for interval in ['hour', 'day', 'gauge']:
        for frequency in ['hour', 'day']:
            for time_interval in timeinterval_range(range_start, range_end, interval, frequency):
                analytics_table = stat.zerver_count_query.analytics_table
                # Compare granularities via INTERVAL_RANK; a plain string
                # comparison of the names would order them alphabetically.
                if INTERVAL_RANK[stat.smallest_interval] <= INTERVAL_RANK[interval] and \
                        stat.frequency == frequency and \
                        analytics_table in (UserCount, StreamCount):
                    do_aggregate_to_summary_table(stat, time_interval, analytics_table, RealmCount)
                    do_aggregate_to_summary_table(stat, time_interval, RealmCount, InstallationCount)


# There are only two summary tables at the moment: RealmCount and InstallationCount.
# Will have to generalize this a bit if more are added
def do_aggregate_to_summary_table(stat, time_interval, from_table, to_table):
    # type: (CountStat, TimeInterval, Type[BaseCount], Type[BaseCount]) -> None
    """Sum rows of `from_table` into one row per key of `to_table`.

    Idempotent: bails out if the destination row already exists.
    """
    if to_table == RealmCount:
        id_cols = 'realm_id,'
        group_by = 'GROUP BY realm_id'
    elif to_table == InstallationCount:
        id_cols = ''
        group_by = ''
    else:
        raise ValueError("%s is not a summary table" % (to_table,))

    if to_table.objects.filter(property = stat.property,
                               end_time = time_interval.end,
                               interval = time_interval.interval).exists():
        return

    # Table/column names are %-interpolated (they come from trusted code,
    # never user input); the end_time value goes through cursor.execute's
    # parameter binding, hence the %%-escaped placeholder.
    query = """
        INSERT INTO %(to_table)s (%(id_cols)s value, property, end_time, interval)
        SELECT %(id_cols)s COALESCE (sum(value), 0), '%(property)s', %%(end_time)s, '%(interval)s'
        FROM %(from_table)s WHERE
        (
            property = '%(property)s' AND
            end_time = %%(end_time)s AND
            interval = '%(interval)s'
        )
        %(group_by)s
    """ % {'to_table': to_table._meta.db_table,
           'id_cols' : id_cols,
           'from_table' : from_table._meta.db_table,
           'property' : stat.property,
           'interval' : time_interval.interval,
           'group_by' : group_by}
    cursor = connection.cursor()
    cursor.execute(query, {'end_time': time_interval.end})
    cursor.close()

def do_aggregate_hour_to_day(stat, time_interval):
    # type: (CountStat, TimeInterval) -> None
    """Roll the 24 'hour' rows ending at time_interval.end into one 'day' row.

    Idempotent: bails out if the day row already exists.
    """
    table = stat.zerver_count_query.analytics_table
    id_cols = ''.join([col + ', ' for col in table.extended_id()])
    # id_cols is either empty or ends with ', ', so appending 'property'
    # always yields a valid GROUP BY list.  (Appending ' property' after an
    # empty group_by, as before, left a stray token in the SQL.)
    group_by = 'GROUP BY %sproperty' % id_cols

    if table.objects.filter(property = stat.property,
                            end_time = time_interval.end,
                            interval = time_interval.interval).exists():
        return

    query = """
        INSERT INTO %(table)s (%(id_cols)s value, property, end_time, interval)
        SELECT %(id_cols)s sum(value), '%(property)s', %%(end_time)s, 'day'
        FROM %(table)s WHERE
        (
            property = '%(property)s' AND
            end_time > %%(time_start)s AND
            end_time <= %%(time_end)s AND
            interval = 'hour'
        )
        %(group_by)s
    """ % {'table': table._meta.db_table,
           'id_cols' : id_cols,
           'group_by' : group_by,
           'property': stat.property}
    cursor = connection.cursor()
    cursor.execute(query, {'end_time': time_interval.end,
                           'time_start': time_interval.end - timedelta(days=1),
                           'time_end': time_interval.end})
    cursor.close()

## methods that hit the prod databases directly
# No left joins in Django ORM yet, so have to use raw SQL :(
# written in slightly more than needed generality, to reduce copy-paste errors
# as more of these are made / make it easy to extend to a pull_X_by_realm

def do_pull_from_zerver(stat, time_interval):
    # type: (CountStat, TimeInterval) -> None
    """Run `stat`'s pull query against the zerver_* tables for one interval.

    Idempotent: bails out if the analytics rows already exist.
    """
    zerver_table = stat.zerver_count_query.zerver_table._meta.db_table # type: ignore
    join_args = ' '.join('AND %s.%s = %s' % (zerver_table, key, value) \
                         for key, value in stat.filter_args.items())

    if stat.zerver_count_query.analytics_table.objects \
                                              .filter(property = stat.property,
                                                      end_time = time_interval.end,
                                                      interval = time_interval.interval) \
                                              .exists():
        return

    # We do string replacement here because passing join_args as a param
    # may result in problems when running cursor.execute; we do
    # the string formatting prior so that cursor.execute runs it as sql
    query_ = stat.zerver_count_query.query % {'zerver_table' : zerver_table,
                                              'property' : stat.property,
                                              'interval' : time_interval.interval,
                                              'join_args' : join_args}
    cursor = connection.cursor()
    cursor.execute(query_, {'time_start': time_interval.start, 'time_end': time_interval.end})
    cursor.close()

count_user_by_realm_query = """
    INSERT INTO analytics_realmcount
        (realm_id, value, property, end_time, interval)
    SELECT
        zerver_realm.id, count(%(zerver_table)s),'%(property)s', %%(time_end)s, '%(interval)s'
    FROM zerver_realm
    LEFT JOIN zerver_userprofile
    ON
    (
        zerver_userprofile.realm_id = zerver_realm.id AND
        zerver_userprofile.date_joined >= %%(time_start)s AND
        zerver_userprofile.date_joined < %%(time_end)s
        %(join_args)s
    )
    WHERE
        zerver_realm.date_created < %%(time_end)s
    GROUP BY zerver_realm.id
"""
zerver_count_user_by_realm = ZerverCountQuery(UserProfile, RealmCount, count_user_by_realm_query)

# currently .sender_id is only Message specific thing
count_message_by_user_query = """
    INSERT INTO analytics_usercount
        (user_id, realm_id, value, property, end_time, interval)
    SELECT
        zerver_userprofile.id, zerver_userprofile.realm_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
    FROM zerver_userprofile
    JOIN zerver_message
    ON
    (
        zerver_message.sender_id = zerver_userprofile.id AND
        zerver_message.pub_date >= %%(time_start)s AND
        zerver_message.pub_date < %%(time_end)s
        %(join_args)s
    )
    WHERE
        zerver_userprofile.date_joined < %%(time_end)s
    GROUP BY zerver_userprofile.id
"""
zerver_count_message_by_user = ZerverCountQuery(Message, UserCount, count_message_by_user_query)

count_message_by_stream_query = """
    INSERT INTO analytics_streamcount
        (stream_id, realm_id, value, property, end_time, interval)
    SELECT
        zerver_stream.id, zerver_stream.realm_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
    FROM zerver_stream
    INNER JOIN zerver_recipient
    ON
    (
        zerver_recipient.type = 2 AND
        zerver_stream.id = zerver_recipient.type_id
    )
    INNER JOIN zerver_message
    ON
    (
        zerver_message.recipient_id = zerver_recipient.id AND
        zerver_message.pub_date >= %%(time_start)s AND
        zerver_message.pub_date < %%(time_end)s AND
        zerver_stream.date_created < %%(time_end)s
        %(join_args)s
    )
    GROUP BY zerver_stream.id
"""
zerver_count_message_by_stream = ZerverCountQuery(Message, StreamCount, count_message_by_stream_query)

count_stream_by_realm_query = """
    INSERT INTO analytics_realmcount
        (realm_id, value, property, end_time, interval)
    SELECT
        zerver_stream.realm_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
    FROM zerver_stream
    LEFT JOIN zerver_recipient
    ON
    (
        zerver_recipient.type = 2 AND
        zerver_stream.id = zerver_recipient.type_id
    )
    GROUP BY zerver_stream.realm_id
"""
zerver_count_stream_by_realm = ZerverCountQuery(Stream, RealmCount, count_stream_by_realm_query)

count_message_by_huddle_query = """
    INSERT INTO analytics_huddlecount
        (huddle_id, user_id, value, property, end_time, interval)
    SELECT
        zerver_message.recipient_id, zerver_message.sender_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
    FROM zerver_message
    INNER JOIN zerver_recipient
    ON
    (
        zerver_recipient.type = 3 AND
        zerver_message.recipient_id = zerver_recipient.id AND
        zerver_message.pub_date >= %%(time_start)s AND
        zerver_message.pub_date < %%(time_end)s
        %(join_args)s
    )
    GROUP BY zerver_message.recipient_id, zerver_message.sender_id
"""
zerver_count_message_by_huddle = ZerverCountQuery(Message, HuddleCount, count_message_by_huddle_query)

COUNT_STATS = {
    'active_humans': CountStat('active_humans', zerver_count_user_by_realm,
                               {'is_bot': False, 'is_active': True}, 'gauge', 'day'),
    'active_bots': CountStat('active_bots', zerver_count_user_by_realm,
                             {'is_bot': True, 'is_active': True}, 'gauge', 'day'),
    'messages_sent': CountStat('messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')}
from django.utils import timezone
from datetime import datetime, timedelta, MINYEAR

from zerver.lib.timestamp import is_timezone_aware
from six import text_type

from typing import List, Optional

# Sentinel start time for 'gauge' intervals, which cover all of history.
MIN_TIME = datetime(MINYEAR, 1, 1, 0, 0, 0, tzinfo=timezone.utc)

# Name isn't great .. fixedinterval? timerange? Trying to distinguish
# generic intervals like 'hour' or 'quarter' from fixed intervals like
# 'Aug 3 2016 from 9-10am'
class TimeInterval(object):
    """A concrete [start, end] window of a given granularity ('hour',
    'day', or 'gauge')."""
    def __init__(self, interval, end = None, floor_to_boundary = 'hour'):
        # type: (str, Optional[datetime], Optional[str]) -> None
        # `end` defaults to None rather than `timezone.now()`: a default
        # argument expression is evaluated once at import time, which would
        # freeze "now" for the life of the process.
        if end is None:
            end = timezone.now()
        # Not the best logic for when we have intervals like 'quarter', but okay for now
        if not is_timezone_aware(end):
            raise ValueError("end must be timezone aware")
        if floor_to_boundary is None:
            self.end = end
        else:
            self.end = floor_to_interval_boundary(end, floor_to_boundary)
        self.interval = interval
        if interval == 'gauge':
            self.start = MIN_TIME
        else:
            self.start = subtract_interval(self.end, interval)

# Perhaps the right way to do the next two functions is to have an interval class
# (subclassed to hourinterval, dayinterval, etc) with methods like floor,
# subtract, and subinterval. Seems like overkill for now, though.
def floor_to_interval_boundary(datetime_object, interval):
    # type: (datetime, text_type) -> datetime
    """Truncate datetime_object down to the nearest day or hour boundary,
    preserving its tzinfo."""
    # datetime objects are (year, month, day, hour, minutes, seconds, microseconds)
    if interval == 'day':
        return datetime(*datetime_object.timetuple()[:3]).replace(tzinfo=datetime_object.tzinfo)
    elif interval == 'hour':
        return datetime(*datetime_object.timetuple()[:4]).replace(tzinfo=datetime_object.tzinfo)
    else:
        raise ValueError("Unknown or unfloorable interval", interval)

# Don't have to worry about leap seconds, since datetime doesn't support it
def subtract_interval(datetime_object, interval):
    # type: (datetime, str) -> datetime
    """Return datetime_object moved back by exactly one `interval`."""
    if interval == 'day':
        return datetime_object - timedelta(days = 1)
    elif interval == 'hour':
        return datetime_object - timedelta(seconds = 3600)
    else:
        raise ValueError("Unknown or unarithmetic interval", interval)

def subintervals(interval):
    # type: (str) -> List[str]
    """List the granularities (including itself) that `interval` can be
    assembled from."""
    if interval == 'day':
        return ['day', 'hour']
    elif interval == 'hour':
        return ['hour']
    elif interval == 'gauge':
        return ['gauge']
    else:
        raise ValueError("Unknown interval", interval)

def timeinterval_range(first, last, interval, step_interval):
    # type: (datetime, datetime, str, str) -> List[TimeInterval]
    """TimeIntervals of width `interval`, ending on successive
    `step_interval` boundaries within [first, last], oldest first."""
    end = floor_to_interval_boundary(last, step_interval)
    ans = []
    while end >= first:
        ans.append(TimeInterval(interval, end, floor_to_boundary=None))
        end = subtract_interval(end, step_interval)
    ans.reverse()
    return ans
from argparse import ArgumentParser
from datetime import timedelta

from django.core.management.base import BaseCommand
from django.utils import timezone
from django.utils.dateparse import parse_datetime

from analytics.models import RealmCount, UserCount
from analytics.lib.counts import COUNT_STATS, CountStat, process_count_stat
from zerver.lib.timestamp import datetime_to_string, is_timezone_aware
from zerver.models import UserProfile, Message

from typing import Any

class Command(BaseCommand):
    help = """Fills Analytics tables.

    Run as a cron job that runs every hour."""

    def add_arguments(self, parser):
        # type: (ArgumentParser) -> None
        parser.add_argument('--range-start', '-s',
                            type=str,
                            help="Time to backfill from. Defaults to one hour before --range-end.")
        parser.add_argument('--range-end', '-e',
                            type=str,
                            help='Time to backfill to.',
                            default=datetime_to_string(timezone.now()))
        # A store_true flag rather than `type=bool`: bool('False') is True,
        # so `--utc False` used to silently enable UTC mode.
        parser.add_argument('--utc',
                            action='store_true',
                            help="Interpret --range-start and --range-end as times in UTC.",
                            default=False)
        parser.add_argument('--stat', '-q',
                            type=str,
                            help="CountStat to process. If omitted, all stats are processed")

    def handle(self, *args, **options):
        # type: (*Any, **Any) -> None
        # argparse always populates every option key (with None when the
        # flag was omitted), so membership tests like `'range_end' in
        # options` were always True; test against None instead.
        # --range-end always has a default, so parse it first and derive a
        # missing --range-start from it.  (The old dead branch computed
        # range_end = range_start - 1 hour, i.e. backwards.)
        range_end = parse_datetime(options['range_end'])
        if options['range_start'] is not None:
            range_start = parse_datetime(options['range_start'])
        else:
            range_start = range_end - timedelta(seconds = 3600)

        # throw error if start time is greater than end time
        if range_start > range_end:
            raise ValueError("--range-start cannot be greater than --range-end.")

        if options['utc']:
            range_start = range_start.replace(tzinfo=timezone.utc)
            range_end = range_end.replace(tzinfo=timezone.utc)

        if not (is_timezone_aware(range_start) and is_timezone_aware(range_end)):
            raise ValueError("--range-start and --range-end must be timezone aware. Maybe you meant to use the --utc option?")

        if options['stat'] is not None:
            process_count_stat(COUNT_STATS[options['stat']], range_start, range_end)
        else:
            for stat in COUNT_STATS.values():
                process_count_stat(stat, range_start, range_end)
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import models, migrations
from django.conf import settings
import zerver.lib.str_utils


def _count_fields(*extra):
    """Column set shared by every *Count table, followed by the
    table-specific foreign keys.

    Each count table stores a BigInteger `value` for a named `property`
    at an `end_time`, tagged with the `interval` it covers, and may point
    at an Anomaly row explaining a suspect value.
    """
    return [
        ('id', models.AutoField(verbose_name='ID', serialize=False,
                                auto_created=True, primary_key=True)),
        ('value', models.BigIntegerField()),
        ('property', models.CharField(max_length=40)),
        ('end_time', models.DateTimeField()),
        ('interval', models.CharField(max_length=20)),
        ('anomaly', models.ForeignKey(to='analytics.Anomaly', null=True)),
    ] + list(extra)


class Migration(migrations.Migration):

    dependencies = [
        ('zerver', '0028_userprofile_tos_version'),
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
    ]

    operations = [
        migrations.CreateModel(
            name='Anomaly',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False,
                                        auto_created=True, primary_key=True)),
                ('info', models.CharField(max_length=1000)),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='HuddleCount',
            fields=_count_fields(
                ('huddle', models.ForeignKey(to='zerver.Recipient')),
                ('user', models.ForeignKey(to=settings.AUTH_USER_MODEL)),
            ),
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='InstallationCount',
            fields=_count_fields(),
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='RealmCount',
            fields=_count_fields(
                ('realm', models.ForeignKey(to='zerver.Realm')),
            ),
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='StreamCount',
            fields=_count_fields(
                ('realm', models.ForeignKey(to='zerver.Realm')),
                ('stream', models.ForeignKey(to='zerver.Stream')),
            ),
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='UserCount',
            fields=_count_fields(
                ('realm', models.ForeignKey(to='zerver.Realm')),
                ('user', models.ForeignKey(to=settings.AUTH_USER_MODEL)),
            ),
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.AlterUniqueTogether(
            name='usercount',
            unique_together=set([('user', 'property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='streamcount',
            unique_together=set([('stream', 'property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='realmcount',
            unique_together=set([('realm', 'property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='installationcount',
            unique_together=set([('property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='huddlecount',
            unique_together=set([('huddle', 'property', 'end_time', 'interval')]),
        ),
    ]
from django.db import models

from zerver.models import Realm, UserProfile, Stream, Recipient
from zerver.lib.str_utils import ModelReprMixin
import datetime

from six import text_type
from typing import Optional, Tuple, Type, Union

from analytics.lib.interval import MIN_TIME

# NOTE: The "<ClassName: ...>" wrappers in the __unicode__ methods below
# were restored; as transmitted they read u"" % (...), which raises
# TypeError ("not all arguments converted") the moment a repr is taken.

# would only ever make entries here by hand
class Anomaly(ModelReprMixin, models.Model):
    """A hand-entered note explaining a suspicious value in a count table."""
    info = models.CharField(max_length=1000) # type: text_type

    def __unicode__(self):
        # type: () -> text_type
        return u"<Anomaly: %s %s>" % (self.info, self.id)

class BaseCount(ModelReprMixin, models.Model):
    """Abstract base for all the *Count tables.

    Each row records one integer `value` for a named `property` at an
    `end_time`, tagged with the `interval` ('hour'/'day'/'gauge') it
    covers; `anomaly` optionally explains a suspect value.
    """
    value = models.BigIntegerField() # type: int
    property = models.CharField(max_length=40) # type: text_type
    end_time = models.DateTimeField() # type: datetime.datetime
    interval = models.CharField(max_length=20) # type: text_type
    anomaly = models.ForeignKey(Anomaly, null=True) # type: Optional[Anomaly]

    class Meta(object):
        abstract = True

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        # Names of the id columns beyond the shared BaseCount ones, used
        # when building raw SQL column lists.
        raise NotImplementedError

    @staticmethod
    def key_model():
        # type: () -> Optional[Type[models.Model]]
        # The zerver model class this table's rows are keyed on.
        raise NotImplementedError

class InstallationCount(BaseCount):

    class Meta(object):
        unique_together = ("property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ()

    @staticmethod
    def key_model():
        # type: () -> Optional[Type[models.Model]]
        # Installation-wide counts have no key model.
        return None

    def __unicode__(self):
        # type: () -> text_type
        return u"<InstallationCount: %s %s %s>" % (self.property, self.value, self.id)

class RealmCount(BaseCount):
    realm = models.ForeignKey(Realm)

    class Meta(object):
        unique_together = ("realm", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('realm_id',)

    @staticmethod
    def key_model():
        # type: () -> Optional[Type[models.Model]]
        return Realm

    def __unicode__(self):
        # type: () -> text_type
        return u"<RealmCount: %s %s %s %s>" % (self.realm, self.property, self.value, self.id)

class UserCount(BaseCount):
    user = models.ForeignKey(UserProfile)
    realm = models.ForeignKey(Realm)

    class Meta(object):
        unique_together = ("user", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('user_id', 'realm_id')

    @staticmethod
    def key_model():
        # type: () -> Optional[Type[models.Model]]
        return UserProfile

    def __unicode__(self):
        # type: () -> text_type
        return u"<UserCount: %s %s %s %s>" % (self.user, self.property, self.value, self.id)

class StreamCount(BaseCount):
    stream = models.ForeignKey(Stream)
    realm = models.ForeignKey(Realm)

    class Meta(object):
        unique_together = ("stream", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('stream_id', 'realm_id')

    @staticmethod
    def key_model():
        # type: () -> Optional[Type[models.Model]]
        return Stream

    def __unicode__(self):
        # type: () -> text_type
        return u"<StreamCount: %s %s %s %s>" % (self.stream, self.property, self.value, self.id)

class HuddleCount(BaseCount):
    huddle = models.ForeignKey(Recipient)
    user = models.ForeignKey(UserProfile)

    class Meta(object):
        unique_together = ("huddle", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('huddle_id', 'user_id')

    @staticmethod
    def key_model():
        # type: () -> Optional[Type[models.Model]]
        return Recipient

    def __unicode__(self):
        # type: () -> text_type
        return u"<HuddleCount: %s %s %s %s>" % (self.huddle, self.property, self.value, self.id)
{'realm': self.realm, + 'description': ''} + for key, value in defaults.items(): + kwargs[key] = kwargs.get(key, value) + stream = Stream(**kwargs) + stream.save() + return stream + + def create_message(self, sender, recipient, **kwargs): + # type: (UserProfile, Recipient, **Any) -> Message + sending_client = make_client(name="analytics test") + defaults = { + 'sender': sender, + 'recipient': recipient, + 'subject': 'whatever', + 'content': 'hello **world**', + 'pub_date': timezone.now(), + 'sending_client': sending_client, + 'last_edit_time': timezone.now(), + 'edit_history': '[]' + } + for key, value in defaults.items(): + kwargs[key] = kwargs.get(key, value) + message = Message(**kwargs) + message.save() + return message + + # TODO uk refactor tests to use this assert + def assertRealmCountEquals(self, realm, property, interval, value): + # type: (Realm, str, str, Any) -> None + realm_count_value = RealmCount.objects.filter(realm=realm, + property=property, + interval=interval).values_list('value', flat=True)[0] + + self.assertEqual(realm_count_value, value) + + def setUp(self): + # type: () -> None + # almost every test will need a time_interval, realm, and user + end = timezone.now() + timedelta(seconds=7200) + self.day_interval = TimeInterval('day', end, 'hour') + self.hour_interval = TimeInterval('hour', end, 'hour') + self.realm = Realm(domain='analytics.test', name='Realm Test', + date_created=parse_datetime('2016-09-27 01:00:50+00:00')) + self.realm.save() + # don't pull the realm object back from the database every time we need its id + self.realm_id = self.realm.id + self.user = self.create_user('email', is_bot=False, is_active=True, + date_joined=parse_datetime('2016-09-27 04:20:50+00:00')) + self.user_id = self.user.id + + def test_human_and_bot_count_by_realm(self): + # type: () -> None + + stats = [ + CountStat('test_active_humans', zerver_count_user_by_realm, {'is_bot': False, 'is_active': True}, + 'hour', 'hour'), + CountStat('test_active_bots', 
zerver_count_user_by_realm, {'is_bot': True, 'is_active': True}, + 'hour', 'hour')] + + # TODO these dates should probably be explicit, since the default args for the commands are timezone.now() dependent. + self.create_user('test_bot1', is_bot=True, is_active=True, + date_joined=timezone.now() - timedelta(hours=1)) + self.create_user('test_bot2', is_bot=True, is_active=True, + date_joined=timezone.now() - timedelta(hours=1)) + self.create_user('test_human', is_bot=False, is_active=True, + date_joined=timezone.now() - timedelta(hours=1)) + + for stat in stats: + do_update_past_hour(stat) + + human_row = RealmCount.objects.filter(realm=self.realm, interval='day', + property='test_active_humans') \ + .values_list('value', flat=True)[0] + assert (human_row == 1) + + bot_row = RealmCount.objects.filter(realm=self.realm, interval='day', + property='test_active_bots') \ + .values_list('value', flat=True)[0] + assert (bot_row == 2) + + # test users added in last hour + def test_add_new_users(self): + # type: () -> None + stat = CountStat('add_new_user_test', zerver_count_user_by_realm, {}, 'hour', 'hour') + + # add new users to realm in last hour + self.create_user('email_1', date_joined=parse_datetime('2016-09-27 03:22:50+00:00')) + self.create_user('email_2', date_joined=parse_datetime('2016-09-27 03:15:50+00:00')) + + # add a new user before an hour + self.create_user('email_3', date_joined=parse_datetime('2016-09-27 02:10:50+00:00')) + + # check if user added before the hour is not included + process_count_stat(stat, range_start=parse_datetime('2016-09-27 03:00:50+00:00'), + range_end=parse_datetime('2016-09-27 04:00:50+00:00')) + # do_update is writing the stat.property to all zerver tables + row = RealmCount.objects.filter(realm=self.realm, property='add_new_user_test', + interval='hour').values_list('value', flat=True)[0] + # assert only 2 users + assert (row == 2) + + def test_analytics_stat_write(self): + # type: () -> None + # might change if we refactor 
count_query + + stat = CountStat('test_stat_write', zerver_count_stream_by_realm, + {'invite_only': False}, 'hour', 'hour') + + # add some stuff to zerver_* + self.create_stream(name='stream1', description='test_analytics_stream', + date_created=parse_datetime('2016-09-27 02:10:50+00:00')) + self.create_stream(name='stream2', description='test_analytics_stream', + date_created=parse_datetime('2016-09-27 02:10:50+00:00')) + self.create_stream(name='stream3', description='test_analytics_stream', + date_created=parse_datetime('2016-09-27 02:10:50+00:00')) + + # run do_pull_from_zerver + do_update_past_hour(stat) + + # check analytics_* values are correct + row = RealmCount.objects.filter(realm=self.realm, interval='day', + property='test_stat_write').values_list('value', flat=True)[0] + assert (row == 3) + + # test if process count does nothing if count already processed + def test_process_count(self): + # type: () -> None + # add some active and inactive users that are human + self.create_user('inactive_human_1', is_bot=False, is_active=False, + date_joined=timezone.now() - timedelta(hours=1)) + self.create_user('inactive_human_2', is_bot=False, is_active=False, + date_joined=timezone.now() - timedelta(hours=1)) + self.create_user('active_human', is_bot=False, is_active=True, + date_joined=timezone.now() - timedelta(hours=1)) + + # run stat to pull active humans + stat = CountStat('active_humans', zerver_count_user_by_realm, + {'is_bot': False, 'is_active': True}, 'hour', 'hour') + + do_update_past_hour(stat) + + # get row in analytics table + row_before = RealmCount.objects.filter(realm=self.realm, interval='day', + property='active_humans')\ + .values_list('value', flat=True)[0] + + # run command again + do_update_past_hour(stat) + + # check if row is same as before + row_after = RealmCount.objects.filter(realm=self.realm, interval='day', + property='active_humans').values_list('value', flat=True)[0] + + assert (row_before == 1) + assert (row_before == row_after) + 
# Tests for process_count_stat and the aggregation pipeline.

def test_update_analytics_tables(self):
    # type: () -> None
    """process_count_stat writes a UserCount row for the covered hour,
    and a later run over an unrelated, earlier range leaves it intact."""
    stat = CountStat('test_messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')

    self.create_user('human1', is_bot=False, is_active=True,
                     date_joined=parse_datetime('2016-09-27 04:22:50+00:00'))

    human1 = get_user_profile_by_email('human1')
    human2 = get_user_profile_by_email('email')

    self.create_message(human1, Recipient(human2.id),
                        pub_date=parse_datetime('2016-09-27 04:30:50+00:00'))

    # Pull the hour that contains the message.
    process_count_stat(stat, range_start=parse_datetime('2016-09-27 04:00:50+00:00'),
                       range_end=parse_datetime('2016-09-27 05:00:50+00:00'))
    usercount_row = UserCount.objects.filter(realm=self.realm, interval='hour',
                                             property='test_messages_sent').values_list(
        'value', flat=True)[0]
    assert (usercount_row == 1)

    # Run again over a range entirely before the message was created.
    # Fix: the original passed range_start=2016-09-27 01:00:50 with
    # range_end=2016-09-22 02:00:50 -- an inverted (start > end) range.
    # Per the 2016-09-22 end_time check below, the intended range was on
    # 2016-09-22.
    process_count_stat(stat, range_start=parse_datetime('2016-09-22 01:00:50+00:00'),
                       range_end=parse_datetime('2016-09-22 02:00:50+00:00'))

    # The original row is untouched and nothing was written for the
    # 2016-09-22 05:00 end_time.
    updated_usercount_row = UserCount.objects.filter(
        realm=self.realm, interval='hour', property='test_messages_sent') \
        .values_list('value', flat=True)[0]

    new_row = UserCount.objects.filter(
        realm=self.realm, interval='hour', property='test_messages_sent',
        end_time=datetime(2016, 9, 22, 5, 0).replace(tzinfo=timezone.utc)).exists()

    self.assertFalse(new_row)

    assert (updated_usercount_row == 1)

def test_do_aggregate(self):
    # type: () -> None
    """Hourly message pulls with smallest interval 'day' aggregate up to
    RealmCount and InstallationCount, leaving no hour-interval UserCount
    rows behind."""
    # Smallest stored interval is 'day'; data is pulled hourly.
    stat = CountStat('test_messages_aggregate', zerver_count_message_by_user, {}, 'day', 'hour')

    self.create_user('human1', is_bot=False, is_active=True,
                     date_joined=parse_datetime('2016-09-27 04:22:50+00:00'))

    human1 = get_user_profile_by_email('human1')
    human2 = get_user_profile_by_email('email')

    # Three messages within the same hour.
    for content in ('hi', 'hello', 'bye'):
        self.create_message(human1, Recipient(human2.id),
                            pub_date=parse_datetime('2016-09-27 04:30:50+00:00'),
                            content=content)

    process_count_stat(stat, range_start=parse_datetime('2016-09-27 04:00:50+00:00'),
                       range_end=parse_datetime('2016-09-27 05:00:50+00:00'))

    # No rows for the hour interval at UserCount granularity.
    usercount_row = UserCount.objects.filter(realm=self.realm, interval='hour').exists()
    self.assertFalse(usercount_row)

    # Aggregated correctly to RealmCount and InstallationCount.
    realmcount_row = RealmCount.objects.filter(realm=self.realm, interval='day',
                                               property='test_messages_aggregate').values_list(
        'value', flat=True)[0]
    assert (realmcount_row == 3)

    installationcount_row = InstallationCount.objects.filter(interval='day',
                                                             property='test_messages_aggregate') \
        .values_list('value', flat=True)[0]
    assert (installationcount_row == 3)

def test_message_to_stream_aggregation(self):
    # type: () -> None
    """A message sent to a stream shows up in StreamCount."""
    stat = CountStat('test_messages_to_stream', zerver_count_message_by_stream, {}, 'hour', 'hour')

    user = get_user_profile_by_email('email')

    self.create_stream(name='stream1', description='test_analytics_stream',
                       date_created=parse_datetime('2016-09-27 03:10:50+00:00'))

    stream1 = get_stream('stream1', self.realm)
    # type=2 is the stream recipient type (Recipient.STREAM) -- TODO:
    # use the named constant once confirmed against zerver/models.py.
    recipient = Recipient(type_id=stream1.id, type=2)
    recipient.save()

    self.create_message(user, recipient, pub_date=parse_datetime('2016-09-27 04:30:50+00:00'),
                        content='hi')

    process_count_stat(stat, range_start=parse_datetime('2016-09-27 04:00:50+00:00'),
                       range_end=parse_datetime('2016-09-27 05:00:50+00:00'))

    stream_row = StreamCount.objects.filter(realm=self.realm, interval='hour',
                                            property='test_messages_to_stream').values_list(
        'value', flat=True)[0]
    assert (stream_row == 1)

def test_count_before_realm_creation(self):
    # type: () -> None
    """No rows are written for ranges entirely before the realm existed."""
    stat = CountStat('test_active_humans', zerver_count_user_by_realm,
                     {'is_bot': False, 'is_active': True}, 'hour', 'hour')

    self.realm.date_created = parse_datetime('2016-09-30 01:00:50+00:00')
    self.realm.save()
    self.create_user('human1', is_bot=False, is_active=True,
                     date_joined=parse_datetime('2016-09-30 04:22:50+00:00'))

    # Run the count over a range prior to realm creation.
    process_count_stat(stat, range_start=parse_datetime('2016-09-26 04:00:50+00:00'),
                       range_end=parse_datetime('2016-09-26 05:00:50+00:00'))
    realm_count = RealmCount.objects.values('realm__name', 'value', 'property') \
        .filter(realm=self.realm, interval='hour').exists()
    # No rows should exist.
    self.assertFalse(realm_count)

def test_empty_counts_in_realm(self):
    # type: () -> None
    """Rows with zero counts are still written once the realm exists."""
    stat = CountStat('test_active_humans', zerver_count_user_by_realm,
                     {'is_bot': False, 'is_active': True}, 'hour', 'hour')

    self.create_user('human1', is_bot=False, is_active=True,
                     date_joined=parse_datetime('2016-09-27 02:22:50+00:00'))

    process_count_stat(stat, range_start=parse_datetime('2016-09-27 01:00:50+00:00'),
                       range_end=parse_datetime('2016-09-27 05:00:50+00:00'))
    realm_count = RealmCount.objects.values('end_time', 'value') \
        .filter(realm=self.realm, interval='hour')
    empty1 = realm_count.filter(end_time=datetime(2016, 9, 27, 2, 0, tzinfo=timezone.utc)) \
        .values_list('value', flat=True)[0]
    empty2 = realm_count.filter(end_time=datetime(2016, 9, 27, 4, 0, tzinfo=timezone.utc)) \
        .values_list('value', flat=True)[0]
    nonempty = realm_count.filter(end_time=datetime(2016, 9, 27, 5, 0, tzinfo=timezone.utc)) \
        .values_list('value', flat=True)[0]
    assert (empty1 == 0)
    assert (empty2 == 0)
    assert (nonempty == 1)
from django.test import TestCase
from django.utils import timezone

from analytics.lib.interval import TimeInterval, floor_to_interval_boundary, subtract_interval, timeinterval_range

from datetime import datetime, timedelta

class TimeIntervalTest(TestCase):
    def test_time_interval_creation(self):
        # type: () -> None
        """A 'day' interval anchored at t spans [floor(t) - 1 day, floor(t)]."""
        anchor = datetime(2016, 4, 29, 3, 14, 15, 926535).replace(tzinfo=timezone.utc)
        interval = TimeInterval('day', anchor)
        self.assertEqual(interval.start,
                         datetime(2016, 4, 28, 3, 0, 0).replace(tzinfo=timezone.utc))
        self.assertEqual(interval.end,
                         datetime(2016, 4, 29, 3, 0, 0).replace(tzinfo=timezone.utc))

    def test_datetime_leap_second(self):
        # type: () -> None
        """Subtracting an hour across the 2015-06-30 leap second is exact."""
        after_leap = datetime(2015, 7, 1)
        self.assertEqual(subtract_interval(after_leap, 'hour'),
                         datetime(2015, 6, 30, 23))

    def test_timeinterval_range(self):
        # type: () -> None
        """Counts of hourly/daily intervals over a one-day span."""
        start = datetime(2016, 4, 29, 3, 14, 15, 926535).replace(tzinfo=timezone.utc)
        self.assertEqual(
            len(timeinterval_range(start, start + timedelta(days=1), 'day', 'hour')), 24)
        # From an hour boundary, both endpoints are included: 25 hours.
        start_hour = floor_to_interval_boundary(start, 'hour')
        self.assertEqual(
            len(timeinterval_range(start_hour, start_hour + timedelta(days=1), 'day', 'hour')), 25)
        self.assertEqual(
            len(timeinterval_range(start_hour, start_hour + timedelta(days=1), 'day', 'day')), 1)

    # TODO: test UTC / timezone flooring stuff
def is_timezone_aware(datetime_object):
    # type: (datetime.datetime) -> bool
    """Return True if datetime_object is timezone-aware.

    Uses the canonical check from the Python datetime docs: an object is
    aware only when tzinfo is set AND tzinfo.utcoffset() returns a value.
    The previous `tzinfo is not None` test wrongly classified a tzinfo
    whose utcoffset() is None as aware, which would let
    datetime_to_string() emit an empty '%z' offset.
    """
    tzinfo = datetime_object.tzinfo
    return tzinfo is not None and tzinfo.utcoffset(datetime_object) is not None

def datetime_to_string(datetime_object):
    # type: (datetime.datetime) -> str
    """Format an aware datetime as 'YYYY-MM-DD HH:MM:SS+HHMM'.

    Asserts awareness so a naive datetime cannot silently serialize
    without a UTC offset.
    """
    assert is_timezone_aware(datetime_object)
    date_string = datetime_object.strftime('%Y-%m-%d %H:%M:%S%z')
    return date_string