queue_processors: Gather statistics on queue worker operations.

Mateusz Mandera
2020-03-18 20:48:49 +01:00
committed by Tim Abbott
parent f65e6d0d94
commit 5252b081bd
5 changed files with 76 additions and 2 deletions

puppet/zulip/manifests/base.pp

@@ -164,6 +164,13 @@ class zulip::base {
    mode   => '0640',
  }

  file { '/var/log/zulip/queue_stats':
    ensure => 'directory',
    owner  => 'zulip',
    group  => 'zulip',
    mode   => '0640',
  }

  file { "${zulip::common::nagios_plugins_dir}/zulip_base":
    require => Package[$zulip::common::nagios_plugins],
    recurse => true,
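
Note: this directory matches the QUEUE_STATS_DIR setting added in zproject/settings.py below, and the worker code also creates it on demand, so provisioning and runtime agree on the location. A minimal sketch of that defensive pattern (the path is from this commit; everything else is illustrative):

    import os

    # Same path as the Puppet resource above and QUEUE_STATS_DIR below;
    # mirrors the os.makedirs() call added to update_statistics().
    QUEUE_STATS_DIR = '/var/log/zulip/queue_stats'
    os.makedirs(QUEUE_STATS_DIR, exist_ok=True)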

zerver/lib/queue.py

@@ -175,6 +175,9 @@ class SimpleQueueClient:
        self.ensure_queue(queue_name, opened)
        return messages

    def queue_size(self) -> int:
        return len(self.channel._pending_events)

    def start_consuming(self) -> None:
        self.channel.start_consuming()
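
Note: queue_size() reports the client-local backlog, not the broker-side queue depth: pika's BlockingChannel buffers delivered-but-unprocessed events in its private _pending_events collection, and that is what is counted here. A hedged usage sketch (the queue name is illustrative):

    from zerver.lib.queue import SimpleQueueClient

    client = SimpleQueueClient()
    client.register_json_consumer('user_activity', lambda event: None)
    # Number of events pika has handed to this client that the consumer
    # has not yet processed; approximate by design, since it reads a
    # private pika attribute.
    backlog = client.queue_size()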

zerver/tests/test_queue_worker.py

@@ -78,6 +78,9 @@ class WorkerTest(ZulipTestCase):
            return events

        def queue_size(self) -> int:
            return len(self.queue)

    @override_settings(SLOW_QUERY_LOGS_STREAM="errors")
    def test_slow_queries_worker(self) -> None:
        error_bot = get_system_bot(settings.ERROR_BOT)

zerver/worker/queue_processors.py

@@ -1,6 +1,6 @@
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, List, Mapping, Optional, cast, Tuple, TypeVar, Type
+from typing import Any, Callable, Dict, List, Mapping, MutableSequence, Optional, cast, Tuple, TypeVar, Type
import copy
import signal
@@ -47,7 +47,7 @@ from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
import os
import ujson
-from collections import defaultdict
+from collections import defaultdict, deque
import email
import time
import datetime
@@ -115,12 +115,48 @@ def retry_send_email_failures(
class QueueProcessingWorker(ABC):
    queue_name = None  # type: str
    CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50

    def __init__(self) -> None:
        self.q = None  # type: SimpleQueueClient
        if self.queue_name is None:
            raise WorkerDeclarationException("Queue worker declared without queue_name")

        self.initialize_statistics()

    def initialize_statistics(self) -> None:
        self.queue_last_emptied_timestamp = time.time()
        self.consumed_since_last_emptied = 0
        self.recent_consume_times = deque(maxlen=50)  # type: MutableSequence[Tuple[int, float]]
        self.consume_iteration_counter = 0

        self.update_statistics(0)

    def update_statistics(self, remaining_queue_size: int) -> None:
        total_seconds = sum([seconds for _, seconds in self.recent_consume_times])
        total_events = sum([events_number for events_number, _ in self.recent_consume_times])
        if total_events == 0:
            recent_average_consume_time = None
        else:
            recent_average_consume_time = total_seconds / total_events
        stats_dict = dict(
            update_time=time.time(),
            recent_average_consume_time=recent_average_consume_time,
            current_queue_size=remaining_queue_size,
            queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
            consumed_since_last_emptied=self.consumed_since_last_emptied,
        )

        os.makedirs(settings.QUEUE_STATS_DIR, exist_ok=True)

        fname = '%s.stats' % (self.queue_name,)
        fn = os.path.join(settings.QUEUE_STATS_DIR, fname)
        with lockfile(fn + '.lock'):
            tmp_fn = fn + '.tmp'
            with open(tmp_fn, 'w') as f:
                ujson.dump(stats_dict, f, indent=2)
            os.rename(tmp_fn, fn)

    @abstractmethod
    def consume(self, data: Dict[str, Any]) -> None:
        pass
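
Note: update_statistics() writes <queue_name>.stats atomically: the JSON is dumped to a .tmp file and renamed into place while holding a lockfile, so a reader never observes a half-written file. A minimal reader sketch (directory from this commit; 'user_activity' is an illustrative queue name):

    import os
    import ujson

    fn = os.path.join('/var/log/zulip/queue_stats', 'user_activity.stats')
    with open(fn) as f:
        stats = ujson.load(f)

    # Exactly the keys update_statistics() writes:
    print(stats['update_time'])
    print(stats['current_queue_size'])
    print(stats['recent_average_consume_time'])  # None until a batch has been timed
    print(stats['consumed_since_last_emptied'])
    print(stats['queue_last_emptied_timestamp'])
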
@@ -128,13 +164,35 @@ class QueueProcessingWorker(ABC):
    def do_consume(self, consume_func: Callable[[List[Dict[str, Any]]], None],
                   events: List[Dict[str, Any]]) -> None:
        try:
            time_start = time.time()
            consume_func(events)
            consume_time_seconds = time.time() - time_start  # type: Optional[float]
            self.consumed_since_last_emptied += len(events)
        except Exception:
            self._handle_consume_exception(events)
            consume_time_seconds = None
        finally:
            flush_per_request_caches()
            reset_queries()

            if consume_time_seconds is not None:
                self.recent_consume_times.append((len(events), consume_time_seconds))

            if self.q is not None:
                remaining_queue_size = self.q.queue_size()
            else:
                remaining_queue_size = 0

            if remaining_queue_size == 0:
                self.queue_last_emptied_timestamp = time.time()
                self.consumed_since_last_emptied = 0

            self.consume_iteration_counter += 1
            if self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM:
                self.consume_iteration_counter = 0
                self.update_statistics(remaining_queue_size)

    def consume_wrapper(self, data: Dict[str, Any]) -> None:
        consume_func = lambda events: self.consume(events[0])
        self.do_consume(consume_func, [data])
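
Note: recent_consume_times is a deque holding at most the last 50 (batch_size, seconds) pairs, so recent_average_consume_time is the per-event consume time over roughly the last 50 batches, and the stats only hit disk every CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM iterations to bound I/O. A plausible consumer of these files is a Nagios-style check alongside the plugins provisioned in the Puppet hunk above; the sketch below is illustrative only and not part of this commit:

    import os
    import time
    import ujson

    def check_queue_stats(stats_dir: str, queue_name: str,
                          max_staleness: float = 300, max_backlog: int = 100) -> str:
        # Illustrative thresholds. Caveat: update_statistics() only runs
        # every 50 consume iterations, so for workers that consume only on
        # incoming events, an idle queue's stats file goes stale by design.
        with open(os.path.join(stats_dir, queue_name + '.stats')) as f:
            stats = ujson.load(f)
        if time.time() - stats['update_time'] > max_staleness:
            return 'WARNING: stats file is stale'
        if stats['current_queue_size'] > max_backlog:
            return 'WARNING: queue backlog exceeds threshold'
        return 'OK'

    print(check_queue_stats('/var/log/zulip/queue_stats', 'user_activity'))
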
@@ -159,6 +217,7 @@ class QueueProcessingWorker(ABC):
        self.q = SimpleQueueClient()

    def start(self) -> None:
        self.initialize_statistics()
        self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
        self.q.start_consuming()
@@ -170,6 +229,7 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
    sleep_only_if_empty = True

    def start(self) -> None:  # nocoverage
        self.initialize_statistics()
        while True:
            events = self.q.drain_queue(self.queue_name, json=True)
            self.do_consume(self.consume_batch, events)

zproject/settings.py

@@ -656,6 +656,7 @@ EMAIL_CONTENT_LOG_PATH = zulip_path("/var/log/zulip/email_content.log")
LDAP_LOG_PATH = zulip_path("/var/log/zulip/ldap.log")
LDAP_SYNC_LOG_PATH = zulip_path("/var/log/zulip/sync_ldap_user_data.log")
QUEUE_ERROR_DIR = zulip_path("/var/log/zulip/queue_error")
QUEUE_STATS_DIR = zulip_path("/var/log/zulip/queue_stats")
DIGEST_LOG_PATH = zulip_path("/var/log/zulip/digest.log")
ANALYTICS_LOG_PATH = zulip_path("/var/log/zulip/analytics.log")
ANALYTICS_LOCK_DIR = zulip_path("/home/zulip/deployments/analytics-lock-dir")