tests: Move simulated_queue_client to the only test it is used in.

This commit is contained in:
Alex Vandiver
2021-11-15 11:53:35 -08:00
committed by Tim Abbott
parent 64268f47e8
commit d1822b5630
2 changed files with 45 additions and 44 deletions

View File

@@ -60,7 +60,6 @@ from zerver.models import (
get_stream, get_stream,
) )
from zerver.tornado.handlers import AsyncDjangoHandler, allocate_handler_id from zerver.tornado.handlers import AsyncDjangoHandler, allocate_handler_id
from zerver.worker import queue_processors
from zilencer.models import RemoteZulipServer from zilencer.models import RemoteZulipServer
from zproject.backends import ExternalAuthDataDict, ExternalAuthResult from zproject.backends import ExternalAuthDataDict, ExternalAuthResult
@@ -92,12 +91,6 @@ def stub_event_queue_user_events(
yield yield
@contextmanager
def simulated_queue_client(client: Callable[[], object]) -> Iterator[None]:
with mock.patch.object(queue_processors, "SimpleQueueClient", client):
yield
@contextmanager @contextmanager
def cache_tries_captured() -> Iterator[List[Tuple[str, Union[str, List[str]], Optional[str]]]]: def cache_tries_captured() -> Iterator[List[Tuple[str, Union[str, List[str]], Optional[str]]]]:
cache_queries: List[Tuple[str, Union[str, List[str]], Optional[str]]] = [] cache_queries: List[Tuple[str, Union[str, List[str]], Optional[str]]] = []

View File

@@ -4,8 +4,9 @@ import os
import signal import signal
import time import time
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager
from inspect import isabstract from inspect import isabstract
from typing import Any, Callable, Dict, List, Mapping, Optional from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import orjson import orjson
@@ -20,7 +21,7 @@ from zerver.lib.rate_limiter import RateLimiterLockingException
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
from zerver.lib.send_email import EmailNotDeliveredException, FromAddress from zerver.lib.send_email import EmailNotDeliveredException, FromAddress
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import mock_queue_publish, simulated_queue_client from zerver.lib.test_helpers import mock_queue_publish
from zerver.models import ( from zerver.models import (
NotificationTriggers, NotificationTriggers,
PreregistrationUser, PreregistrationUser,
@@ -45,34 +46,41 @@ from zerver.worker.queue_processors import (
Event = Dict[str, Any] Event = Dict[str, Any]
class FakeClient:
def __init__(self) -> None:
self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
def enqueue(self, queue_name: str, data: Dict[str, Any]) -> None:
self.queues[queue_name].append(data)
def start_json_consumer(
self,
queue_name: str,
callback: Callable[[List[Dict[str, Any]]], None],
batch_size: int = 1,
timeout: Optional[int] = None,
) -> None:
chunk: List[Dict[str, Any]] = []
queue = self.queues[queue_name]
while queue:
chunk.append(queue.pop(0))
if len(chunk) >= batch_size or not len(queue):
callback(chunk)
chunk = []
def local_queue_size(self) -> int:
return sum(len(q) for q in self.queues.values())
@contextmanager
def simulated_queue_client(client: Callable[[], object]) -> Iterator[None]:
with patch.object(queue_processors, "SimpleQueueClient", client):
yield
class WorkerTest(ZulipTestCase): class WorkerTest(ZulipTestCase):
class FakeClient:
def __init__(self) -> None:
self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
def enqueue(self, queue_name: str, data: Dict[str, Any]) -> None:
self.queues[queue_name].append(data)
def start_json_consumer(
self,
queue_name: str,
callback: Callable[[List[Dict[str, Any]]], None],
batch_size: int = 1,
timeout: Optional[int] = None,
) -> None:
chunk: List[Dict[str, Any]] = []
queue = self.queues[queue_name]
while queue:
chunk.append(queue.pop(0))
if len(chunk) >= batch_size or not len(queue):
callback(chunk)
chunk = []
def local_queue_size(self) -> int:
return sum(len(q) for q in self.queues.values())
def test_UserActivityWorker(self) -> None: def test_UserActivityWorker(self) -> None:
fake_client = self.FakeClient() fake_client = FakeClient()
user = self.example_user("hamlet") user = self.example_user("hamlet")
UserActivity.objects.filter( UserActivity.objects.filter(
@@ -174,7 +182,7 @@ class WorkerTest(ZulipTestCase):
events = [hamlet_event1, hamlet_event2, othello_event] events = [hamlet_event1, hamlet_event2, othello_event]
fake_client = self.FakeClient() fake_client = FakeClient()
for event in events: for event in events:
fake_client.enqueue("missedmessage_emails", event) fake_client.enqueue("missedmessage_emails", event)
@@ -381,7 +389,7 @@ class WorkerTest(ZulipTestCase):
functions to immediately produce the effect we want, to test its handling by the queue functions to immediately produce the effect we want, to test its handling by the queue
processor. processor.
""" """
fake_client = self.FakeClient() fake_client = FakeClient()
def fake_publish( def fake_publish(
queue_name: str, event: Dict[str, Any], processor: Callable[[Any], None] queue_name: str, event: Dict[str, Any], processor: Callable[[Any], None]
@@ -466,7 +474,7 @@ class WorkerTest(ZulipTestCase):
@patch("zerver.worker.queue_processors.mirror_email") @patch("zerver.worker.queue_processors.mirror_email")
def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None: def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None:
fake_client = self.FakeClient() fake_client = FakeClient()
stream = get_stream("Denmark", get_realm("zulip")) stream = get_stream("Denmark", get_realm("zulip"))
stream_to_address = encode_email_address(stream) stream_to_address = encode_email_address(stream)
data = [ data = [
@@ -489,7 +497,7 @@ class WorkerTest(ZulipTestCase):
@patch("zerver.worker.queue_processors.mirror_email") @patch("zerver.worker.queue_processors.mirror_email")
@override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)]) @override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)])
def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None: def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None:
fake_client = self.FakeClient() fake_client = FakeClient()
realm = get_realm("zulip") realm = get_realm("zulip")
RateLimitedRealmMirror(realm).clear_history() RateLimitedRealmMirror(realm).clear_history()
stream = get_stream("Denmark", realm) stream = get_stream("Denmark", realm)
@@ -565,7 +573,7 @@ class WorkerTest(ZulipTestCase):
def test_email_sending_worker_retries(self) -> None: def test_email_sending_worker_retries(self) -> None:
"""Tests the retry_send_email_failures decorator to make sure it """Tests the retry_send_email_failures decorator to make sure it
retries sending the email 3 times and then gives up.""" retries sending the email 3 times and then gives up."""
fake_client = self.FakeClient() fake_client = FakeClient()
data = { data = {
"template_prefix": "zerver/emails/confirm_new_email", "template_prefix": "zerver/emails/confirm_new_email",
@@ -597,7 +605,7 @@ class WorkerTest(ZulipTestCase):
self.assertEqual(data["failed_tries"], 1 + MAX_REQUEST_RETRIES) self.assertEqual(data["failed_tries"], 1 + MAX_REQUEST_RETRIES)
def test_invites_worker(self) -> None: def test_invites_worker(self) -> None:
fake_client = self.FakeClient() fake_client = FakeClient()
inviter = self.example_user("iago") inviter = self.example_user("iago")
prereg_alice = PreregistrationUser.objects.create( prereg_alice = PreregistrationUser.objects.create(
email=self.nonreg_email("alice"), referred_by=inviter, realm=inviter.realm email=self.nonreg_email("alice"), referred_by=inviter, realm=inviter.realm
@@ -647,7 +655,7 @@ class WorkerTest(ZulipTestCase):
raise Exception("Worker task not performing as expected!") raise Exception("Worker task not performing as expected!")
processed.append(data["type"]) processed.append(data["type"])
fake_client = self.FakeClient() fake_client = FakeClient()
for msg in ["good", "fine", "unexpected behaviour", "back to normal"]: for msg in ["good", "fine", "unexpected behaviour", "back to normal"]:
fake_client.enqueue("unreliable_worker", {"type": msg}) fake_client.enqueue("unreliable_worker", {"type": msg})
@@ -727,7 +735,7 @@ class WorkerTest(ZulipTestCase):
time.sleep(5) time.sleep(5)
processed.append(data["type"]) processed.append(data["type"])
fake_client = self.FakeClient() fake_client = FakeClient()
for msg in ["good", "fine", "timeout", "back to normal"]: for msg in ["good", "fine", "timeout", "back to normal"]:
fake_client.enqueue("timeout_worker", {"type": msg}) fake_client.enqueue("timeout_worker", {"type": msg})
@@ -767,7 +775,7 @@ class WorkerTest(ZulipTestCase):
pid = os.getpid() pid = os.getpid()
os.kill(pid, signal.SIGALRM) os.kill(pid, signal.SIGALRM)
fake_client = self.FakeClient() fake_client = FakeClient()
fake_client.enqueue( fake_client.enqueue(
"timeout_worker", "timeout_worker",
{ {