mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			79 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			79 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
import argparse
 | 
						|
import os
 | 
						|
import re
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
from collections import defaultdict
 | 
						|
 | 
						|
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | 
						|
sys.path.append(ZULIP_PATH)
 | 
						|
from scripts.lib.check_rabbitmq_queue import normal_queues
 | 
						|
from scripts.lib.zulip_tools import (
 | 
						|
    atomic_nagios_write,
 | 
						|
    get_config,
 | 
						|
    get_config_file,
 | 
						|
    get_tornado_ports,
 | 
						|
)
 | 
						|
 | 
						|
if "USER" in os.environ and os.environ["USER"] not in ["root", "rabbitmq"]:
 | 
						|
    print("This script must be run as the root or rabbitmq user")
 | 
						|
 | 
						|
 | 
						|
parser = argparse.ArgumentParser()
 | 
						|
parser.parse_args()
 | 
						|
 | 
						|
config_file = get_config_file()
 | 
						|
TORNADO_PROCESSES = len(get_tornado_ports(config_file))
 | 
						|
 | 
						|
output = subprocess.check_output(["/usr/sbin/rabbitmqctl", "list_consumers"], text=True)
 | 
						|
 | 
						|
consumers: dict[str, int] = defaultdict(int)
 | 
						|
 | 
						|
queues = {
 | 
						|
    *normal_queues,
 | 
						|
    # All tornado queues get grouped into this, even if sharded
 | 
						|
    "notify_tornado",
 | 
						|
}
 | 
						|
 | 
						|
for queue_name in queues:
 | 
						|
    queue_name = queue_name.strip()
 | 
						|
    consumers[queue_name] = 0
 | 
						|
 | 
						|
for line in output.split("\n"):
 | 
						|
    parts = line.split("\t")
 | 
						|
    if len(parts) >= 2:
 | 
						|
        queue_name = parts[0]
 | 
						|
        if queue_name.startswith("notify_tornado_"):
 | 
						|
            queue_name = "notify_tornado"
 | 
						|
 | 
						|
        # Collapse sharded queues into a single queue for this check.
 | 
						|
        queue_name = re.sub(r"_shard(\d+)", "", queue_name)
 | 
						|
        consumers[queue_name] += 1
 | 
						|
 | 
						|
now = int(time.time())
 | 
						|
 | 
						|
for queue_name, count in consumers.items():
 | 
						|
    target_count = 1
 | 
						|
    if queue_name == "notify_tornado":
 | 
						|
        target_count = TORNADO_PROCESSES
 | 
						|
    elif queue_name == "missedmessage_mobile_notifications":
 | 
						|
        target_count = int(
 | 
						|
            get_config(config_file, "application_server", "mobile_notification_shards", "1")
 | 
						|
        )
 | 
						|
    elif queue_name == "user_activity":
 | 
						|
        target_count = int(
 | 
						|
            get_config(config_file, "application_server", "user_activity_shards", "1")
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        target_count = int(
 | 
						|
            get_config(config_file, "application_server", f"{queue_name}_workers", "1")
 | 
						|
        )
 | 
						|
 | 
						|
    atomic_nagios_write(
 | 
						|
        "check-rabbitmq-consumers-" + queue_name,
 | 
						|
        "critical" if count < target_count else "ok",
 | 
						|
        f"queue {queue_name} has {count} consumers, needs {target_count}",
 | 
						|
    )
 |