analytics: Add table to keep track of fill state.

Adds two simplifying assumptions to how we process analytics stats:
* Sets the atomic unit of work to: a stat processed at an hour boundary.
* For any given stat, only allows these atomic units of work to be processed
  in chronological order.

Adds a table FillState that, for each stat, keeps track of the last unit of
work that was processed.
This commit is contained in:
Rishi Gupta
2016-10-12 14:40:48 -07:00
committed by Tim Abbott
parent 721529b782
commit 655ee51e35
7 changed files with 170 additions and 108 deletions

View File

@@ -2,7 +2,7 @@ from django.db import connection, models
from datetime import timedelta, datetime
from analytics.models import InstallationCount, RealmCount, \
UserCount, StreamCount, BaseCount
UserCount, StreamCount, BaseCount, FillState, get_fill_state, installation_epoch
from analytics.lib.interval import TimeInterval, timeinterval_range, subintervals
from zerver.models import Realm, UserProfile, Message, Stream, models
@@ -26,29 +26,60 @@ class ZerverCountQuery(object):
self.analytics_table = analytics_table
self.query = query
def process_count_stat(stat, range_start, range_end):
# type: (CountStat, datetime, datetime) -> None
def process_count_stat(stat, fill_to_time):
    # type: (CountStat, datetime) -> None
    # Bring the analytics tables for `stat` up to date through `fill_to_time`,
    # one hour-boundary unit of work at a time, in chronological order,
    # recording progress in the FillState table.
    fill_state = get_fill_state(stat.property)
    if fill_state is None:
        # First time this stat is processed: nothing is filled yet, so start
        # from the installation epoch and record that as "done through here".
        currently_filled = installation_epoch()
        FillState.objects.create(property = stat.property,
                                 end_time = currently_filled,
                                 state = FillState.DONE)
    elif fill_state['state'] == FillState.STARTED:
        # A previous run was interrupted mid-hour. Delete the partially
        # written rows for that hour and rewind one hour, so the loop below
        # re-processes it from scratch.
        do_delete_count_stat_at_hour(stat, fill_state['end_time'])
        currently_filled = fill_state['end_time'] - timedelta(hours = 1)
        FillState.objects.filter(property = stat.property). \
            update(end_time = currently_filled, state = FillState.DONE)
    elif fill_state['state'] == FillState.DONE:
        currently_filled = fill_state['end_time']
    else:
        raise ValueError("Unknown value for FillState.state: %s." % fill_state['state'])

    currently_filled = currently_filled + timedelta(hours = 1)
    while currently_filled <= fill_to_time:
        # Mark the hour STARTED before writing any rows, so that a crash
        # leaves a recoverable state (cleaned up by the STARTED branch above)
        # rather than silently corrupt counts.
        FillState.objects.filter(property = stat.property) \
            .update(end_time = currently_filled, state = FillState.STARTED)
        do_fill_count_stat_at_hour(stat, currently_filled)
        FillState.objects.filter(property = stat.property).update(state = FillState.DONE)
        currently_filled = currently_filled + timedelta(hours = 1)
# TODO: simplify implementation now that range_start and range_end are just
# a single end_time
def do_fill_count_stat_at_hour(stat, end_time):
# type: (CountStat, datetime) -> None
# stats that hit the prod database
for time_interval in timeinterval_range(range_start, range_end, stat.smallest_interval, stat.frequency):
for time_interval in timeinterval_range(end_time, end_time, stat.smallest_interval, stat.frequency):
do_pull_from_zerver(stat, time_interval)
# aggregate hour to day
for time_interval in timeinterval_range(range_start, range_end, 'day', stat.frequency):
for time_interval in timeinterval_range(end_time, end_time, 'day', stat.frequency):
if stat.smallest_interval == 'hour':
do_aggregate_hour_to_day(stat, time_interval)
# aggregate to summary tables
for interval in ['hour', 'day', 'gauge']:
for time_interval in timeinterval_range(range_start, range_end, interval, stat.frequency):
for time_interval in timeinterval_range(end_time, end_time, interval, stat.frequency):
if stat.smallest_interval in subintervals(interval):
do_aggregate_to_summary_table(stat, time_interval)
def do_delete_count_stat_at_hour(stat, end_time):
    # type: (CountStat, datetime) -> None
    # Remove every analytics row written for this stat at this hour boundary,
    # across all four *Count tables. Used to clean up after an interrupted
    # (FillState.STARTED) fill before re-processing the hour.
    for count_table in [UserCount, StreamCount, RealmCount, InstallationCount]:
        count_table.objects.filter(property = stat.property, end_time = end_time).delete()
