Add a new statistics/analytics framework.

This is a first pass at building a framework for collecting various
stats about realms, users, streams, etc. Includes:
* New analytics tables for storing counts data
* Raw SQL queries for pulling data from zerver/models.py tables
* Aggregation functions for aggregating hourly stats into daily stats, and
  aggregating user/stream level stats into realm level stats
* A management command for pulling the data

Note that counts.py was added to the linter exclude list because the `%%s`
placeholders in its raw SQL strings trigger spurious linter errors.
This commit is contained in:
umkay
2016-07-29 12:52:45 -07:00
committed by Tim Abbott
parent 43e21f6fb4
commit d260a22637
14 changed files with 1028 additions and 2 deletions

View File

256
analytics/lib/counts.py Normal file
View File

@@ -0,0 +1,256 @@
from django.db import connection, models
from datetime import timedelta, datetime
from analytics.models import InstallationCount, RealmCount, \
UserCount, StreamCount, HuddleCount, BaseCount
from analytics.lib.interval import TimeInterval, timeinterval_range
from zerver.models import Realm, UserProfile, Message, Stream, models
from typing import Any, Optional, Type
from six import text_type
class CountStat(object):
    """Specification of one statistic to collect.

    property: name under which rows are stored in the analytics tables.
    zerver_count_query: ZerverCountQuery describing how to pull raw counts
        out of the zerver_* tables.
    filter_args: extra column=value equality filters spliced into the pull
        query's join clause (see do_pull_from_zerver).
    smallest_interval: finest granularity collected: 'hour', 'day', or 'gauge'.
    frequency: how often the stat is recomputed: 'hour' or 'day'.
    """
    def __init__(self, property, zerver_count_query, filter_args, smallest_interval, frequency):
        # type: (text_type, ZerverCountQuery, Dict[str, bool], str, str) -> None
        self.property = property
        self.zerver_count_query = zerver_count_query
        # might have to do something different for bitfields
        self.filter_args = filter_args
        self.smallest_interval = smallest_interval
        self.frequency = frequency
class ZerverCountQuery(object):
    """Bundle of (source model, destination count table, raw SQL template).

    The query template is %-formatted by do_pull_from_zerver before being
    handed to cursor.execute.
    """
    def __init__(self, zerver_table, analytics_table, query):
        # type: (Type[models.Model], Type[BaseCount], text_type) -> None
        self.zerver_table = zerver_table
        self.analytics_table = analytics_table
        self.query = query
def process_count_stat(stat, range_start, range_end):
    # type: (CountStat, datetime, datetime) -> None
    """Fill and aggregate analytics rows for `stat` over [range_start, range_end].

    Three passes, in order: (1) pull raw counts from the zerver_* tables at
    the stat's smallest interval, (2) roll hourly rows up into daily rows,
    (3) roll UserCount/StreamCount rows up into RealmCount and then
    InstallationCount.
    """
    # stats that hit the prod database
    for time_interval in timeinterval_range(range_start, range_end, stat.smallest_interval, stat.frequency):
        do_pull_from_zerver(stat, time_interval)
    # aggregate hour to day
    for time_interval in timeinterval_range(range_start, range_end, 'day', stat.frequency):
        if stat.smallest_interval == 'hour':
            do_aggregate_hour_to_day(stat, time_interval)
    # aggregate to summary tables
    for interval in ['hour', 'day', 'gauge']:
        for frequency in ['hour', 'day']:
            for time_interval in timeinterval_range(range_start, range_end, interval, frequency):
                analytics_table = stat.zerver_count_query.analytics_table
                # NOTE(review): `<=` below is a lexicographic string compare; it
                # happens that 'day' < 'gauge' < 'hour' alphabetically, which is
                # not a semantic ordering of intervals — confirm the intended
                # behavior for each (smallest_interval, interval) pair.
                if stat.smallest_interval <= interval and stat.frequency == frequency and \
                        analytics_table in (UserCount, StreamCount):
                    do_aggregate_to_summary_table(stat, time_interval, analytics_table, RealmCount)
                    do_aggregate_to_summary_table(stat, time_interval, RealmCount, InstallationCount)
# There are only two summary tables at the moment: RealmCount and InstallationCount.
# Will have to generalize this a bit if more are added
def do_aggregate_to_summary_table(stat, time_interval, from_table, to_table):
    # type: (CountStat, TimeInterval, Type[BaseCount], Type[BaseCount]) -> None
    """Sum `from_table` rows for (stat, time_interval) into one `to_table` row
    per group (per realm for RealmCount, a single row for InstallationCount).

    Raises ValueError if to_table is not a known summary table.
    """
    if to_table == RealmCount:
        id_cols = 'realm_id,'
        group_by = 'GROUP BY realm_id'
    elif to_table == InstallationCount:
        id_cols = ''
        group_by = ''
    else:
        raise ValueError("%s is not a summary table" % (to_table,))
    # Idempotence guard: skip if this (property, end_time, interval) was
    # already aggregated into to_table.
    if to_table.objects.filter(property = stat.property,
                               end_time = time_interval.end,
                               interval = time_interval.interval).exists():
        return
    # The %% escapes survive the Python %-format below and become %(end_time)s
    # placeholders that cursor.execute fills in. Table/column names, property,
    # and interval must be string-interpolated (they can't be SQL parameters);
    # all of these come from hard-coded CountStat definitions, not user input.
    query = """
        INSERT INTO %(to_table)s (%(id_cols)s value, property, end_time, interval)
        SELECT %(id_cols)s COALESCE (sum(value), 0), '%(property)s', %%(end_time)s, '%(interval)s'
        FROM %(from_table)s WHERE
        (
            property = '%(property)s' AND
            end_time = %%(end_time)s AND
            interval = '%(interval)s'
        )
        %(group_by)s
    """ % {'to_table': to_table._meta.db_table,
           'id_cols' : id_cols,
           'from_table' : from_table._meta.db_table,
           'property' : stat.property,
           'interval' : time_interval.interval,
           'group_by' : group_by}
    cursor = connection.cursor()
    cursor.execute(query, {'end_time': time_interval.end})
    cursor.close()
