mirror of
https://github.com/zulip/zulip.git
synced 2025-11-03 05:23:35 +00:00
queue: Drop register_json_consumer / json_drain_queue interface.
Now that all callsites use the same interface, drop the now-unused ones, and their tests.
This commit is contained in:
committed by
Tim Abbott
parent
5477b9d9a1
commit
c2132a4f9c
@@ -3,8 +3,7 @@ import random
|
||||
import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from contextlib import contextmanager
|
||||
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Set
|
||||
from typing import Any, Callable, Dict, List, Mapping, Optional, Set
|
||||
|
||||
import orjson
|
||||
import pika
|
||||
@@ -138,72 +137,6 @@ class SimpleQueueClient:
|
||||
self._reconnect()
|
||||
self.publish(queue_name, data)
|
||||
|
||||
def register_consumer(self, queue_name: str, consumer: Consumer) -> None:
|
||||
def wrapped_consumer(ch: BlockingChannel,
|
||||
method: Basic.Deliver,
|
||||
properties: pika.BasicProperties,
|
||||
body: bytes) -> None:
|
||||
try:
|
||||
consumer(ch, method, properties, body)
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
except Exception as e:
|
||||
ch.basic_nack(delivery_tag=method.delivery_tag)
|
||||
raise e
|
||||
|
||||
self.consumers[queue_name].add(wrapped_consumer)
|
||||
self.ensure_queue(
|
||||
queue_name,
|
||||
lambda channel: channel.basic_consume(
|
||||
queue_name,
|
||||
wrapped_consumer,
|
||||
consumer_tag=self._generate_ctag(queue_name),
|
||||
),
|
||||
)
|
||||
|
||||
def register_json_consumer(self, queue_name: str,
|
||||
callback: Callable[[Dict[str, Any]], None]) -> None:
|
||||
def wrapped_callback(ch: BlockingChannel,
|
||||
method: Basic.Deliver,
|
||||
properties: pika.BasicProperties,
|
||||
body: bytes) -> None:
|
||||
callback(orjson.loads(body))
|
||||
self.register_consumer(queue_name, wrapped_callback)
|
||||
|
||||
@contextmanager
|
||||
def drain_queue(self, queue_name: str) -> Iterator[List[bytes]]:
|
||||
"""As a contextmanger, yields all messages in the desired queue.
|
||||
|
||||
NACKs all of the messages if the block throws an exception,
|
||||
ACKs them otherwise.
|
||||
"""
|
||||
messages = []
|
||||
max_tag: Optional[int] = None
|
||||
|
||||
def opened(channel: BlockingChannel) -> None:
|
||||
nonlocal max_tag
|
||||
while True:
|
||||
(meta, _, message) = channel.basic_get(queue_name)
|
||||
if message is None:
|
||||
break
|
||||
max_tag = meta.delivery_tag
|
||||
messages.append(message)
|
||||
|
||||
self.ensure_queue(queue_name, opened)
|
||||
assert self.channel is not None
|
||||
try:
|
||||
yield messages
|
||||
if max_tag:
|
||||
self.channel.basic_ack(max_tag, multiple=True)
|
||||
except Exception:
|
||||
if max_tag:
|
||||
self.channel.basic_nack(max_tag, multiple=True)
|
||||
raise
|
||||
|
||||
@contextmanager
|
||||
def json_drain_queue(self, queue_name: str) -> Iterator[List[Dict[str, Any]]]:
|
||||
with self.drain_queue(queue_name) as binary_messages:
|
||||
yield list(map(orjson.loads, binary_messages))
|
||||
|
||||
def start_json_consumer(self,
|
||||
queue_name: str,
|
||||
callback: Callable[[List[Dict[str, Any]]], None],
|
||||
@@ -247,12 +180,6 @@ class SimpleQueueClient:
|
||||
assert self.channel is not None
|
||||
return self.channel.get_waiting_message_count() + len(self.channel._pending_events)
|
||||
|
||||
def start_consuming(self) -> None:
|
||||
assert self.channel is not None
|
||||
assert not self.is_consuming
|
||||
self.is_consuming = True
|
||||
self.channel.start_consuming()
|
||||
|
||||
def stop_consuming(self) -> None:
|
||||
assert self.channel is not None
|
||||
assert self.is_consuming
|
||||
|
||||
@@ -20,43 +20,6 @@ class TestTornadoQueueClient(ZulipTestCase):
|
||||
|
||||
|
||||
class TestQueueImplementation(ZulipTestCase):
|
||||
@override_settings(USING_RABBITMQ=True)
|
||||
def test_queue_basics(self) -> None:
|
||||
queue_client = get_queue_client()
|
||||
queue_client.publish("test_suite", b"test_event\x00\xff")
|
||||
|
||||
with queue_client.drain_queue("test_suite") as result:
|
||||
self.assertEqual(result, [b"test_event\x00\xff"])
|
||||
|
||||
@override_settings(USING_RABBITMQ=True)
|
||||
def test_queue_basics_json(self) -> None:
|
||||
queue_json_publish("test_suite", {"event": "my_event"})
|
||||
|
||||
queue_client = get_queue_client()
|
||||
with queue_client.json_drain_queue("test_suite") as result:
|
||||
self.assertEqual(len(result), 1)
|
||||
self.assertEqual(result[0]['event'], 'my_event')
|
||||
|
||||
@override_settings(USING_RABBITMQ=True)
|
||||
def test_queue_basics_json_error(self) -> None:
|
||||
queue_json_publish("test_suite", {"event": "my_event"})
|
||||
|
||||
queue_client = get_queue_client()
|
||||
raised = False
|
||||
try:
|
||||
with queue_client.json_drain_queue("test_suite") as result:
|
||||
self.assertEqual(len(result), 1)
|
||||
self.assertEqual(result[0]['event'], 'my_event')
|
||||
raise ValueError()
|
||||
except ValueError:
|
||||
raised = True
|
||||
assert raised
|
||||
|
||||
# Still in the queue to be fetched
|
||||
with queue_client.json_drain_queue("test_suite") as result:
|
||||
self.assertEqual(len(result), 1)
|
||||
self.assertEqual(result[0]['event'], 'my_event')
|
||||
|
||||
@override_settings(USING_RABBITMQ=True)
|
||||
def test_register_consumer(self) -> None:
|
||||
output = []
|
||||
|
||||
@@ -3,8 +3,7 @@ import os
|
||||
import smtplib
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from contextlib import contextmanager
|
||||
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional
|
||||
from typing import Any, Callable, Dict, List, Mapping, Optional
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import orjson
|
||||
@@ -36,30 +35,11 @@ Event = Dict[str, Any]
|
||||
class WorkerTest(ZulipTestCase):
|
||||
class FakeClient:
|
||||
def __init__(self) -> None:
|
||||
self.consumers: Dict[str, Callable[[Dict[str, Any]], None]] = {}
|
||||
self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
||||
|
||||
def enqueue(self, queue_name: str, data: Dict[str, Any]) -> None:
|
||||
self.queues[queue_name].append(data)
|
||||
|
||||
def register_json_consumer(self,
|
||||
queue_name: str,
|
||||
callback: Callable[[Dict[str, Any]], None]) -> None:
|
||||
self.consumers[queue_name] = callback
|
||||
|
||||
def start_consuming(self) -> None:
|
||||
for queue_name in self.queues.keys():
|
||||
callback = self.consumers[queue_name]
|
||||
for data in self.queues[queue_name]:
|
||||
callback(data)
|
||||
self.queues[queue_name] = []
|
||||
|
||||
@contextmanager
|
||||
def json_drain_queue(self, queue_name: str) -> Iterator[List[Event]]:
|
||||
events = self.queues[queue_name]
|
||||
self.queues[queue_name] = []
|
||||
yield events
|
||||
|
||||
def start_json_consumer(self,
|
||||
queue_name: str,
|
||||
callback: Callable[[List[Dict[str, Any]]], None],
|
||||
|
||||
Reference in New Issue
Block a user