def do_aggregate_to_summary_table(stat, time_interval):
# type: (CountStat, TimeInterval) -> None
if InstallationCount.objects.filter(property = stat.property,
end_time = time_interval.end,
interval = time_interval.interval).exists():
return
cursor = connection.cursor()
# Aggregate into RealmCount
@@ -100,11 +131,6 @@ def do_aggregate_hour_to_day(stat, time_interval):
id_cols = ''.join([col + ', ' for col in table.extended_id()])
group_by = 'GROUP BY %s' % id_cols if id_cols else ''
if table.objects.filter(property = stat.property,
end_time = time_interval.end,
interval = time_interval.interval).exists():
return
query = """
INSERT INTO %(table)s (%(id_cols)s value, property, end_time, interval)
SELECT %(id_cols)s sum(value), '%(property)s', %%(end_time)s, 'day'
@@ -136,14 +162,6 @@ def do_pull_from_zerver(stat, time_interval):
zerver_table = stat.zerver_count_query.zerver_table._meta.db_table # type: ignore
join_args = ' '.join('AND %s.%s = %s' % (zerver_table, key, value) \
for key, value in stat.filter_args.items())
if stat.zerver_count_query.analytics_table.objects \
.filter(property = stat.property,
end_time = time_interval.end,
interval = time_interval.interval) \
.exists():
return
# We do string replacement here because passing join_args as a param
# may result in problems when running cursor.execute; we do
# the string formatting prior so that cursor.execute runs it as sql

View File

@@ -38,6 +38,10 @@ def floor_to_interval_boundary(datetime_object, interval):
else:
raise ValueError("Unknown or unfloorable interval", interval)
def floor_to_day(datetime_object):
    # type: (datetime) -> datetime
    # Midnight at the start of datetime_object's calendar day, preserving
    # its tzinfo (a naive input yields a naive result).
    source = datetime_object
    return datetime(source.year, source.month, source.day, tzinfo=source.tzinfo)
# Don't have to worry about leap seconds, since datetime doesn't support it
def subtract_interval(datetime_object, interval):
# type: (datetime, str) -> datetime

View File

@@ -14,6 +14,7 @@ DELETE FROM ONLY analytics_installationcount;
DELETE FROM ONLY analytics_realmcount;
DELETE FROM ONLY analytics_usercount;
DELETE FROM ONLY analytics_streamcount;
DELETE FROM ONLY analytics_fillstate;
"""
class Command(BaseCommand):

View File