def do_aggregate_hour_to_day(stat, time_interval):
    # type: (CountStat, TimeInterval) -> None
    """Sum the 24 hourly rows ending in (end - 1 day, end] into one 'day' row.

    Writes into the same analytics table the stat is pulled into; skips if a
    matching day row already exists (idempotent).
    """
    table = stat.zerver_count_query.analytics_table
    # e.g. 'user_id, realm_id, ' for UserCount; always ends in ', ' when non-empty.
    id_cols = ''.join([col + ', ' for col in table.extended_id()])
    # Bug fix: the SQL template used to end with "%(group_by)s property", which
    # produced invalid SQL ("... ) property" with no GROUP BY keyword) whenever
    # extended_id() was empty. Build the complete clause in Python instead;
    # grouping by property is always valid and a no-op given the property filter.
    group_by = 'GROUP BY %sproperty' % (id_cols,)
    # Idempotence guard: skip if this day row was already aggregated.
    if table.objects.filter(property = stat.property,
                            end_time = time_interval.end,
                            interval = time_interval.interval).exists():
        return
    # %% escapes become %(...)s placeholders for cursor.execute; table name,
    # id columns, property, and GROUP BY must be interpolated here and all come
    # from hard-coded CountStat/model definitions, not user input.
    query = """
        INSERT INTO %(table)s (%(id_cols)s value, property, end_time, interval)
        SELECT %(id_cols)s sum(value), '%(property)s', %%(end_time)s, 'day'
        FROM %(table)s WHERE
        (
            property = '%(property)s' AND
            end_time > %%(time_start)s AND
            end_time <= %%(time_end)s AND
            interval = 'hour'
        )
        %(group_by)s
    """ % {'table': table._meta.db_table,
           'id_cols' : id_cols,
           'group_by' : group_by,
           'property': stat.property}
    cursor = connection.cursor()
    cursor.execute(query, {'end_time': time_interval.end,
                           'time_start': time_interval.end - timedelta(days=1),
                           'time_end': time_interval.end})
    cursor.close()
## methods that hit the prod databases directly
# No left joins in Django ORM yet, so have to use raw SQL :(
# written in slightly more than needed generality, to reduce copy-paste errors
# as more of these are made / make it easy to extend to a pull_X_by_realm
def do_pull_from_zerver(stat, time_interval):
    # type: (CountStat, TimeInterval) -> None
    """Run the stat's raw pull query, inserting rows into its analytics table.

    Idempotent: returns early if a row for (property, end_time, interval)
    already exists.
    """
    zerver_table = stat.zerver_count_query.zerver_table._meta.db_table # type: ignore
    # Render filter_args as extra join conditions, e.g. "AND t.is_bot = False".
    # Python booleans interpolate as True/False, which Postgres accepts; the
    # keys and values come from hard-coded CountStat definitions.
    join_args = ' '.join('AND %s.%s = %s' % (zerver_table, key, value) \
                         for key, value in stat.filter_args.items())
    if stat.zerver_count_query.analytics_table.objects \
            .filter(property = stat.property,
                    end_time = time_interval.end,
                    interval = time_interval.interval) \
            .exists():
        return
    # We do string replacement here because passing join_args as a param
    # may result in problems when running cursor.execute; we do
    # the string formatting prior so that cursor.execute runs it as sql
    query_ = stat.zerver_count_query.query % {'zerver_table' : zerver_table,
                                              'property' : stat.property,
                                              'interval' : time_interval.interval,
                                              'join_args' : join_args}
    cursor = connection.cursor()
    cursor.execute(query_, {'time_start': time_interval.start, 'time_end': time_interval.end})
    cursor.close()
count_user_by_realm_query = """
INSERT INTO analytics_realmcount
(realm_id, value, property, end_time, interval)
SELECT
zerver_realm.id, count(%(zerver_table)s),'%(property)s', %%(time_end)s, '%(interval)s'
FROM zerver_realm
LEFT JOIN zerver_userprofile
ON
(
zerver_userprofile.realm_id = zerver_realm.id AND
zerver_userprofile.date_joined >= %%(time_start)s AND
zerver_userprofile.date_joined < %%(time_end)s
%(join_args)s
)
WHERE
zerver_realm.date_created < %%(time_end)s
GROUP BY zerver_realm.id
"""
zerver_count_user_by_realm = ZerverCountQuery(UserProfile, RealmCount, count_user_by_realm_query)
# currently .sender_id is only Message specific thing
count_message_by_user_query = """
INSERT INTO analytics_usercount
(user_id, realm_id, value, property, end_time, interval)
SELECT
zerver_userprofile.id, zerver_userprofile.realm_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
FROM zerver_userprofile
JOIN zerver_message
ON
(
zerver_message.sender_id = zerver_userprofile.id AND
zerver_message.pub_date >= %%(time_start)s AND
zerver_message.pub_date < %%(time_end)s
%(join_args)s
)
WHERE
zerver_userprofile.date_joined < %%(time_end)s
GROUP BY zerver_userprofile.id
"""
zerver_count_message_by_user = ZerverCountQuery(Message, UserCount, count_message_by_user_query)
count_message_by_stream_query = """
INSERT INTO analytics_streamcount
(stream_id, realm_id, value, property, end_time, interval)
SELECT
zerver_stream.id, zerver_stream.realm_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
FROM zerver_stream
INNER JOIN zerver_recipient
ON
(
zerver_recipient.type = 2 AND
zerver_stream.id = zerver_recipient.type_id
)
INNER JOIN zerver_message
ON
(
zerver_message.recipient_id = zerver_recipient.id AND
zerver_message.pub_date >= %%(time_start)s AND
zerver_message.pub_date < %%(time_end)s AND
zerver_stream.date_created < %%(time_end)s
%(join_args)s
)
GROUP BY zerver_stream.id
"""
zerver_count_message_by_stream = ZerverCountQuery(Message, StreamCount, count_message_by_stream_query)
count_stream_by_realm_query = """
INSERT INTO analytics_realmcount
(realm_id, value, property, end_time, interval)
SELECT
zerver_stream.realm_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
FROM zerver_stream
LEFT JOIN zerver_recipient
ON
(
zerver_recipient.type = 2 AND
zerver_stream.id = zerver_recipient.type_id
)
GROUP BY zerver_stream.realm_id
"""
zerver_count_stream_by_realm = ZerverCountQuery(Stream, RealmCount, count_stream_by_realm_query)
count_message_by_huddle_query = """
INSERT INTO analytics_huddlecount
(huddle_id, user_id, value, property, end_time, interval)
SELECT
zerver_message.recipient_id, zerver_message.sender_id, count(*), '%(property)s', %%(time_end)s, '%(interval)s'
FROM zerver_message
INNER JOIN zerver_recipient
ON
(
zerver_recipient.type = 3 AND
zerver_message.recipient_id = zerver_recipient.id AND
zerver_message.pub_date >= %%(time_start)s AND
zerver_message.pub_date < %%(time_end)s
%(join_args)s
)
GROUP BY zerver_message.recipient_id, zerver_message.sender_id
"""
zerver_count_message_by_huddle = ZerverCountQuery(Message, HuddleCount, count_message_by_huddle_query)
# Registry of production stats, keyed by property name; consumed by the
# update-analytics management command.
COUNT_STATS = {
    'active_humans': CountStat('active_humans', zerver_count_user_by_realm,
                               {'is_bot': False, 'is_active': True}, 'gauge', 'day'),
    'active_bots': CountStat('active_bots', zerver_count_user_by_realm,
                             {'is_bot': True, 'is_active': True}, 'gauge', 'day'),
    'messages_sent': CountStat('messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')}

