diff --git a/docs/subsystems/queuing.md b/docs/subsystems/queuing.md index 9e436f2a0c..18b37f15ca 100644 --- a/docs/subsystems/queuing.md +++ b/docs/subsystems/queuing.md @@ -57,13 +57,13 @@ manually. ### Publishing events into a queue You can publish events to a RabbitMQ queue using the -`queue_json_publish` function defined in `zerver/lib/queue.py`. +`queue_event_on_commit` function defined in `zerver/lib/queue.py`. An interesting challenge with queue processors is what should happen when queued events in Zulip's backend tests. Our current solution is -that in the tests, `queue_json_publish` will (by default) simple call +that in the tests, `queue_event_on_commit` will (by default) simple call the `consume` method for the relevant queue processor. However, -`queue_json_publish` also supports being passed a function that should +`queue_event_on_commit` also supports being passed a function that should be called in the tests instead of the queue processor's `consume` method. Where possible, we prefer the model of calling `consume` in tests since that's more predictable and automatically covers the queue diff --git a/zerver/actions/user_activity.py b/zerver/actions/user_activity.py index 0c747295fe..eb0ca8379b 100644 --- a/zerver/actions/user_activity.py +++ b/zerver/actions/user_activity.py @@ -1,6 +1,6 @@ from datetime import datetime -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.timestamp import datetime_to_timestamp from zerver.models import UserActivityInterval, UserProfile @@ -31,4 +31,4 @@ def do_update_user_activity_interval(user_profile: UserProfile, log_time: dateti def update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None: event = {"user_profile_id": user_profile.id, "time": datetime_to_timestamp(log_time)} - queue_json_publish("user_activity_interval", event) + queue_json_publish_rollback_unsafe("user_activity_interval", event) diff --git a/zerver/decorator.py b/zerver/decorator.py index 1d6bb47e58..2e34cbced8 100644 --- a/zerver/decorator.py +++ b/zerver/decorator.py @@ -43,7 +43,7 @@ from zerver.lib.exceptions import ( UserDeactivatedError, WebhookError, ) -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.rate_limiter import is_local_addr, rate_limit_request_by_ip, rate_limit_user from zerver.lib.request import RequestNotes from zerver.lib.response import json_method_not_allowed @@ -91,7 +91,7 @@ def update_user_activity( "time": datetime_to_timestamp(timezone_now()), "client_id": request_notes.client.id, } - queue_json_publish("user_activity", event, lambda event: None) + queue_json_publish_rollback_unsafe("user_activity", event, lambda event: None) # Based on django.views.decorators.http.require_http_methods diff --git a/zerver/lib/digest.py b/zerver/lib/digest.py index 69f5cd7fd9..194f8518b8 100644 --- a/zerver/lib/digest.py +++ b/zerver/lib/digest.py @@ -17,7 +17,7 @@ from zerver.context_processors import common_context from zerver.lib.email_notifications import build_message_list from zerver.lib.logging_util import log_to_file from zerver.lib.message import get_last_message_id -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.send_email import FromAddress, send_future_email from zerver.lib.url_encoding import stream_narrow_url from zerver.models import ( @@ -92,7 +92,7 @@ class DigestTopic: def queue_digest_user_ids(user_ids: list[int], cutoff: datetime) -> None: # Convert cutoff to epoch seconds for transit. event = {"user_ids": user_ids, "cutoff": cutoff.strftime("%s")} - queue_json_publish("digest_emails", event) + queue_json_publish_rollback_unsafe("digest_emails", event) def enqueue_emails(cutoff: datetime) -> None: diff --git a/zerver/lib/email_mirror.py b/zerver/lib/email_mirror.py index 370517507d..e95a79b2ea 100644 --- a/zerver/lib/email_mirror.py +++ b/zerver/lib/email_mirror.py @@ -25,7 +25,7 @@ from zerver.lib.email_mirror_helpers import ( from zerver.lib.email_notifications import convert_html_to_markdown from zerver.lib.exceptions import JsonableError, RateLimitedError from zerver.lib.message import normalize_body, truncate_content, truncate_topic -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.rate_limiter import RateLimitedObject from zerver.lib.send_email import FromAddress from zerver.lib.streams import access_stream_for_send_message @@ -540,7 +540,7 @@ def mirror_email_message(rcpt_to: str, msg_base64: str) -> dict[str, str]: "msg": f"5.1.1 Bad destination mailbox address: {e}", } - queue_json_publish( + queue_json_publish_rollback_unsafe( "email_mirror", { "rcpt_to": rcpt_to, diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index 5aed1b5793..48d247f12c 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -434,7 +434,11 @@ def set_queue_client(queue_client: SimpleQueueClient | TornadoQueueClient) -> No thread_data.queue_client = queue_client -def queue_json_publish( +# One should generally use `queue_event_on_commit` unless there's a strong +# reason to use `queue_json_publish_rollback_unsafe` directly, as it doesn't +# wait for the db transaction (within which it gets called, if any) to commit +# and sends event irrespective of commit or rollback. +def queue_json_publish_rollback_unsafe( queue_name: str, event: dict[str, Any], processor: Callable[[Any], None] | None = None, @@ -452,7 +456,7 @@ def queue_json_publish( def queue_event_on_commit(queue_name: str, event: dict[str, Any]) -> None: - transaction.on_commit(lambda: queue_json_publish(queue_name, event)) + transaction.on_commit(lambda: queue_json_publish_rollback_unsafe(queue_name, event)) def retry_event( @@ -464,4 +468,4 @@ def retry_event( if event["failed_tries"] > MAX_REQUEST_RETRIES: failure_processor(event) else: - queue_json_publish(queue_name, event) + queue_json_publish_rollback_unsafe(queue_name, event) diff --git a/zerver/management/commands/enqueue_file.py b/zerver/management/commands/enqueue_file.py index fae27ce423..5fac0f7b33 100644 --- a/zerver/management/commands/enqueue_file.py +++ b/zerver/management/commands/enqueue_file.py @@ -6,7 +6,7 @@ import orjson from typing_extensions import override from zerver.lib.management import ZulipBaseCommand -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe def error(*args: Any) -> None: @@ -28,7 +28,7 @@ def enqueue_file(queue_name: str, f: IO[str]) -> None: # This is designed to use the `error` method rather than # the call_consume_in_tests flow. - queue_json_publish(queue_name, data, error) + queue_json_publish_rollback_unsafe(queue_name, data, error) class Command(ZulipBaseCommand): diff --git a/zerver/migrations/0209_user_profile_no_empty_password.py b/zerver/migrations/0209_user_profile_no_empty_password.py index 15aec02d0f..9c31246816 100644 --- a/zerver/migrations/0209_user_profile_no_empty_password.py +++ b/zerver/migrations/0209_user_profile_no_empty_password.py @@ -11,7 +11,7 @@ from django.db.migrations.state import StateApps from django.utils.timezone import now as timezone_now from zerver.lib.cache import cache_delete, user_profile_by_api_key_cache_key -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.utils import generate_api_key @@ -223,7 +223,7 @@ def reset_user_api_key(user_profile: Any) -> None: # we can just write to the queue processor that handles sending # those notices to the push notifications bouncer service. event = {"type": "clear_push_device_tokens", "user_profile_id": user_profile.id} - queue_json_publish("deferred_work", event) + queue_json_publish_rollback_unsafe("deferred_work", event) class Migration(migrations.Migration): diff --git a/zerver/migrations/0387_reupload_realmemoji_again.py b/zerver/migrations/0387_reupload_realmemoji_again.py index bdff80e250..1855476248 100644 --- a/zerver/migrations/0387_reupload_realmemoji_again.py +++ b/zerver/migrations/0387_reupload_realmemoji_again.py @@ -3,7 +3,7 @@ from django.db import migrations from django.db.backends.base.schema import BaseDatabaseSchemaEditor from django.db.migrations.state import StateApps -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe def reupload_realm_emoji(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) -> None: @@ -34,7 +34,7 @@ def reupload_realm_emoji(apps: StateApps, schema_editor: BaseDatabaseSchemaEdito "type": "reupload_realm_emoji", "realm_id": realm_id, } - queue_json_publish("deferred_work", event) + queue_json_publish_rollback_unsafe("deferred_work", event) class Migration(migrations.Migration): diff --git a/zerver/signals.py b/zerver/signals.py index e142b386bb..083cbde269 100644 --- a/zerver/signals.py +++ b/zerver/signals.py @@ -9,7 +9,7 @@ from django.utils.timezone import now as timezone_now from django.utils.translation import gettext as _ from confirmation.models import one_click_unsubscribe_link -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.send_email import FromAddress from zerver.lib.timezone import canonicalize_timezone from zerver.models import UserProfile @@ -110,7 +110,7 @@ def email_on_new_login(sender: Any, user: UserProfile, request: Any, **kwargs: A "from_address": FromAddress.NOREPLY, "context": context, } - queue_json_publish("email_senders", email_dict) + queue_json_publish_rollback_unsafe("email_senders", email_dict) @receiver(user_logged_out) diff --git a/zerver/tests/test_email_mirror.py b/zerver/tests/test_email_mirror.py index 3fce6ac7bc..b76fc69bcd 100644 --- a/zerver/tests/test_email_mirror.py +++ b/zerver/tests/test_email_mirror.py @@ -1577,7 +1577,7 @@ class TestEmailMirrorTornadoView(ZulipTestCase): "secret": settings.SHARED_SECRET, } - with mock_queue_publish("zerver.lib.email_mirror.queue_json_publish") as m: + with mock_queue_publish("zerver.lib.email_mirror.queue_json_publish_rollback_unsafe") as m: m.side_effect = check_queue_json_publish return self.client_post("/api/internal/email_mirror_message", post_data) diff --git a/zerver/tests/test_event_queue.py b/zerver/tests/test_event_queue.py index 99930afc4a..21cc48f8c7 100644 --- a/zerver/tests/test_event_queue.py +++ b/zerver/tests/test_event_queue.py @@ -41,13 +41,13 @@ class MaybeEnqueueNotificationsTest(ZulipTestCase): ) with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish" + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe" ) as mock_queue_json_publish: notified = maybe_enqueue_notifications(**params) mock_queue_json_publish.assert_not_called() with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish" + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe" ) as mock_queue_json_publish: params["user_notifications_data"] = self.create_user_notifications_data_object( user_id=1, dm_push_notify=True, dm_email_notify=True @@ -63,7 +63,7 @@ class MaybeEnqueueNotificationsTest(ZulipTestCase): self.assertTrue(notified["push_notified"]) with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish" + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe" ) as mock_queue_json_publish: params = self.get_maybe_enqueue_notifications_parameters( message_id=1, diff --git a/zerver/tests/test_link_embed.py b/zerver/tests/test_link_embed.py index c5e08b1486..bc7305f52d 100644 --- a/zerver/tests/test_link_embed.py +++ b/zerver/tests/test_link_embed.py @@ -14,7 +14,7 @@ from typing_extensions import override from zerver.actions.message_delete import do_delete_messages from zerver.lib.cache import cache_delete, cache_get, preview_url_cache_key from zerver.lib.camo import get_camo_url -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import mock_queue_publish from zerver.lib.url_preview.oembed import get_oembed_data, strip_cdata @@ -457,9 +457,9 @@ class PreviewTestCase(ZulipTestCase): self.assertTrue(responses.assert_call_count(edited_url, 0)) with self.settings(TEST_SUITE=False), self.assertLogs(level="INFO") as info_logs: - # Now proceed with the original queue_json_publish and call the - # up-to-date event for edited_url. - queue_json_publish(*args, **kwargs) + # Now proceed with the original queue_json_publish_rollback_unsafe + # and call the up-to-date event for edited_url. + queue_json_publish_rollback_unsafe(*args, **kwargs) msg = Message.objects.select_related("sender").get(id=msg_id) assert msg.rendered_content is not None self.assertIn( diff --git a/zerver/tests/test_message_delete.py b/zerver/tests/test_message_delete.py index 51bc55a234..f398db29f7 100644 --- a/zerver/tests/test_message_delete.py +++ b/zerver/tests/test_message_delete.py @@ -500,7 +500,7 @@ class DeleteMessageTest(ZulipTestCase): with ( self.capture_send_event_calls(expected_num_events=1), - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits." diff --git a/zerver/tests/test_message_edit_notifications.py b/zerver/tests/test_message_edit_notifications.py index 3841513e1b..661e0d0be8 100644 --- a/zerver/tests/test_message_edit_notifications.py +++ b/zerver/tests/test_message_edit_notifications.py @@ -133,7 +133,8 @@ class EditMessageSideEffectsTest(ZulipTestCase): ) with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish", side_effect=fake_publish + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe", + side_effect=fake_publish, ) as m: maybe_enqueue_notifications(**enqueue_kwargs) diff --git a/zerver/tests/test_queue.py b/zerver/tests/test_queue.py index b6e2f75d87..544ae569ce 100644 --- a/zerver/tests/test_queue.py +++ b/zerver/tests/test_queue.py @@ -10,7 +10,7 @@ from zerver.lib.queue import ( SimpleQueueClient, TornadoQueueClient, get_queue_client, - queue_json_publish, + queue_json_publish_rollback_unsafe, ) from zerver.lib.test_classes import ZulipTestCase @@ -43,7 +43,7 @@ class TestQueueImplementation(ZulipTestCase): output.append(events[0]) queue_client.stop_consuming() - queue_json_publish("test_suite", {"event": "my_event"}) + queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"}) queue_client.start_json_consumer("test_suite", collect) @@ -67,7 +67,7 @@ class TestQueueImplementation(ZulipTestCase): raise Exception("Make me nack!") output.append(events[0]) - queue_json_publish("test_suite", {"event": "my_event"}) + queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"}) try: queue_client.start_json_consumer("test_suite", collect) @@ -97,7 +97,7 @@ class TestQueueImplementation(ZulipTestCase): mock.patch("zerver.lib.queue.SimpleQueueClient.publish", throw_connection_error_once), self.assertLogs("zulip.queue", level="WARN") as warn_logs, ): - queue_json_publish("test_suite", {"event": "my_event"}) + queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"}) self.assertEqual( warn_logs.output, ["WARNING:zulip.queue:Failed to send to rabbitmq, trying to reconnect and send again"], diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index a5cde9ab2e..1e0a4887f9 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -552,7 +552,8 @@ class WorkerTest(ZulipTestCase): with ( mock_queue_publish( - "zerver.lib.queue.queue_json_publish", side_effect=fake_publish + "zerver.lib.queue.queue_json_publish_rollback_unsafe", + side_effect=fake_publish, ), self.assertLogs( "zerver.worker.missedmessage_mobile_notifications", "WARNING" @@ -694,7 +695,9 @@ class WorkerTest(ZulipTestCase): worker.setup() with ( patch("zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError), - mock_queue_publish("zerver.lib.queue.queue_json_publish", side_effect=fake_publish), + mock_queue_publish( + "zerver.lib.queue.queue_json_publish_rollback_unsafe", side_effect=fake_publish + ), self.assertLogs(level="ERROR") as m, ): worker.start() diff --git a/zerver/tests/test_reactions.py b/zerver/tests/test_reactions.py index d69a00d107..911efb2b62 100644 --- a/zerver/tests/test_reactions.py +++ b/zerver/tests/test_reactions.py @@ -1056,7 +1056,7 @@ class ReactionAPIEventTest(EmojiReactionBase): } with ( self.capture_send_event_calls(expected_num_events=1) as events, - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits!" @@ -1141,7 +1141,7 @@ class ReactionAPIEventTest(EmojiReactionBase): with ( self.capture_send_event_calls(expected_num_events=1), - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits." diff --git a/zerver/tests/test_realm_export.py b/zerver/tests/test_realm_export.py index a1da2c8fa2..fd1a481e7e 100644 --- a/zerver/tests/test_realm_export.py +++ b/zerver/tests/test_realm_export.py @@ -9,7 +9,7 @@ from django.utils.timezone import now as timezone_now from analytics.models import RealmCount from zerver.actions.user_settings import do_change_user_setting from zerver.lib.exceptions import JsonableError -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import ( HostRequestMock, @@ -250,7 +250,7 @@ class RealmExportTest(ZulipTestCase): patch("zerver.lib.export.do_export_realm") as mock_export, self.assertLogs(level="INFO") as info_logs, ): - queue_json_publish( + queue_json_publish_rollback_unsafe( "deferred_work", { "type": "realm_export", diff --git a/zerver/tests/test_submessage.py b/zerver/tests/test_submessage.py index 182180900d..4defaac333 100644 --- a/zerver/tests/test_submessage.py +++ b/zerver/tests/test_submessage.py @@ -196,7 +196,7 @@ class TestBasics(ZulipTestCase): with ( self.capture_send_event_calls(expected_num_events=1), - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits." diff --git a/zerver/tornado/django_api.py b/zerver/tornado/django_api.py index d7043401d8..e3a8865af5 100644 --- a/zerver/tornado/django_api.py +++ b/zerver/tornado/django_api.py @@ -14,7 +14,7 @@ from typing_extensions import override from urllib3.util import Retry from zerver.lib.partial import partial -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.models import Client, Realm, UserProfile from zerver.tornado.sharding import ( get_realm_tornado_ports, @@ -201,7 +201,7 @@ def send_event_rollback_unsafe( port_user_map[get_user_id_tornado_port(realm_ports, user_id)].append(user) for port, port_users in port_user_map.items(): - queue_json_publish( + queue_json_publish_rollback_unsafe( notify_tornado_queue_name(port), dict(event=event, users=port_users), partial(send_notification_http, port), diff --git a/zerver/tornado/event_queue.py b/zerver/tornado/event_queue.py index 84964eb044..f0bbc6f932 100644 --- a/zerver/tornado/event_queue.py +++ b/zerver/tornado/event_queue.py @@ -27,7 +27,7 @@ from zerver.lib.message_cache import MessageDict from zerver.lib.narrow_helpers import narrow_dataclasses_from_tuples from zerver.lib.narrow_predicate import build_narrow_predicate from zerver.lib.notification_data import UserMessageNotificationsData -from zerver.lib.queue import queue_json_publish, retry_event +from zerver.lib.queue import queue_json_publish_rollback_unsafe, retry_event from zerver.middleware import async_request_timer_restart from zerver.models import CustomProfileField from zerver.tornado.descriptors import clear_descriptor_by_handler_id, set_descriptor_by_handler_id @@ -964,9 +964,11 @@ def maybe_enqueue_notifications( shard_id = ( user_notifications_data.user_id % settings.MOBILE_NOTIFICATIONS_SHARDS + 1 ) - queue_json_publish(f"missedmessage_mobile_notifications_shard{shard_id}", notice) + queue_json_publish_rollback_unsafe( + f"missedmessage_mobile_notifications_shard{shard_id}", notice + ) else: - queue_json_publish("missedmessage_mobile_notifications", notice) + queue_json_publish_rollback_unsafe("missedmessage_mobile_notifications", notice) notified["push_notified"] = True # Send missed_message emails if a direct message or a @@ -980,7 +982,7 @@ def maybe_enqueue_notifications( ) notice["mentioned_user_group_id"] = mentioned_user_group_id if not already_notified.get("email_notified"): - queue_json_publish("missedmessage_emails", notice, lambda notice: None) + queue_json_publish_rollback_unsafe("missedmessage_emails", notice, lambda notice: None) notified["email_notified"] = True return notified diff --git a/zerver/worker/deferred_work.py b/zerver/worker/deferred_work.py index 3d91876a29..95c23ebf18 100644 --- a/zerver/worker/deferred_work.py +++ b/zerver/worker/deferred_work.py @@ -17,7 +17,7 @@ from zerver.actions.message_send import internal_send_private_message from zerver.actions.realm_export import notify_realm_export from zerver.lib.export import export_realm_wrapper from zerver.lib.push_notifications import clear_push_device_tokens -from zerver.lib.queue import queue_json_publish, retry_event +from zerver.lib.queue import queue_json_publish_rollback_unsafe, retry_event from zerver.lib.remote_server import ( PushNotificationBouncerRetryLaterError, send_server_data_to_push_bouncer, @@ -100,7 +100,7 @@ class DeferredWorker(QueueProcessingWorker): # 30s, we re-push the task onto the tail of the # queue, to allow other deferred work to complete; # this task is extremely low priority. - queue_json_publish("deferred_work", {**event, "min_id": min_id}) + queue_json_publish_rollback_unsafe("deferred_work", {**event, "min_id": min_id}) break logger.info( "Marked %s messages as read for all users, stream_recipient_id %s", diff --git a/zilencer/management/commands/queue_rate.py b/zilencer/management/commands/queue_rate.py index db47e6a798..3fb63b9da3 100644 --- a/zilencer/management/commands/queue_rate.py +++ b/zilencer/management/commands/queue_rate.py @@ -6,7 +6,7 @@ from django.core.management.base import CommandParser from typing_extensions import override from zerver.lib.management import ZulipBaseCommand -from zerver.lib.queue import SimpleQueueClient, queue_json_publish +from zerver.lib.queue import SimpleQueueClient, queue_json_publish_rollback_unsafe from zerver.worker.test import BatchNoopWorker, NoopWorker @@ -71,7 +71,7 @@ class Command(ZulipBaseCommand): for i in range(1, reps + 1): worker.consumed = 0 timeit( - lambda: queue_json_publish(queue_name, {}), + lambda: queue_json_publish_rollback_unsafe(queue_name, {}), number=count, ) duration = timeit(worker.start, number=1)