@@ -22,42 +22,29 @@ class Command(BaseCommand):
def add_arguments(self, parser):
# type: (ArgumentParser) -> None
parser.add_argument('--range-start', '-s',
parser.add_argument('--time', '-t',
type=str,
help="Time to backfill from.")
parser.add_argument('--range-end', '-e',
type=str,
help='Time to backfill to, defaulst to now.',
help='Update stat tables from current state to --time. Defaults to the current time.',
default=datetime_to_string(timezone.now()))
parser.add_argument('--utc',
type=bool,
help="Interpret --range-start and --range-end as times in UTC.",
help="Interpret --time in UTC.",
default=False)
parser.add_argument('--stat', '-q',
parser.add_argument('--stat', '-s',
type=str,
help="CountStat to process. If omitted, all stats are processed")
help="CountStat to process. If omitted, all stats are processed.")
def handle(self, *args, **options):
# type: (*Any, **Any) -> None
range_end = parse_datetime(options['range_end'])
if options['range_start'] is not None:
range_start = parse_datetime(options['range_start'])
else:
range_start = range_end - timedelta(seconds = 3600)
# throw error if start time is greater than end time
if range_start > range_end:
raise ValueError("--range-start cannot be greater than --range-end.")
fill_to_time = parse_datetime(options['time'])
if options['utc']:
range_start = range_start.replace(tzinfo=timezone.utc)
range_end = range_end.replace(tzinfo=timezone.utc)
fill_to_time = fill_to_time.replace(tzinfo=timezone.utc)
if not (is_timezone_aware(range_start) and is_timezone_aware(range_end)):
raise ValueError("--range-start and --range-end must be timezone aware. Maybe you meant to use the --utc option?")
if not (is_timezone_aware(fill_to_time)):
raise ValueError("--time must be timezone aware. Maybe you meant to use the --utc option?")
if options['stat'] is not None:
process_count_stat(COUNT_STATS[options['stat']], range_start, range_end)
process_count_stat(COUNT_STATS[options['stat']], fill_to_time)
else:
for stat in COUNT_STATS.values():
process_count_stat(stat, range_start, range_end)
process_count_stat(stat, fill_to_time)

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import zerver.lib.str_utils
class Migration(migrations.Migration):
    """Create the analytics FillState table.

    FillState keeps, per CountStat property, the last hour-boundary unit of
    work processed, so stats can be filled incrementally and in order.
    """

    dependencies = [
        ('analytics', '0002_remove_huddlecount'),
    ]

    operations = [
        migrations.CreateModel(
            name='FillState',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                # One row per CountStat property name.
                ('property', models.CharField(unique=True, max_length=40)),
                # Hour boundary of the most recently processed unit of work.
                ('end_time', models.DateTimeField()),
                # FillState.DONE (1) or FillState.STARTED (2); see the model.
                ('state', models.PositiveSmallIntegerField()),
                ('last_modified', models.DateTimeField(auto_now=True)),
            ],
            bases=(zerver.lib.str_utils.ModelReprMixin, models.Model),
        ),
    ]

View File

@@ -1,13 +1,41 @@
from django.db import models
from django.utils import timezone
from analytics.lib.interval import floor_to_day
from zerver.models import Realm, UserProfile, Stream, Recipient
from zerver.lib.str_utils import ModelReprMixin
import datetime
from six import text_type
from typing import Optional, Tuple, Union
from typing import Optional, Tuple, Union, Dict, Any
from analytics.lib.interval import MIN_TIME
class FillState(ModelReprMixin, models.Model):
    # One row per CountStat property: records the last hour-boundary unit of
    # work processed for that stat, so fills resume in chronological order.
    property = models.CharField(max_length=40, unique=True) # type: text_type
    # Hour boundary of the most recently processed (or in-progress) fill.
    end_time = models.DateTimeField() # type: datetime.datetime

    # Valid states are {DONE, STARTED}; STARTED means a fill for end_time is
    # in progress (or crashed) and its rows may be partial.
    DONE = 1
    STARTED = 2
    state = models.PositiveSmallIntegerField() # type: int

    last_modified = models.DateTimeField(auto_now=True) # type: datetime.datetime

    def __unicode__(self):
        # type: () -> text_type
        return u"<FillState: %s %s %s>" % (self.property, self.end_time, self.state)
def get_fill_state(property):
    # type: (text_type) -> Optional[Dict[str, Any]]
    # Return {'end_time': ..., 'state': ...} for this stat's FillState row,
    # or None if no fill has ever been recorded for this property.
    rows = FillState.objects.filter(property = property).values('end_time', 'state')
    if rows:
        return rows[0]
    return None
# The earliest/starting end_time in FillState
# We assume there is at least one realm
def installation_epoch():
    # type: () -> datetime.datetime
    # Midnight (floor_to_day) of the earliest realm's creation time; this is
    # the starting end_time for every stat's FillState. Assumes at least one
    # realm exists -- the Min aggregate returns None otherwise and
    # floor_to_day would fail.
    earliest_creation = Realm.objects.aggregate(models.Min('date_created'))['date_created__min']
    return floor_to_day(earliest_creation)
# would only ever make entries here by hand
class Anomaly(ModelReprMixin, models.Model):

View File