70
analytics/lib/interval.py Normal file
View File

@@ -0,0 +1,70 @@
from django.utils import timezone
from datetime import datetime, timedelta, MINYEAR
from zerver.lib.timestamp import is_timezone_aware
from six import text_type
MIN_TIME = datetime(MINYEAR, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
# Name isn't great .. fixedinterval? timerange? Trying to distinguish
# generic intervals like 'hour' or 'quarter' from fixed intervals like
# 'Aug 3 2016 from 9-10am'
class TimeInterval(object):
    """A concrete window of time: `interval` ('hour', 'day', or 'gauge')
    ending at `end`, with `start` derived accordingly ('gauge' starts at
    MIN_TIME).

    Raises ValueError if `end` is not timezone aware.
    """
    def __init__(self, interval, end = None, floor_to_boundary = 'hour'):
        # type: (str, Optional[datetime], str) -> None
        # Bug fix: the default used to be `end = timezone.now()`, which Python
        # evaluates once at import time, so every TimeInterval built with the
        # default shared the module-load timestamp. Use a None sentinel and
        # compute "now" at call time instead.
        if end is None:
            end = timezone.now()
        # Not the best logic for when we have intervals like 'quarter', but okay for now
        if not is_timezone_aware(end):
            raise ValueError("end must be timezone aware")
        if floor_to_boundary is None:
            self.end = end
        else:
            self.end = floor_to_interval_boundary(end, floor_to_boundary)
        self.interval = interval
        if interval == 'gauge':
            self.start = MIN_TIME
        else:
            self.start = subtract_interval(self.end, interval)
# Perhaps the right way to do the next two functions is to have an interval class
# (subclassed to hourinterval, dayinterval, etc) with methods like floor,
# subtract, and subinterval. Seems like overkill for now, though.
def floor_to_interval_boundary(datetime_object, interval):
    # type: (datetime, text_type) -> datetime
    """Truncate datetime_object down to the start of its day or hour,
    preserving its tzinfo. Raises ValueError for any other interval."""
    # timetuple() lays fields out as (year, month, day, hour, minute, ...);
    # keeping the first 3 floors to midnight, the first 4 to the hour.
    fields_to_keep = {'day': 3, 'hour': 4}
    if interval not in fields_to_keep:
        raise ValueError("Unknown or unfloorable interval", interval)
    floored = datetime(*datetime_object.timetuple()[:fields_to_keep[interval]])
    return floored.replace(tzinfo=datetime_object.tzinfo)
# Don't have to worry about leap seconds, since datetime doesn't support it
def subtract_interval(datetime_object, interval):
    # type: (datetime, str) -> datetime
    """Return datetime_object moved back by one `interval` ('day' or 'hour').

    Raises ValueError for any other interval.
    """
    step_sizes = {'day': timedelta(days=1), 'hour': timedelta(hours=1)}
    if interval not in step_sizes:
        raise ValueError("Unknown or unarithmetic interval", interval)
    return datetime_object - step_sizes[interval]
def subintervals(interval):
    # type: (str) -> List[str]
    """List the interval granularities that make up `interval`, largest first.

    Raises ValueError for an unknown interval.
    """
    decompositions = {'day': ['day', 'hour'],
                      'hour': ['hour'],
                      'gauge': ['gauge']}
    if interval not in decompositions:
        raise ValueError("Unknown interval", interval)
    # Return a fresh list so callers can't mutate the table above.
    return list(decompositions[interval])
def timeinterval_range(first, last, interval, step_interval):
    # type: (datetime, datetime, str, str) -> List[TimeInterval]
    """Build TimeIntervals of kind `interval`, stepping backwards by
    `step_interval` from `last` (floored to a step boundary) while the end
    stays >= `first`; returned in ascending order of end time."""
    end = floor_to_interval_boundary(last, step_interval)
    intervals = []
    while end >= first:
        intervals.append(TimeInterval(interval, end, floor_to_boundary=None))
        end = subtract_interval(end, step_interval)
    return intervals[::-1]

View File

@@ -0,0 +1,60 @@
from argparse import ArgumentParser
from datetime import timedelta
from django.core.management.base import BaseCommand
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from analytics.models import RealmCount, UserCount
from analytics.lib.counts import COUNT_STATS, CountStat, process_count_stat
from zerver.lib.timestamp import datetime_to_string, is_timezone_aware
from zerver.models import UserProfile, Message
from typing import Any
class Command(BaseCommand):
    help = """Fills Analytics tables.
Run as a cron job that runs every hour."""

    def add_arguments(self, parser):
        # type: (ArgumentParser) -> None
        """Declare --range-start/-s, --range-end/-e, --utc, and --stat/-q."""
        parser.add_argument('--range-start', '-s',
                            type=str,
                            help="Time to backfill from.")
        parser.add_argument('--range-end', '-e',
                            type=str,
                            help='Time to backfill to.',
                            default=datetime_to_string(timezone.now()))
        # NOTE(review): argparse's type=bool does not parse booleans — any
        # non-empty string (including "False") is truthy. Left unchanged to
        # preserve the CLI; consider action='store_true' instead.
        parser.add_argument('--utc',
                            type=bool,
                            help="Interpret --range-start and --range-end as times in UTC.",
                            default=False)
        parser.add_argument('--stat', '-q',
                            type=str,
                            help="CountStat to process. If omitted, all stats are processed")

    def handle(self, *args, **options):
        # type: (*Any, **Any) -> None
        """Parse the time range and run the requested stat(s) over it.

        Raises ValueError if the range is inverted or not timezone aware.
        """
        range_start = parse_datetime(options['range_start'])
        # Bug fix: Django populates options with every declared argument (the
        # value is None/default when omitted), so `'range_end' in options` was
        # always True and the fallback branch was dead. Test the value instead.
        if options['range_end'] is not None:
            range_end = parse_datetime(options['range_end'])
        else:
            range_end = range_start - timedelta(seconds = 3600)
        # throw error if start time is greater than end time
        if range_start > range_end:
            raise ValueError("--range-start cannot be greater than --range-end.")
        if options['utc'] is True:
            range_start = range_start.replace(tzinfo=timezone.utc)
            range_end = range_end.replace(tzinfo=timezone.utc)
        if not (is_timezone_aware(range_start) and is_timezone_aware(range_end)):
            raise ValueError("--range-start and --range-end must be timezone aware. Maybe you meant to use the --utc option?")
        # Bug fix: `'stat' in options` was always True even with --stat
        # omitted (key present, value None), so omitting --stat crashed with
        # KeyError(None) instead of processing every stat.
        if options['stat'] is not None:
            process_count_stat(COUNT_STATS[options['stat']], range_start, range_end)
        else:
            for stat in COUNT_STATS.values():
                process_count_stat(stat, range_start, range_end)

View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
from django.conf import settings
import zerver.lib.str_utils
class Migration(migrations.Migration):
    """Initial analytics schema: the Anomaly annotation table plus the five
    count tables (Huddle/Installation/Realm/Stream/UserCount), with unique
    constraints mirroring each model's unique_together declaration."""

    dependencies = [
        ('zerver', '0028_userprofile_tos_version'),
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
    ]

    operations = [
        # Anomaly rows are hand-made annotations that count rows can point at.
        migrations.CreateModel(
            name='Anomaly',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                ('info', models.CharField(max_length=1000)),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        # Each count table carries the same core columns (value, property,
        # end_time, interval, anomaly) plus the keys identifying its subject.
        migrations.CreateModel(
            name='HuddleCount',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                ('value', models.BigIntegerField()),
                ('property', models.CharField(max_length=40)),
                ('end_time', models.DateTimeField()),
                ('interval', models.CharField(max_length=20)),
                ('anomaly', models.ForeignKey(to='analytics.Anomaly', null=True)),
                ('huddle', models.ForeignKey(to='zerver.Recipient')),
                ('user', models.ForeignKey(to=settings.AUTH_USER_MODEL)),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='InstallationCount',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                ('value', models.BigIntegerField()),
                ('property', models.CharField(max_length=40)),
                ('end_time', models.DateTimeField()),
                ('interval', models.CharField(max_length=20)),
                ('anomaly', models.ForeignKey(to='analytics.Anomaly', null=True)),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='RealmCount',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                ('value', models.BigIntegerField()),
                ('property', models.CharField(max_length=40)),
                ('end_time', models.DateTimeField()),
                ('interval', models.CharField(max_length=20)),
                ('anomaly', models.ForeignKey(to='analytics.Anomaly', null=True)),
                ('realm', models.ForeignKey(to='zerver.Realm')),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='StreamCount',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                ('value', models.BigIntegerField()),
                ('property', models.CharField(max_length=40)),
                ('end_time', models.DateTimeField()),
                ('interval', models.CharField(max_length=20)),
                ('anomaly', models.ForeignKey(to='analytics.Anomaly', null=True)),
                ('realm', models.ForeignKey(to='zerver.Realm')),
                ('stream', models.ForeignKey(to='zerver.Stream')),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        migrations.CreateModel(
            name='UserCount',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                ('value', models.BigIntegerField()),
                ('property', models.CharField(max_length=40)),
                ('end_time', models.DateTimeField()),
                ('interval', models.CharField(max_length=20)),
                ('anomaly', models.ForeignKey(to='analytics.Anomaly', null=True)),
                ('realm', models.ForeignKey(to='zerver.Realm')),
                ('user', models.ForeignKey(to=settings.AUTH_USER_MODEL)),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
        # One row per subject per (property, end_time, interval).
        migrations.AlterUniqueTogether(
            name='usercount',
            unique_together=set([('user', 'property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='streamcount',
            unique_together=set([('stream', 'property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='realmcount',
            unique_together=set([('realm', 'property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='installationcount',
            unique_together=set([('property', 'end_time', 'interval')]),
        ),
        migrations.AlterUniqueTogether(
            name='huddlecount',
            unique_together=set([('huddle', 'property', 'end_time', 'interval')]),
        ),
    ]

View File

140
analytics/models.py Normal file
View File

@@ -0,0 +1,140 @@
from django.db import models
from zerver.models import Realm, UserProfile, Stream, Recipient
from zerver.lib.str_utils import ModelReprMixin
import datetime
from six import text_type
from typing import Optional, Tuple, Union
from analytics.lib.interval import MIN_TIME
# would only ever make entries here by hand
class Anomaly(ModelReprMixin, models.Model):
    """Free-form annotation that count rows can reference when their values
    are known to be anomalous."""
    info = models.CharField(max_length=1000) # type: text_type

    def __unicode__(self):
        # type: () -> text_type
        return u"<Anomaly: %s... %s>" % (self.info, self.id)
class BaseCount(ModelReprMixin, models.Model):
    """Abstract base for the analytics count tables.

    Each row records `value` for a named `property` over the interval of the
    given kind ending at `end_time`. Subclasses add the foreign keys that
    identify what is being counted.
    """
    value = models.BigIntegerField() # type: int
    property = models.CharField(max_length=40) # type: text_type
    end_time = models.DateTimeField() # type: datetime.datetime
    interval = models.CharField(max_length=20) # type: text_type
    anomaly = models.ForeignKey(Anomaly, null=True) # type: Optional[Anomaly]

    class Meta(object):
        abstract = True

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        # Names of the id columns distinguishing this table's rows; used by
        # counts.py to build raw-SQL column lists and GROUP BY clauses.
        raise NotImplementedError

    @staticmethod
    def key_model():
        # type: () -> models.Model
        # The zerver model this table's rows are keyed on (None at the top).
        raise NotImplementedError
class InstallationCount(BaseCount):
    """Server-wide totals; one row per (property, end_time, interval)."""
    class Meta(object):
        unique_together = ("property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        # No distinguishing id columns: there is only one installation.
        return ()

    @staticmethod
    def key_model():
        # type: () -> models.Model
        return None

    def __unicode__(self):
        # type: () -> text_type
        return u"<InstallationCount: %s %s %s>" % (self.property, self.value, self.id)
class RealmCount(BaseCount):
    """Per-realm counts."""
    realm = models.ForeignKey(Realm)

    class Meta(object):
        unique_together = ("realm", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('realm_id',)

    @staticmethod
    def key_model():
        # type: () -> models.Model
        return Realm

    def __unicode__(self):
        # type: () -> text_type
        return u"<RealmCount: %s %s %s %s>" % (self.realm, self.property, self.value, self.id)
class UserCount(BaseCount):
    """Per-user counts; realm is denormalized for realm-level aggregation."""
    user = models.ForeignKey(UserProfile)
    realm = models.ForeignKey(Realm)

    class Meta(object):
        unique_together = ("user", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('user_id', 'realm_id')

    @staticmethod
    def key_model():
        # type: () -> models.Model
        return UserProfile

    def __unicode__(self):
        # type: () -> text_type
        return u"<UserCount: %s %s %s %s>" % (self.user, self.property, self.value, self.id)
class StreamCount(BaseCount):
    """Per-stream counts; realm is denormalized for realm-level aggregation."""
    stream = models.ForeignKey(Stream)
    realm = models.ForeignKey(Realm)

    class Meta(object):
        unique_together = ("stream", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('stream_id', 'realm_id')

    @staticmethod
    def key_model():
        # type: () -> models.Model
        return Stream

    def __unicode__(self):
        # type: () -> text_type
        return u"<StreamCount: %s %s %s %s>" % (self.stream, self.property, self.value, self.id)
class HuddleCount(BaseCount):
    """Per-(huddle, user) counts; `huddle` is a Recipient of huddle type."""
    huddle = models.ForeignKey(Recipient)
    user = models.ForeignKey(UserProfile)

    class Meta(object):
        unique_together = ("huddle", "property", "end_time", "interval")

    @staticmethod
    def extended_id():
        # type: () -> Tuple[str, ...]
        return ('huddle_id', 'user_id')

    @staticmethod
    def key_model():
        # type: () -> models.Model
        return Recipient

    def __unicode__(self):
        # type: () -> text_type
        return u"<HuddleCount: %s %s %s %s>" % (self.huddle, self.property, self.value, self.id)

View File

View File

@@ -0,0 +1,346 @@
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django.test import TestCase
from datetime import datetime, timedelta
from analytics.lib.interval import TimeInterval
from analytics.lib.counts import CountStat, process_count_stat, \
zerver_count_user_by_realm, zerver_count_message_by_user, \
zerver_count_message_by_stream, zerver_count_stream_by_realm, \
zerver_count_message_by_huddle
from analytics.models import UserCount, RealmCount, StreamCount, InstallationCount, Stream, Recipient
from zerver.lib.test_helpers import make_client, get_stream
from zerver.models import Realm, UserProfile, Message, get_user_profile_by_email
from typing import Any
from six import text_type
def do_update_past_hour(stat):
    # type: (CountStat) -> Any
    """Convenience wrapper: process `stat` over the trailing hour, ending now."""
    return process_count_stat(stat,
                              range_start=timezone.now() - timedelta(hours=1),
                              range_end=timezone.now())
class TestDataCollectors(TestCase):
def create_user(self, email, **kwargs):
# type: (str, **Any) -> UserProfile
defaults = {'realm': self.realm,
'full_name': 'full_name',
'short_name': 'short_name',
'pointer': -1,
'last_pointer_updater': 'seems unused?',
'api_key': '42'}
for key, value in defaults.items():
kwargs[key] = kwargs.get(key, value)
user = UserProfile(email=email, **kwargs)
user.save()
return user
def create_stream(self, **kwargs):
# type: (**Any) -> Stream
defaults = {'realm': self.realm,
'description': ''}
for key, value in defaults.items():
kwargs[key] = kwargs.get(key, value)
stream = Stream(**kwargs)
stream.save()
return stream
def create_message(self, sender, recipient, **kwargs):
# type: (UserProfile, Recipient, **Any) -> Message
sending_client = make_client(name="analytics test")
defaults = {
'sender': sender,
'recipient': recipient,
'subject': 'whatever',
'content': 'hello **world**',
'pub_date': timezone.now(),
'sending_client': sending_client,
'last_edit_time': timezone.now(),
'edit_history': '[]'
}
for key, value in defaults.items():
kwargs[key] = kwargs.get(key, value)
message = Message(**kwargs)
message.save()
return message
    # TODO uk refactor tests to use this assert
    def assertRealmCountEquals(self, realm, property, interval, value):
        # type: (Realm, str, str, Any) -> None
        """Assert the first RealmCount row matching (realm, property,
        interval) has the given value."""
        # NOTE(review): no end_time filter — if several rows match, only the
        # first under default ordering is checked; confirm that's intended.
        realm_count_value = RealmCount.objects.filter(realm=realm,
                                                      property=property,
                                                      interval=interval).values_list('value', flat=True)[0]
        self.assertEqual(realm_count_value, value)
    def setUp(self):
        # type: () -> None
        """Create the realm and user almost every test needs, plus day/hour
        TimeIntervals ending two hours from now."""
        # almost every test will need a time_interval, realm, and user
        end = timezone.now() + timedelta(seconds=7200)
        self.day_interval = TimeInterval('day', end, 'hour')
        self.hour_interval = TimeInterval('hour', end, 'hour')
        self.realm = Realm(domain='analytics.test', name='Realm Test',
                           date_created=parse_datetime('2016-09-27 01:00:50+00:00'))
        self.realm.save()
        # don't pull the realm object back from the database every time we need its id
        self.realm_id = self.realm.id
        self.user = self.create_user('email', is_bot=False, is_active=True,
                                     date_joined=parse_datetime('2016-09-27 04:20:50+00:00'))
        self.user_id = self.user.id
def test_human_and_bot_count_by_realm(self):
# type: () -> None
stats = [
CountStat('test_active_humans', zerver_count_user_by_realm, {'is_bot': False, 'is_active': True},
'hour', 'hour'),
CountStat('test_active_bots', zerver_count_user_by_realm, {'is_bot': True, 'is_active': True},
'hour', 'hour')]
# TODO these dates should probably be explicit, since the default args for the commands are timezone.now() dependent.
self.create_user('test_bot1', is_bot=True, is_active=True,
date_joined=timezone.now() - timedelta(hours=1))
self.create_user('test_bot2', is_bot=True, is_active=True,
date_joined=timezone.now() - timedelta(hours=1))
self.create_user('test_human', is_bot=False, is_active=True,
date_joined=timezone.now() - timedelta(hours=1))
for stat in stats:
do_update_past_hour(stat)
human_row = RealmCount.objects.filter(realm=self.realm, interval='day',
property='test_active_humans') \
.values_list('value', flat=True)[0]
assert (human_row == 1)
bot_row = RealmCount.objects.filter(realm=self.realm, interval='day',
property='test_active_bots') \
.values_list('value', flat=True)[0]
assert (bot_row == 2)
# test users added in last hour
def test_add_new_users(self):
# type: () -> None
stat = CountStat('add_new_user_test', zerver_count_user_by_realm, {}, 'hour', 'hour')
# add new users to realm in last hour
self.create_user('email_1', date_joined=parse_datetime('2016-09-27 03:22:50+00:00'))
self.create_user('email_2', date_joined=parse_datetime('2016-09-27 03:15:50+00:00'))
# add a new user before an hour
self.create_user('email_3', date_joined=parse_datetime('2016-09-27 02:10:50+00:00'))
# check if user added before the hour is not included
process_count_stat(stat, range_start=parse_datetime('2016-09-27 03:00:50+00:00'),
range_end=parse_datetime('2016-09-27 04:00:50+00:00'))
# do_update is writing the stat.property to all zerver tables
row = RealmCount.objects.filter(realm=self.realm, property='add_new_user_test',
interval='hour').values_list('value', flat=True)[0]
# assert only 2 users
assert (row == 2)
def test_analytics_stat_write(self):
# type: () -> None
# might change if we refactor count_query
stat = CountStat('test_stat_write', zerver_count_stream_by_realm,
{'invite_only': False}, 'hour', 'hour')
# add some stuff to zerver_*
self.create_stream(name='stream1', description='test_analytics_stream',
date_created=parse_datetime('2016-09-27 02:10:50+00:00'))
self.create_stream(name='stream2', description='test_analytics_stream',
date_created=parse_datetime('2016-09-27 02:10:50+00:00'))
self.create_stream(name='stream3', description='test_analytics_stream',
date_created=parse_datetime('2016-09-27 02:10:50+00:00'))
# run do_pull_from_zerver
do_update_past_hour(stat)
# check analytics_* values are correct
row = RealmCount.objects.filter(realm=self.realm, interval='day',
property='test_stat_write').values_list('value', flat=True)[0]
assert (row == 3)
# test if process count does nothing if count already processed
def test_process_count(self):
# type: () -> None
# add some active and inactive users that are human
self.create_user('inactive_human_1', is_bot=False, is_active=False,
date_joined=timezone.now() - timedelta(hours=1))
self.create_user('inactive_human_2', is_bot=False, is_active=False,
date_joined=timezone.now() - timedelta(hours=1))
self.create_user('active_human', is_bot=False, is_active=True,
date_joined=timezone.now() - timedelta(hours=1))
# run stat to pull active humans
stat = CountStat('active_humans', zerver_count_user_by_realm,
{'is_bot': False, 'is_active': True}, 'hour', 'hour')
do_update_past_hour(stat)
# get row in analytics table
row_before = RealmCount.objects.filter(realm=self.realm, interval='day',
property='active_humans')\
.values_list('value', flat=True)[0]
# run command again
do_update_past_hour(stat)
# check if row is same as before
row_after = RealmCount.objects.filter(realm=self.realm, interval='day',
property='active_humans').values_list('value', flat=True)[0]
assert (row_before == 1)
assert (row_before == row_after)
# test management commands
def test_update_analytics_tables(self):
# type: () -> None
stat = CountStat('test_messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')
self.create_user('human1', is_bot=False, is_active=True,
date_joined=parse_datetime('2016-09-27 04:22:50+00:00'))
human1 = get_user_profile_by_email('human1')
human2 = get_user_profile_by_email('email')
self.create_message(human1, Recipient(human2.id),
pub_date=parse_datetime('2016-09-27 04:30:50+00:00'))
# run command
process_count_stat(stat, range_start=parse_datetime('2016-09-27 04:00:50+00:00'),
range_end=parse_datetime('2016-09-27 05:00:50+00:00'))
usercount_row = UserCount.objects.filter(realm=self.realm, interval='hour',
property='test_messages_sent').values_list(
'value', flat=True)[0]
assert (usercount_row == 1)
# run command with dates before message creation
process_count_stat(stat, range_start=parse_datetime('2016-09-27 01:00:50+00:00'),
range_end=parse_datetime('2016-09-22 02:00:50+00:00'))
# check new row has no entries, old ones still there
updated_usercount_row = UserCount.objects.filter(
realm=self.realm, interval='hour', property='test_messages_sent') \
.values_list('value', flat=True)[0]
new_row = UserCount.objects.filter(realm=self.realm, interval='hour', property='test_messages_sent',
end_time=datetime(2016, 9, 22, 5, 0).replace(tzinfo=timezone.utc)).exists()
self.assertFalse(new_row)
assert (updated_usercount_row == 1)
def test_do_aggregate(self):
# type: () -> None
# write some entries to analytics.usercount with smallest interval as day
stat = CountStat('test_messages_aggregate', zerver_count_message_by_user, {}, 'day', 'hour')
# write some messages
self.create_user('human1', is_bot=False, is_active=True,
date_joined=parse_datetime('2016-09-27 04:22:50+00:00'))
human1 = get_user_profile_by_email('human1')
human2 = get_user_profile_by_email('email')
self.create_message(human1, Recipient(human2.id),
pub_date=parse_datetime('2016-09-27 04:30:50+00:00'),
content="hi")
self.create_message(human1, Recipient(human2.id),
pub_date=parse_datetime('2016-09-27 04:30:50+00:00'),
content="hello")
self.create_message(human1, Recipient(human2.id),
pub_date=parse_datetime('2016-09-27 04:30:50+00:00'),
content="bye")
# run command
process_count_stat(stat, range_start=parse_datetime('2016-09-27 04:00:50+00:00'),
range_end=parse_datetime('2016-09-27 05:00:50+00:00'))
# check no rows for hour interval on usercount granularity
usercount_row = UserCount.objects.filter(realm=self.realm, interval='hour').exists()
self.assertFalse(usercount_row)
# see if aggregated correctly to realmcount and installationcount
realmcount_row = RealmCount.objects.filter(realm=self.realm, interval='day',
property='test_messages_aggregate').values_list(
'value', flat=True)[0]
assert (realmcount_row == 3)
installationcount_row = InstallationCount.objects.filter(interval='day',
property='test_messages_aggregate') \
.values_list('value', flat=True)[0]
assert (installationcount_row == 3)
def test_message_to_stream_aggregation(self):
    # type: () -> None
    """A single message sent to a stream should produce a StreamCount row
    with value 1 for the hour containing the message."""
    stat = CountStat('test_messages_to_stream', zerver_count_message_by_stream, {}, 'hour', 'hour')
    # send one message to a newly created stream
    user = get_user_profile_by_email('email')
    self.create_stream(name='stream1', description='test_analytics_stream',
                       date_created=parse_datetime('2016-09-27 03:10:50+00:00'))
    stream1 = get_stream('stream1', self.realm)
    # NOTE(review): type=2 presumably means a stream recipient (Recipient.STREAM) -- confirm
    recipient = Recipient(type_id=stream1.id, type=2)
    recipient.save()
    self.create_message(user, recipient, pub_date=parse_datetime('2016-09-27 04:30:50+00:00'),
                        content='hi')
    # run the count process over the hour containing the message
    process_count_stat(stat, range_start=parse_datetime('2016-09-27 04:00:50+00:00'),
                       range_end=parse_datetime('2016-09-27 05:00:50+00:00'))
    stream_value = StreamCount.objects.filter(
        realm=self.realm, interval='hour',
        property='test_messages_to_stream').values_list('value', flat=True)[0]
    self.assertEqual(stream_value, 1)
def test_count_before_realm_creation(self):
    # type: () -> None
    """Running the counter over a range entirely before the realm was
    created should write no RealmCount rows."""
    stat = CountStat('test_active_humans', zerver_count_user_by_realm,
                     {'is_bot': False, 'is_active': True}, 'hour', 'hour')
    self.realm.date_created = parse_datetime('2016-09-30 01:00:50+00:00')
    self.realm.save()
    self.create_user('human1', is_bot=False, is_active=True,
                     date_joined=parse_datetime('2016-09-30 04:22:50+00:00'))
    # count over an hour several days before the realm's creation date
    process_count_stat(stat, range_start=parse_datetime('2016-09-26 04:00:50+00:00'),
                       range_end=parse_datetime('2016-09-26 05:00:50+00:00'))
    rows_exist = RealmCount.objects.values('realm__name', 'value', 'property') \
                                   .filter(realm=self.realm, interval='hour').exists()
    # assert no rows exist
    self.assertFalse(rows_exist)
def test_empty_counts_in_realm(self):
    # type: () -> None
    """Hours with no matching activity still get RealmCount rows, with
    value 0, as long as the realm exists."""
    stat = CountStat('test_active_humans', zerver_count_user_by_realm,
                     {'is_bot': False, 'is_active': True}, 'hour', 'hour')
    self.create_user('human1', is_bot=False, is_active=True,
                     date_joined=parse_datetime('2016-09-27 02:22:50+00:00'))
    process_count_stat(stat, range_start=parse_datetime('2016-09-27 01:00:50+00:00'),
                       range_end=parse_datetime('2016-09-27 05:00:50+00:00'))
    realm_count = RealmCount.objects.values('end_time', 'value') \
                                    .filter(realm=self.realm, interval='hour')

    def value_at(end_time):
        # helper: the count recorded for the hour ending at end_time
        return realm_count.filter(end_time=end_time).values_list('value', flat=True)[0]
    # these hours record explicit zero rows rather than being absent
    self.assertEqual(value_at(datetime(2016, 9, 27, 2, 0, tzinfo=timezone.utc)), 0)
    self.assertEqual(value_at(datetime(2016, 9, 27, 4, 0, tzinfo=timezone.utc)), 0)
    # the final hour of the range records the active human
    self.assertEqual(value_at(datetime(2016, 9, 27, 5, 0, tzinfo=timezone.utc)), 1)

View File

@@ -0,0 +1,28 @@
from django.test import TestCase
from django.utils import timezone
from analytics.lib.interval import TimeInterval, floor_to_interval_boundary, subtract_interval, timeinterval_range
from datetime import datetime, timedelta
class TimeIntervalTest(TestCase):
    """Tests for the TimeInterval helpers in analytics.lib.interval."""

    def test_time_interval_creation(self):
        # type: () -> None
        # a 'day' interval is floored to the containing hour and spans 24 hours
        time_interval = TimeInterval('day', datetime(2016, 4, 29, 3, 14, 15, 926535).replace(tzinfo=timezone.utc))
        self.assertEqual(time_interval.start, datetime(2016, 4, 28, 3, 0, 0).replace(tzinfo=timezone.utc))
        self.assertEqual(time_interval.end, datetime(2016, 4, 29, 3, 0, 0).replace(tzinfo=timezone.utc))

    def test_datetime_leap_second(self):
        # type: () -> None
        # 2015-06-30 ended with a leap second; subtract_interval should ignore it
        after_leap = datetime(2015, 7, 1)
        self.assertEqual(subtract_interval(after_leap, 'hour'), datetime(2015, 6, 30, 23))

    def test_timeinterval_range(self):
        # type: () -> None
        first = datetime(2016, 4, 29, 3, 14, 15, 926535).replace(tzinfo=timezone.utc)
        # an unaligned start yields 24 hourly intervals over one day
        self.assertEqual(len(timeinterval_range(first, first + timedelta(days=1), 'day', 'hour')), 24)
        first_hour = floor_to_interval_boundary(first, 'hour')
        # an aligned start includes both endpoints: 25 hourly intervals
        self.assertEqual(len(timeinterval_range(first_hour, first_hour + timedelta(days=1), 'day', 'hour')), 25)
        self.assertEqual(len(timeinterval_range(first_hour, first_hour + timedelta(days=1), 'day', 'day')), 1)
        # TODO: test UTC / timezone flooring stuff

View File

@@ -3,6 +3,7 @@ try:
from django.conf import settings
from zerver.models import *
from zerver.lib.actions import * # type: ignore # Otherwise have duplicate imports with previous line
from analytics.models import *
except Exception:
import traceback
print("\nException importing Zulip core modules on startup!")

View File

@@ -212,7 +212,9 @@ def build_custom_checkers(by_lang):
{'pattern': '".*"%\([a-z_].*\)?$',
'description': 'Missing space around "%"'},
{'pattern': "'.*'%\([a-z_].*\)?$",
'exclude': set(['tools/lint-all']),
'exclude': set(['tools/lint-all',
'analytics/lib/counts.py',
]),
'exclude_line': set([
('zerver/views/users.py',
"return json_error(_(\"Email '%(email)s' does not belong to domain '%(domain)s'\") %"),

View File

@@ -112,7 +112,8 @@ if __name__ == "__main__":
if len(args) == 0:
suites = ["zerver.tests"]
suites = ["zerver.tests",
"analytics.tests"]
else:
suites = args

View File

@@ -4,6 +4,10 @@ import datetime
import calendar
from django.utils.timezone import utc
def is_timezone_aware(datetime_object):
    # type: (datetime.datetime) -> bool
    """Return True if datetime_object is timezone-aware.

    Uses the definition from the datetime documentation: an object is
    aware only if tzinfo is set AND utcoffset() returns a non-None value.
    Checking tzinfo alone would misclassify a datetime whose tzinfo
    returns a None offset.
    """
    return (datetime_object.tzinfo is not None and
            datetime_object.tzinfo.utcoffset(datetime_object) is not None)
def timestamp_to_datetime(timestamp):
    # type: (float) -> datetime.datetime
    """Convert a POSIX timestamp (seconds since the epoch) to an aware UTC datetime."""
    naive = datetime.datetime.utcfromtimestamp(float(timestamp))
    return naive.replace(tzinfo=utc)
@@ -11,3 +15,9 @@ def timestamp_to_datetime(timestamp):
def datetime_to_timestamp(datetime_object):
    # type: (datetime.datetime) -> int
    """Convert a datetime to integer seconds since the epoch, treating
    the wall-clock fields as UTC (sub-second precision is dropped)."""
    time_tuple = datetime_object.timetuple()
    return calendar.timegm(time_tuple)
def datetime_to_string(datetime_object):
    # type: (datetime.datetime) -> str
    """Format an aware datetime as 'YYYY-MM-DD HH:MM:SS+ZZZZ'.

    Raises ValueError for naive datetimes: the %z directive would
    silently render an empty offset for them, and the original `assert`
    guard disappears entirely under `python -O`.
    """
    if datetime_object.utcoffset() is None:
        raise ValueError("datetime_to_string requires a timezone-aware datetime")
    date_string = datetime_object.strftime('%Y-%m-%d %H:%M:%S%z')
    return date_string