mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	analytics: Add performance and transaction logging to counts.py.
For each database query made by an analytics function, log time spent and the number of rows changed to var/logs/analytics.log. In the spirit of write ahead logging, for each (stat, end_time) update, log the start and end of the "transaction", as well as time spent.
This commit is contained in:
		@@ -1,5 +1,6 @@
 | 
				
			|||||||
from django.db import connection, models
 | 
					from django.db import connection, models
 | 
				
			||||||
from django.utils import timezone
 | 
					from django.utils import timezone
 | 
				
			||||||
 | 
					from django.conf import settings
 | 
				
			||||||
from datetime import timedelta, datetime
 | 
					from datetime import timedelta, datetime
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from analytics.models import InstallationCount, RealmCount, \
 | 
					from analytics.models import InstallationCount, RealmCount, \
 | 
				
			||||||
@@ -10,6 +11,21 @@ from zerver.lib.timestamp import floor_to_day
 | 
				
			|||||||
from typing import Any, Optional, Type
 | 
					from typing import Any, Optional, Type
 | 
				
			||||||
from six import text_type
 | 
					from six import text_type
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## Logging setup ##
 | 
				
			||||||
 | 
					log_format = '%(asctime)s %(levelname)-8s %(message)s'
 | 
				
			||||||
 | 
					logging.basicConfig(format=log_format)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					formatter = logging.Formatter(log_format)
 | 
				
			||||||
 | 
					file_handler = logging.FileHandler(settings.ANALYTICS_LOG_PATH)
 | 
				
			||||||
 | 
					file_handler.setFormatter(formatter)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					logger = logging.getLogger("zulip.management")
 | 
				
			||||||
 | 
					logger.setLevel(logging.INFO)
 | 
				
			||||||
 | 
					logger.addHandler(file_handler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# First post office in Boston
 | 
					# First post office in Boston
 | 
				
			||||||
MIN_TIME = datetime(1639, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
 | 
					MIN_TIME = datetime(1639, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -46,11 +62,14 @@ def process_count_stat(stat, fill_to_time):
 | 
				
			|||||||
        FillState.objects.create(property = stat.property,
 | 
					        FillState.objects.create(property = stat.property,
 | 
				
			||||||
                                 end_time = currently_filled,
 | 
					                                 end_time = currently_filled,
 | 
				
			||||||
                                 state = FillState.DONE)
 | 
					                                 state = FillState.DONE)
 | 
				
			||||||
 | 
					        logger.info("INITIALIZED %s %s" % (stat.property, currently_filled))
 | 
				
			||||||
    elif fill_state['state'] == FillState.STARTED:
 | 
					    elif fill_state['state'] == FillState.STARTED:
 | 
				
			||||||
 | 
					        logger.info("UNDO START %s %s" % (stat.property, fill_state['end_time']))
 | 
				
			||||||
        do_delete_count_stat_at_hour(stat, fill_state['end_time'])
 | 
					        do_delete_count_stat_at_hour(stat, fill_state['end_time'])
 | 
				
			||||||
        currently_filled = fill_state['end_time'] - timedelta(hours = 1)
 | 
					        currently_filled = fill_state['end_time'] - timedelta(hours = 1)
 | 
				
			||||||
        FillState.objects.filter(property = stat.property). \
 | 
					        FillState.objects.filter(property = stat.property). \
 | 
				
			||||||
            update(end_time = currently_filled, state = FillState.DONE)
 | 
					            update(end_time = currently_filled, state = FillState.DONE)
 | 
				
			||||||
 | 
					        logger.info("UNDO DONE %s" % (stat.property,))
 | 
				
			||||||
    elif fill_state['state'] == FillState.DONE:
 | 
					    elif fill_state['state'] == FillState.DONE:
 | 
				
			||||||
        currently_filled = fill_state['end_time']
 | 
					        currently_filled = fill_state['end_time']
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
@@ -58,11 +77,15 @@ def process_count_stat(stat, fill_to_time):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    currently_filled = currently_filled + timedelta(hours = 1)
 | 
					    currently_filled = currently_filled + timedelta(hours = 1)
 | 
				
			||||||
    while currently_filled <= fill_to_time:
 | 
					    while currently_filled <= fill_to_time:
 | 
				
			||||||
 | 
					        logger.info("START %s %s %s" % (stat.property, stat.interval, currently_filled))
 | 
				
			||||||
 | 
					        start = time.time()
 | 
				
			||||||
        FillState.objects.filter(property = stat.property) \
 | 
					        FillState.objects.filter(property = stat.property) \
 | 
				
			||||||
                     .update(end_time = currently_filled, state = FillState.STARTED)
 | 
					                     .update(end_time = currently_filled, state = FillState.STARTED)
 | 
				
			||||||
        do_fill_count_stat_at_hour(stat, currently_filled)
 | 
					        do_fill_count_stat_at_hour(stat, currently_filled)
 | 
				
			||||||
        FillState.objects.filter(property = stat.property).update(state = FillState.DONE)
 | 
					        FillState.objects.filter(property = stat.property).update(state = FillState.DONE)
 | 
				
			||||||
 | 
					        end = time.time()
 | 
				
			||||||
        currently_filled = currently_filled + timedelta(hours = 1)
 | 
					        currently_filled = currently_filled + timedelta(hours = 1)
 | 
				
			||||||
 | 
					        logger.info("DONE %s %s (%dms)" % (stat.property, stat.interval, (end-start)*1000))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# We assume end_time is on an hour boundary, and is timezone aware.
 | 
					# We assume end_time is on an hour boundary, and is timezone aware.
 | 
				
			||||||
# It is the caller's responsibility to enforce this!
 | 
					# It is the caller's responsibility to enforce this!
 | 
				
			||||||
@@ -114,7 +137,10 @@ def do_aggregate_to_summary_table(stat, end_time, interval):
 | 
				
			|||||||
               'property' : stat.property,
 | 
					               'property' : stat.property,
 | 
				
			||||||
               'interval' : interval}
 | 
					               'interval' : interval}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        start = time.time()
 | 
				
			||||||
        cursor.execute(realmcount_query, {'end_time': end_time})
 | 
					        cursor.execute(realmcount_query, {'end_time': end_time})
 | 
				
			||||||
 | 
					        end = time.time()
 | 
				
			||||||
 | 
					        logger.info("%s RealmCount aggregation (%dms/%sr)" % (stat.property, (end-start)*1000, cursor.rowcount))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Aggregate into InstallationCount
 | 
					    # Aggregate into InstallationCount
 | 
				
			||||||
    installationcount_query = """
 | 
					    installationcount_query = """
 | 
				
			||||||
@@ -132,7 +158,10 @@ def do_aggregate_to_summary_table(stat, end_time, interval):
 | 
				
			|||||||
    """ % {'property': stat.property,
 | 
					    """ % {'property': stat.property,
 | 
				
			||||||
           'interval': interval}
 | 
					           'interval': interval}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    start = time.time()
 | 
				
			||||||
    cursor.execute(installationcount_query, {'end_time': end_time})
 | 
					    cursor.execute(installationcount_query, {'end_time': end_time})
 | 
				
			||||||
 | 
					    end = time.time()
 | 
				
			||||||
 | 
					    logger.info("%s InstallationCount aggregation (%dms/%sr)" % (stat.property, (end-start)*1000, cursor.rowcount))
 | 
				
			||||||
    cursor.close()
 | 
					    cursor.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## methods that hit the prod databases directly
 | 
					## methods that hit the prod databases directly
 | 
				
			||||||