@@ -5,9 +5,10 @@ from django.utils import timezone
from analytics.lib.interval import TimeInterval
from analytics.lib.counts import CountStat, COUNT_STATS, process_count_stat, \
zerver_count_user_by_realm, zerver_count_message_by_user, \
zerver_count_message_by_stream, zerver_count_stream_by_realm
zerver_count_message_by_stream, zerver_count_stream_by_realm, \
do_fill_count_stat_at_hour, ZerverCountQuery
from analytics.models import BaseCount, InstallationCount, RealmCount, \
UserCount, StreamCount
UserCount, StreamCount, FillState, get_fill_state, installation_epoch
from zerver.models import Realm, UserProfile, Message, Stream, Recipient, \
get_user_profile_by_email, get_client
@@ -24,18 +25,14 @@ class AnalyticsTestCase(TestCase):
TIME_ZERO = datetime(2042, 3, 14).replace(tzinfo=timezone.utc)
TIME_LAST_HOUR = TIME_ZERO - HOUR
count_stat = CountStat('test stat', ZerverCountQuery(Recipient, UserCount, 'select 0'),
{}, 'hour', 'hour')
def setUp(self):
# type: () -> None
self.default_realm = Realm.objects.create(domain='analytics.test', name='Realm Test',
date_created=self.TIME_ZERO - 2*self.DAY)
def process_last_hour(self, stat):
# type: (CountStat) -> None
# The last two arguments below are eventually passed as the first and
# last arguments of lib.interval.timeinterval_range, which is an
# inclusive range.
process_count_stat(stat, self.TIME_ZERO, self.TIME_ZERO)
# Lightweight creation of users, streams, and messages
def create_user(self, email, **kwargs):
# type: (str, **Any) -> UserProfile
@@ -102,7 +99,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase):
self.create_stream(name='stream3')
# run do_pull_from_zerver
self.process_last_hour(stat)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
# check analytics_* values are correct
self.assertCountEquals(RealmCount, 'test_stat_write', 3)
@@ -117,21 +114,57 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase):
self.create_message(user1, recipient)
# run command
self.process_last_hour(stat)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
usercount_row = UserCount.objects.filter(realm=self.default_realm, interval='hour',
property='test_messages_sent').values_list(
'value', flat=True)[0]
assert (usercount_row == 1)
# run command with dates before message creation
process_count_stat(stat, range_start=self.TIME_ZERO - 2*self.HOUR,
range_end=self.TIME_LAST_HOUR)
# run command with date before message creation
do_fill_count_stat_at_hour(stat, self.TIME_LAST_HOUR)
# check no earlier rows created, old ones still there
self.assertFalse(UserCount.objects.filter(end_time__lt = self.TIME_ZERO - 2*self.HOUR).exists())
self.assertFalse(UserCount.objects.filter(end_time__lt = self.TIME_LAST_HOUR).exists())
self.assertCountEquals(UserCount, 'test_messages_sent', 1, user = user1)
class TestProcessCountStat(AnalyticsTestCase):
    def assertFillStateEquals(self, end_time, state = FillState.DONE, property = None):
        # type: (datetime, int, Optional[text_type]) -> None
        # Assert that the FillState row for `property` (defaulting to the
        # class-level count_stat's property) matches end_time and state.
        if property is None:
            property = self.count_stat.property
        fill_state = get_fill_state(property)
        self.assertEqual(fill_state['end_time'], end_time)
        self.assertEqual(fill_state['state'], state)
    def test_process_stat(self):
        # type: () -> None
        # Walk process_count_stat through its branches: brand-new stat,
        # dirty (STARTED) fill state, clean no-op, and clean advance.

        # process new stat
        current_time = installation_epoch() + self.HOUR
        process_count_stat(self.count_stat, current_time)
        self.assertFillStateEquals(current_time)
        self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
                                                          interval = 'hour').count(), 1)

        # dirty stat: simulate a crashed fill; reprocessing should delete and
        # redo the hour, still leaving exactly one row.
        FillState.objects.filter(property=self.count_stat.property).update(state=FillState.STARTED)
        process_count_stat(self.count_stat, current_time)
        self.assertFillStateEquals(current_time)
        self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
                                                          interval = 'hour').count(), 1)

        # clean stat, no update: already filled to current_time, so nothing changes
        process_count_stat(self.count_stat, current_time)
        self.assertFillStateEquals(current_time)
        self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
                                                          interval = 'hour').count(), 1)

        # clean stat, with update: advancing fill_to_time adds one more hour row
        current_time = current_time + self.HOUR
        process_count_stat(self.count_stat, current_time)
        self.assertFillStateEquals(current_time)
        self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
                                                          interval = 'hour').count(), 2)
