This queue is used for tasks which, by definition, may take longer than a request, so paging after 60s is rather aggressive. This is especially true because this queue has a very long tail of very slow tasks -- the p99 of task time in this queue is 8.5s, while the p99.9 is 197s. Raise the paging threshold to 15 minutes. While there are semi-user-facing tasks which use this queue (primarily marking messages as read), their being delayed for minutes is already a real possibility if they are stuck behind a large realm export -- and this is not a situation which should necessarily page, since it is not solvable by the administrator.
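For context, the check in the script below estimates the time to drain a queue as its current backlog size times the recent average per-event consume time, and compares that estimate against per-queue warning/critical thresholds. A rough sketch of the arithmetic with the new deferred_work limits (the backlog size and consume time here are hypothetical):

backlog_size = 50                     # hypothetical number of events sitting in deferred_work
recent_average_consume_time = 8.5     # hypothetical seconds per event (roughly the queue's p99 task time)
expected_time_to_clear = backlog_size * recent_average_consume_time  # 425 seconds
# Before this change, deferred_work paged once this estimate exceeded 60s, so this backlog would page.
# With the new thresholds below (600s WARNING / 900s CRITICAL), it stays OK.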
import json
import os
import re
import subprocess
import time
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List

ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

normal_queues = [
    "deferred_work",
    "digest_emails",
    "email_mirror",
    "email_senders",
    "embed_links",
    "embedded_bots",
    "invites",
    "missedmessage_emails",
    "missedmessage_mobile_notifications",
    "outgoing_webhooks",
    "user_activity",
    "user_activity_interval",
    "user_presence",
]

OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3

states = {
    0: "OK",
    1: "WARNING",
    2: "CRITICAL",
    3: "UNKNOWN",
}

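# Thresholds, in seconds, on the estimated time to drain a queue's backlog:
# exceeding MAX_SECONDS_TO_CLEAR reports WARNING and exceeding
# CRITICAL_SECONDS_TO_CLEAR reports CRITICAL; queues not listed here use the
# defaultdict defaults (30s / 60s).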
MAX_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
    lambda: 30,
    deferred_work=600,
    digest_emails=1200,
    missedmessage_mobile_notifications=120,
    embed_links=60,
)
CRITICAL_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
    lambda: 60,
    deferred_work=900,
    missedmessage_mobile_notifications=180,
    digest_emails=1800,
    embed_links=90,
)


def analyze_queue_stats(
    queue_name: str, stats: Dict[str, Any], queue_count_rabbitmqctl: int
) -> Dict[str, Any]:
    now = int(time.time())
    if stats == {}:
        return dict(status=UNKNOWN, name=queue_name, message="invalid or no stats data")

    if now - stats["update_time"] > 180 and queue_count_rabbitmqctl > 10:
        # Queue isn't updating the stats file and has some events in
        # the backlog, it's likely stuck.
        #
        # TODO: There's an unlikely race condition here - if the queue
        # was fully emptied and was idle due to no new events coming
        # for over 180 seconds, suddenly gets a burst of events and
        # this code runs exactly in the very small time window between
        # those events popping up and the queue beginning to process
        # the first one (which will refresh the stats file at the very
        # start), we'll incorrectly return the CRITICAL status. The
        # chance of that happening should be negligible because the queue
        # worker should wake up immediately and log statistics before
        # starting to process the first event.
        return dict(
            status=CRITICAL,
            name=queue_name,
            message="queue appears to be stuck, last update {}, queue size {}".format(
                stats["update_time"], queue_count_rabbitmqctl
            ),
        )

    current_size = queue_count_rabbitmqctl
    average_consume_time = stats["recent_average_consume_time"]
    if average_consume_time is None:
        # Queue just started; we can't effectively estimate anything.
        #
        # If the queue is stuck in this state and not processing
        # anything, eventually the `update_time` rule above will fire.
        return dict(status=OK, name=queue_name, message="")

    expected_time_to_clear_backlog = current_size * average_consume_time
    if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR[queue_name]:
        if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR[queue_name]:
            status = CRITICAL
        else:
            status = WARNING

        return dict(
            status=status,
            name=queue_name,
            message=f"clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}",
        )

    return dict(status=OK, name=queue_name, message="")


WARN_COUNT_THRESHOLD_DEFAULT = 10
CRITICAL_COUNT_THRESHOLD_DEFAULT = 50


def check_other_queues(queue_counts_dict: Dict[str, int]) -> List[Dict[str, Any]]:
    """Do a simple queue size check for queues whose workers don't publish stats files."""

    results = []
    for queue, count in queue_counts_dict.items():
        if queue in normal_queues:
            continue

        if count > CRITICAL_COUNT_THRESHOLD_DEFAULT:
            results.append(dict(status=CRITICAL, name=queue, message=f"count critical: {count}"))
        elif count > WARN_COUNT_THRESHOLD_DEFAULT:
            results.append(dict(status=WARNING, name=queue, message=f"count warning: {count}"))
        else:
            results.append(dict(status=OK, name=queue, message=""))

    return results


def check_rabbitmq_queues() -> None:
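    # Each line of `rabbitmqctl list_queues name messages consumers` output is
    # tab-separated: queue name, message count, consumer count.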
    pattern = re.compile(r"(\w+)\t(\d+)\t(\d+)")
    if "USER" in os.environ and os.environ["USER"] not in ["root", "rabbitmq"]:
        print("This script must be run as the root or rabbitmq user")

    list_queues_output = subprocess.check_output(
        ["/usr/sbin/rabbitmqctl", "list_queues", "name", "messages", "consumers"],
        text=True,
    )
    queue_counts_rabbitmqctl = {}
    queues_with_consumers = []
    for line in list_queues_output.split("\n"):
        line = line.strip()
        m = pattern.match(line)
        if m:
            queue = m.group(1)
            count = int(m.group(2))
            consumers = int(m.group(3))
            queue_counts_rabbitmqctl[queue] = count
            if consumers > 0 and not queue.startswith("notify_tornado"):
                queues_with_consumers.append(queue)

    queue_stats_dir = subprocess.check_output(
        [os.path.join(ZULIP_PATH, "scripts/get-django-setting"), "QUEUE_STATS_DIR"],
        text=True,
    ).strip()
    queue_stats: Dict[str, Dict[str, Any]] = {}
    queues_to_check = set(normal_queues).intersection(set(queues_with_consumers))
    for queue in queues_to_check:
        fn = queue + ".stats"
        file_path = os.path.join(queue_stats_dir, fn)
        if not os.path.exists(file_path):
            queue_stats[queue] = {}
            continue

        with open(file_path) as f:
            try:
                queue_stats[queue] = json.load(f)
            except json.decoder.JSONDecodeError:
                queue_stats[queue] = {}

    results = []
    for queue_name, stats in queue_stats.items():
        results.append(analyze_queue_stats(queue_name, stats, queue_counts_rabbitmqctl[queue_name]))

    results.extend(check_other_queues(queue_counts_rabbitmqctl))

    status = max(result["status"] for result in results)

    now = int(time.time())

    if status > 0:
        queue_error_template = "queue {} problem: {}:{}"
        error_message = "; ".join(
            queue_error_template.format(result["name"], states[result["status"]], result["message"])
            for result in results
            if result["status"] > 0
        )
        print(f"{now}|{status}|{states[status]}|{error_message}")
    else:
        print(f"{now}|{status}|{states[status]}|queues normal")
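As a rough usage sketch (not part of the file above, and the module name is assumed from the file's name), analyze_queue_stats can be called directly with a fabricated stats payload to see how the new deferred_work thresholds classify a backlog:

import time

from check_rabbitmq_queue import analyze_queue_stats  # assumed import path

stats = {
    "update_time": int(time.time()),      # recent update, so the "stuck queue" branch is skipped
    "recent_average_consume_time": 8.5,   # hypothetical seconds per event
}
print(analyze_queue_stats("deferred_work", stats, 50))   # 425s estimated backlog -> status OK
print(analyze_queue_stats("deferred_work", stats, 120))  # 1020s estimated backlog -> status CRITICAL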