@@ -153,7 +182,10 @@ def do_pull_from_zerver(stat, start_time, end_time, interval):
 | 
				
			|||||||
                                              'interval' : interval,
 | 
					                                              'interval' : interval,
 | 
				
			||||||
                                              'join_args' : join_args}
 | 
					                                              'join_args' : join_args}
 | 
				
			||||||
    cursor = connection.cursor()
 | 
					    cursor = connection.cursor()
 | 
				
			||||||
 | 
					    start = time.time()
 | 
				
			||||||
    cursor.execute(query_, {'time_start': start_time, 'time_end': end_time})
 | 
					    cursor.execute(query_, {'time_start': start_time, 'time_end': end_time})
 | 
				
			||||||
 | 
					    end = time.time()
 | 
				
			||||||
 | 
					    logger.info("%s do_pull_from_zerver (%dms/%sr)" % (stat.property, (end-start)*1000, cursor.rowcount))
 | 
				
			||||||
    cursor.close()
 | 
					    cursor.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
count_user_by_realm_query = """
 | 
					count_user_by_realm_query = """
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -843,6 +843,7 @@ ZULIP_PATHS = [
 | 
				
			|||||||
    ("QUEUE_ERROR_DIR", "/var/log/zulip/queue_error"),
 | 
					    ("QUEUE_ERROR_DIR", "/var/log/zulip/queue_error"),
 | 
				
			||||||
    ("STATS_DIR", "/home/zulip/stats"),
 | 
					    ("STATS_DIR", "/home/zulip/stats"),
 | 
				
			||||||
    ("DIGEST_LOG_PATH", "/var/log/zulip/digest.log"),
 | 
					    ("DIGEST_LOG_PATH", "/var/log/zulip/digest.log"),
 | 
				
			||||||
 | 
					    ("ANALYTICS_LOG_PATH", "/var/log/zulip/analytics.log"),
 | 
				
			||||||
    ]
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# The Event log basically logs most significant database changes,
 | 
					# The Event log basically logs most significant database changes,
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user