# test users added in last hour
def test_add_new_users(self):
# type: () -> None
@@ -145,32 +178,11 @@ class TestProcessCountStat(AnalyticsTestCase):
self.create_user('email3', date_joined=self.TIME_ZERO - 2*self.HOUR)
# check if user added before the hour is not included
self.process_last_hour(stat)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
# do_update is writing the stat.property to all zerver tables
self.assertCountEquals(RealmCount, 'add_new_user_test', 2)
# test if process count does nothing if count already processed
def test_process_count(self):
# type: () -> None
# add some active and inactive users that are human
self.create_user('email1', is_bot=False, is_active=False)
self.create_user('email2', is_bot=False, is_active=False)
self.create_user('email3', is_bot=False, is_active=True)
# run stat to pull active humans
stat = CountStat('active_humans', zerver_count_user_by_realm,
{'is_bot': False, 'is_active': True}, 'hour', 'hour')
self.process_last_hour(stat)
self.assertCountEquals(RealmCount, 'active_humans', 1)
# run command again
self.process_last_hour(stat)
# check that row is same as before
self.assertCountEquals(RealmCount, 'active_humans', 1)
def test_do_aggregate(self):
# type: () -> None
@@ -187,7 +199,7 @@ class TestProcessCountStat(AnalyticsTestCase):
self.create_message(user1, recipient)
# run command
self.process_last_hour(stat)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
# check no rows for hour interval on usercount granularity
self.assertFalse(UserCount.objects.filter(realm=self.default_realm, interval='hour').exists())
@@ -208,38 +220,24 @@ class TestProcessCountStat(AnalyticsTestCase):
self.create_user('email', realm=realm)
# run count prior to realm creation
process_count_stat(stat, range_start=self.TIME_ZERO - 2*self.HOUR,
range_end=self.TIME_LAST_HOUR)
do_fill_count_stat_at_hour(stat, self.TIME_LAST_HOUR)
self.assertFalse(RealmCount.objects.filter(realm=realm).exists())
def test_empty_counts_in_realm(self):
# type: () -> None
# test that rows with empty counts are returned if realm exists
stat = CountStat('test_active_humans', zerver_count_user_by_realm,
{'is_bot': False, 'is_active': True}, 'hour', 'hour')
self.create_user('email')
process_count_stat(stat, range_start=self.TIME_ZERO - 2*self.HOUR,
range_end=self.TIME_ZERO)
self.assertCountEquals(RealmCount, 'test_active_humans', 0, end_time = self.TIME_ZERO - 2*self.HOUR)
self.assertCountEquals(RealmCount, 'test_active_humans', 0, end_time = self.TIME_LAST_HOUR)
self.assertCountEquals(RealmCount, 'test_active_humans', 1, end_time = self.TIME_ZERO)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
self.assertCountEquals(RealmCount, 'test_active_humans', 0)
def test_empty_message_aggregates(self):
# type: () -> None
# test that we write empty rows to realmcount in the event that we
# have no messages and no users
stat = COUNT_STATS['messages_sent']
process_count_stat(stat, range_start=self.TIME_ZERO - 2 * self.HOUR,
range_end=self.TIME_ZERO)
self.assertCountEquals(RealmCount, 'messages_sent', 0, end_time=self.TIME_ZERO - 2 * self.HOUR)
self.assertCountEquals(RealmCount, 'messages_sent', 0, end_time=self.TIME_LAST_HOUR)
self.assertCountEquals(RealmCount, 'messages_sent', 0, end_time=self.TIME_ZERO)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
self.assertCountEquals(RealmCount, 'messages_sent', 0)
class TestAggregates(AnalyticsTestCase):
pass
@@ -259,7 +257,7 @@ class TestXByYQueries(AnalyticsTestCase):
self.create_message(user, recipient = recipient)
# run command
self.process_last_hour(stat)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
self.assertCountEquals(StreamCount, 'test_messages_to_stream', 1)
@@ -278,7 +276,7 @@ class TestCountStats(AnalyticsTestCase):
self.create_user('email3-human', is_bot=False)
for stat in stats:
self.process_last_hour(stat)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
self.assertCountEquals(RealmCount, 'test_active_humans', 1)
self.assertCountEquals(RealmCount, 'test_active_bots', 2)