queue: Round-trip events through orjson in tests.

This ensures that tests don't rely on non-JSON'able datatypes being
able to be put in an event.
This commit is contained in:
Alex Vandiver
2025-03-06 16:46:42 +00:00
committed by Tim Abbott
parent b3862c5008
commit 1eb4b65b5a
2 changed files with 20 additions and 2 deletions

View File

@@ -446,13 +446,20 @@ def queue_json_publish_rollback_unsafe(
if settings.USING_RABBITMQ: if settings.USING_RABBITMQ:
get_queue_client().json_publish(queue_name, event) get_queue_client().json_publish(queue_name, event)
elif processor: elif processor:
processor(event) # Round-trip through orjson to simulate what RabbitMQ does.
processor(orjson.loads(orjson.dumps(event)))
else: else:
# The else branch is only hit during tests, where rabbitmq is not enabled. # The else branch is only hit during tests, where rabbitmq is not enabled.
# Must be imported here: A top section import leads to circular imports # Must be imported here: A top section import leads to circular imports
from zerver.worker.queue_processors import get_worker from zerver.worker.queue_processors import get_worker
get_worker(queue_name, disable_timeout=True).consume_single_event(event) # As above, we round-trip the event through orjson to emulate
# what happens with RabbitMQ enqueueing and dequeueing the
# event. This ensures that we don't rely on non-JSON'able
# datatypes in the events.
get_worker(queue_name, disable_timeout=True).consume_single_event(
orjson.loads(orjson.dumps(event))
)
def queue_event_on_commit(queue_name: str, event: dict[str, Any]) -> None: def queue_event_on_commit(queue_name: str, event: dict[str, Any]) -> None:

View File

@@ -225,4 +225,15 @@ def send_event_rollback_unsafe(
def send_event_on_commit( def send_event_on_commit(
realm: Realm, event: Mapping[str, Any], users: Iterable[int] | Iterable[Mapping[str, Any]] realm: Realm, event: Mapping[str, Any], users: Iterable[int] | Iterable[Mapping[str, Any]]
) -> None: ) -> None:
if not settings.USING_RABBITMQ:
# In tests, round-trip the event through JSON, as happens with
# RabbitMQ. zerver.lib.queue also enforces this, but the
# on-commit nature of the event sending makes it difficult to
# trace which event was at fault -- so we also check it
# immediately, here.
try:
event = orjson.loads(orjson.dumps(event))
except TypeError:
print(event)
raise
transaction.on_commit(lambda: send_event_rollback_unsafe(realm, event, users)) transaction.on_commit(lambda: send_event_rollback_unsafe(realm, event, users))