queue_rate: Output to CSV, and run multiple prefetch values.

Author: Alex Vandiver
Date: 2021-11-10 02:02:01 +00:00
Committed by: Tim Abbott
Parent: 49ad188449
Commit: 800e38016a
2 changed files with 50 additions and 33 deletions
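
With the new --prefetches flag taking several values, one run can sweep prefetch sizes and log a CSV row per rep. As a hypothetical usage sketch (the queue_rate command name comes from the docstring in the diff below; --count and --slow are defined outside this diff and assumed to have defaults), the command could be driven from a Django shell:

    from django.core.management import call_command

    # Hypothetical invocation, not part of this commit: benchmark the batch
    # worker at three prefetch sizes, writing per-rep rows to timings.csv.
    call_command("queue_rate", batch=True, prefetches=[100, 500, 1000], csv="timings.csv")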

zerver/worker/queue_processors.py

@@ -1067,7 +1067,7 @@ class NoopWorker(QueueProcessingWorker):
 class BatchNoopWorker(LoopQueueProcessingWorker):
     """Used to profile the queue processing framework, in zilencer's queue_rate."""
 
-    batch_size = 500
+    batch_size = 100
 
     def __init__(self, max_consume: int = 1000, slow_queries: Sequence[int] = []) -> None:
         self.consumed = 0
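
The smaller batch_size matters because the command below skips any prefetch smaller than the worker's batch size: if the broker caps unacknowledged deliveries below the batch size, the batch can never fill and the consumer stalls. A minimal pika sketch of such a prefetch cap (illustrative only, not Zulip's queue wrapper; assumes a local broker):

    import pika

    # Illustrative only: cap unacknowledged deliveries at 50 per consumer.
    # A consumer that acks only after collecting a 100-message batch would
    # stall here, since the broker stops at 50 outstanding messages.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.basic_qos(prefetch_count=50)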

zilencer/management/commands/queue_rate.py

@@ -1,10 +1,11 @@
+import csv
 from timeit import timeit
-from typing import Any
+from typing import Any, Union
 
 from django.core.management.base import BaseCommand, CommandParser
 
 from zerver.lib.queue import SimpleQueueClient, queue_json_publish
-from zerver.worker.queue_processors import BatchNoopWorker, NoopWorker, QueueProcessingWorker
+from zerver.worker.queue_processors import BatchNoopWorker, NoopWorker
 
 
 class Command(BaseCommand):
@@ -16,10 +17,12 @@ class Command(BaseCommand):
         )
         parser.add_argument("--reps", help="Iterations of enqueue/dequeue", default=1, type=int)
         parser.add_argument("--batch", help="Enables batch dequeuing", action="store_true")
+        parser.add_argument("--csv", help="Path to CSV output", default="rabbitmq-timings.csv")
         parser.add_argument(
-            "--prefetch",
+            "--prefetches",
             help="Limits the prefetch size; RabbitMQ defaults to unbounded (0)",
-            default=0,
+            default=[0],
+            nargs="+",
             type=int,
         )
         parser.add_argument(
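
Renaming --prefetch to --prefetches with nargs="+" makes argparse collect one or more integers into a list, which the handler in the next hunk iterates over. A standalone sketch of that argparse behavior (not Zulip code):

    import argparse

    # nargs="+" gathers every following value into a list of ints.
    parser = argparse.ArgumentParser()
    parser.add_argument("--prefetches", default=[0], nargs="+", type=int)
    print(parser.parse_args(["--prefetches", "0", "100", "1000"]).prefetches)
    # prints [0, 100, 1000]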
@@ -35,38 +38,52 @@ class Command(BaseCommand):
         queue = SimpleQueueClient()
         queue_name = "noop_batch" if options["batch"] else "noop"
         queue.ensure_queue(queue_name, lambda channel: channel.queue_purge("noop"))
 
         count = options["count"]
         reps = options["reps"]
 
-        worker: QueueProcessingWorker = NoopWorker(count, options["slow"])
-        if options["batch"]:
-            worker = BatchNoopWorker(count, options["slow"])
-        worker.ENABLE_TIMEOUTS = True
-        worker.setup()
-        assert worker.q is not None
-        assert worker.q.channel is not None
-        worker.q.channel.basic_qos(prefetch_count=options["prefetch"])
-
-        total_enqueue_time = 0.0
-        total_dequeue_time = 0.0
-
-        def one_rep() -> None:
-            nonlocal total_enqueue_time, total_dequeue_time
-            total_enqueue_time += timeit(
-                lambda: queue_json_publish(queue_name, {}),
-                number=count,
-            )
-            total_dequeue_time += timeit(
-                lambda: worker.start(),
-                number=1,
-            )
-
-        rate = lambda time, iterations: int(iterations / time)
-
-        total_reps_time = timeit(one_rep, number=reps)
-        if reps > 1:
-            print(f"Total rate per rep: {rate(total_reps_time, reps)} / sec")
-
-        print(f"Enqueue rate: {rate(total_enqueue_time, count * reps)} / sec")
-        print(f"Dequeue rate: {rate(total_dequeue_time, count * reps)} / sec")
+        with open(options["csv"], "w", newline="") as csvfile:
+            writer = csv.DictWriter(
+                csvfile, fieldnames=["Queue size", "Queue type", "Prefetch", "Rate"]
+            )
+            writer.writeheader()
+
+            for prefetch in options["prefetches"]:
+                print(f"Queue size {count}, prefetch {prefetch}...")
+                worker: Union[NoopWorker, BatchNoopWorker] = NoopWorker(count, options["slow"])
+                if options["batch"]:
+                    worker = BatchNoopWorker(count, options["slow"])
+                    if prefetch > 0 and prefetch < worker.batch_size:
+                        print(
+                            f"  Skipping, as prefetch {prefetch} is less than batch size {worker.batch_size}"
+                        )
+                        continue
+                worker.ENABLE_TIMEOUTS = True
+                worker.setup()
+                assert worker.q is not None
+                assert worker.q.channel is not None
+                worker.q.channel.basic_qos(prefetch_count=prefetch)
+
+                total_time = 0.0
+                for i in range(1, reps + 1):
+                    worker.consumed = 0
+                    timeit(
+                        lambda: queue_json_publish(queue_name, {}),
+                        number=count,
+                    )
+                    duration = timeit(
+                        lambda: worker.start(),
+                        number=1,
+                    )
+                    print(f"  {i}/{reps}: {count}/{duration}s = {count / duration}/s")
+                    total_time += duration
+                    writer.writerow(
+                        {
+                            "Queue size": count,
+                            "Queue type": queue_name,
+                            "Prefetch": prefetch,
+                            "Rate": count / duration,
+                        }
+                    )
+                    csvfile.flush()
+                print(f"  Overall: {reps * count}/{total_time}s = {(reps * count) / total_time}/s")