From 3bad36ef8cf07cd57d0b257a739bae635a8527ac Mon Sep 17 00:00:00 2001 From: Prakhar Pratyush Date: Fri, 6 Dec 2024 12:14:03 +0530 Subject: [PATCH] queue: Rename queue_json_publish to queue_json_publish_rollback_unsafe. This commit renames the 'queue_json_publish' function to 'queue_json_publish_rollback_unsafe' to reflect the fact that it doesn't wait for the db transaction (within which it gets called, if any) to commit and sends event irrespective of commit or rollback. In most of the cases we don't want to send event in the case of rollbacks, so the caller should be aware that calling the function directly is rollback unsafe. Fixes part of #30489. --- docs/subsystems/queuing.md | 6 +++--- zerver/actions/user_activity.py | 4 ++-- zerver/decorator.py | 4 ++-- zerver/lib/digest.py | 4 ++-- zerver/lib/email_mirror.py | 4 ++-- zerver/lib/queue.py | 10 +++++++--- zerver/management/commands/enqueue_file.py | 4 ++-- .../migrations/0209_user_profile_no_empty_password.py | 4 ++-- zerver/migrations/0387_reupload_realmemoji_again.py | 4 ++-- zerver/signals.py | 4 ++-- zerver/tests/test_email_mirror.py | 2 +- zerver/tests/test_event_queue.py | 6 +++--- zerver/tests/test_link_embed.py | 8 ++++---- zerver/tests/test_message_delete.py | 2 +- zerver/tests/test_message_edit_notifications.py | 3 ++- zerver/tests/test_queue.py | 8 ++++---- zerver/tests/test_queue_worker.py | 7 +++++-- zerver/tests/test_reactions.py | 4 ++-- zerver/tests/test_realm_export.py | 4 ++-- zerver/tests/test_submessage.py | 2 +- zerver/tornado/django_api.py | 4 ++-- zerver/tornado/event_queue.py | 10 ++++++---- zerver/worker/deferred_work.py | 4 ++-- zilencer/management/commands/queue_rate.py | 4 ++-- 24 files changed, 63 insertions(+), 53 deletions(-) diff --git a/docs/subsystems/queuing.md b/docs/subsystems/queuing.md index 9e436f2a0c..18b37f15ca 100644 --- a/docs/subsystems/queuing.md +++ b/docs/subsystems/queuing.md @@ -57,13 +57,13 @@ manually. ### Publishing events into a queue You can publish events to a RabbitMQ queue using the -`queue_json_publish` function defined in `zerver/lib/queue.py`. +`queue_event_on_commit` function defined in `zerver/lib/queue.py`. An interesting challenge with queue processors is what should happen when queued events in Zulip's backend tests. Our current solution is -that in the tests, `queue_json_publish` will (by default) simple call +that in the tests, `queue_event_on_commit` will (by default) simple call the `consume` method for the relevant queue processor. However, -`queue_json_publish` also supports being passed a function that should +`queue_event_on_commit` also supports being passed a function that should be called in the tests instead of the queue processor's `consume` method. Where possible, we prefer the model of calling `consume` in tests since that's more predictable and automatically covers the queue diff --git a/zerver/actions/user_activity.py b/zerver/actions/user_activity.py index 0c747295fe..eb0ca8379b 100644 --- a/zerver/actions/user_activity.py +++ b/zerver/actions/user_activity.py @@ -1,6 +1,6 @@ from datetime import datetime -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.timestamp import datetime_to_timestamp from zerver.models import UserActivityInterval, UserProfile @@ -31,4 +31,4 @@ def do_update_user_activity_interval(user_profile: UserProfile, log_time: dateti def update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None: event = {"user_profile_id": user_profile.id, "time": datetime_to_timestamp(log_time)} - queue_json_publish("user_activity_interval", event) + queue_json_publish_rollback_unsafe("user_activity_interval", event) diff --git a/zerver/decorator.py b/zerver/decorator.py index 1d6bb47e58..2e34cbced8 100644 --- a/zerver/decorator.py +++ b/zerver/decorator.py @@ -43,7 +43,7 @@ from zerver.lib.exceptions import ( UserDeactivatedError, WebhookError, ) -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.rate_limiter import is_local_addr, rate_limit_request_by_ip, rate_limit_user from zerver.lib.request import RequestNotes from zerver.lib.response import json_method_not_allowed @@ -91,7 +91,7 @@ def update_user_activity( "time": datetime_to_timestamp(timezone_now()), "client_id": request_notes.client.id, } - queue_json_publish("user_activity", event, lambda event: None) + queue_json_publish_rollback_unsafe("user_activity", event, lambda event: None) # Based on django.views.decorators.http.require_http_methods diff --git a/zerver/lib/digest.py b/zerver/lib/digest.py index 69f5cd7fd9..194f8518b8 100644 --- a/zerver/lib/digest.py +++ b/zerver/lib/digest.py @@ -17,7 +17,7 @@ from zerver.context_processors import common_context from zerver.lib.email_notifications import build_message_list from zerver.lib.logging_util import log_to_file from zerver.lib.message import get_last_message_id -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.send_email import FromAddress, send_future_email from zerver.lib.url_encoding import stream_narrow_url from zerver.models import ( @@ -92,7 +92,7 @@ class DigestTopic: def queue_digest_user_ids(user_ids: list[int], cutoff: datetime) -> None: # Convert cutoff to epoch seconds for transit. event = {"user_ids": user_ids, "cutoff": cutoff.strftime("%s")} - queue_json_publish("digest_emails", event) + queue_json_publish_rollback_unsafe("digest_emails", event) def enqueue_emails(cutoff: datetime) -> None: diff --git a/zerver/lib/email_mirror.py b/zerver/lib/email_mirror.py index 370517507d..e95a79b2ea 100644 --- a/zerver/lib/email_mirror.py +++ b/zerver/lib/email_mirror.py @@ -25,7 +25,7 @@ from zerver.lib.email_mirror_helpers import ( from zerver.lib.email_notifications import convert_html_to_markdown from zerver.lib.exceptions import JsonableError, RateLimitedError from zerver.lib.message import normalize_body, truncate_content, truncate_topic -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.rate_limiter import RateLimitedObject from zerver.lib.send_email import FromAddress from zerver.lib.streams import access_stream_for_send_message @@ -540,7 +540,7 @@ def mirror_email_message(rcpt_to: str, msg_base64: str) -> dict[str, str]: "msg": f"5.1.1 Bad destination mailbox address: {e}", } - queue_json_publish( + queue_json_publish_rollback_unsafe( "email_mirror", { "rcpt_to": rcpt_to, diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index 5aed1b5793..48d247f12c 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -434,7 +434,11 @@ def set_queue_client(queue_client: SimpleQueueClient | TornadoQueueClient) -> No thread_data.queue_client = queue_client -def queue_json_publish( +# One should generally use `queue_event_on_commit` unless there's a strong +# reason to use `queue_json_publish_rollback_unsafe` directly, as it doesn't +# wait for the db transaction (within which it gets called, if any) to commit +# and sends event irrespective of commit or rollback. +def queue_json_publish_rollback_unsafe( queue_name: str, event: dict[str, Any], processor: Callable[[Any], None] | None = None, @@ -452,7 +456,7 @@ def queue_json_publish( def queue_event_on_commit(queue_name: str, event: dict[str, Any]) -> None: - transaction.on_commit(lambda: queue_json_publish(queue_name, event)) + transaction.on_commit(lambda: queue_json_publish_rollback_unsafe(queue_name, event)) def retry_event( @@ -464,4 +468,4 @@ def retry_event( if event["failed_tries"] > MAX_REQUEST_RETRIES: failure_processor(event) else: - queue_json_publish(queue_name, event) + queue_json_publish_rollback_unsafe(queue_name, event) diff --git a/zerver/management/commands/enqueue_file.py b/zerver/management/commands/enqueue_file.py index fae27ce423..5fac0f7b33 100644 --- a/zerver/management/commands/enqueue_file.py +++ b/zerver/management/commands/enqueue_file.py @@ -6,7 +6,7 @@ import orjson from typing_extensions import override from zerver.lib.management import ZulipBaseCommand -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe def error(*args: Any) -> None: @@ -28,7 +28,7 @@ def enqueue_file(queue_name: str, f: IO[str]) -> None: # This is designed to use the `error` method rather than # the call_consume_in_tests flow. - queue_json_publish(queue_name, data, error) + queue_json_publish_rollback_unsafe(queue_name, data, error) class Command(ZulipBaseCommand): diff --git a/zerver/migrations/0209_user_profile_no_empty_password.py b/zerver/migrations/0209_user_profile_no_empty_password.py index 15aec02d0f..9c31246816 100644 --- a/zerver/migrations/0209_user_profile_no_empty_password.py +++ b/zerver/migrations/0209_user_profile_no_empty_password.py @@ -11,7 +11,7 @@ from django.db.migrations.state import StateApps from django.utils.timezone import now as timezone_now from zerver.lib.cache import cache_delete, user_profile_by_api_key_cache_key -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.utils import generate_api_key @@ -223,7 +223,7 @@ def reset_user_api_key(user_profile: Any) -> None: # we can just write to the queue processor that handles sending # those notices to the push notifications bouncer service. event = {"type": "clear_push_device_tokens", "user_profile_id": user_profile.id} - queue_json_publish("deferred_work", event) + queue_json_publish_rollback_unsafe("deferred_work", event) class Migration(migrations.Migration): diff --git a/zerver/migrations/0387_reupload_realmemoji_again.py b/zerver/migrations/0387_reupload_realmemoji_again.py index bdff80e250..1855476248 100644 --- a/zerver/migrations/0387_reupload_realmemoji_again.py +++ b/zerver/migrations/0387_reupload_realmemoji_again.py @@ -3,7 +3,7 @@ from django.db import migrations from django.db.backends.base.schema import BaseDatabaseSchemaEditor from django.db.migrations.state import StateApps -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe def reupload_realm_emoji(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) -> None: @@ -34,7 +34,7 @@ def reupload_realm_emoji(apps: StateApps, schema_editor: BaseDatabaseSchemaEdito "type": "reupload_realm_emoji", "realm_id": realm_id, } - queue_json_publish("deferred_work", event) + queue_json_publish_rollback_unsafe("deferred_work", event) class Migration(migrations.Migration): diff --git a/zerver/signals.py b/zerver/signals.py index e142b386bb..083cbde269 100644 --- a/zerver/signals.py +++ b/zerver/signals.py @@ -9,7 +9,7 @@ from django.utils.timezone import now as timezone_now from django.utils.translation import gettext as _ from confirmation.models import one_click_unsubscribe_link -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.send_email import FromAddress from zerver.lib.timezone import canonicalize_timezone from zerver.models import UserProfile @@ -110,7 +110,7 @@ def email_on_new_login(sender: Any, user: UserProfile, request: Any, **kwargs: A "from_address": FromAddress.NOREPLY, "context": context, } - queue_json_publish("email_senders", email_dict) + queue_json_publish_rollback_unsafe("email_senders", email_dict) @receiver(user_logged_out) diff --git a/zerver/tests/test_email_mirror.py b/zerver/tests/test_email_mirror.py index 3fce6ac7bc..b76fc69bcd 100644 --- a/zerver/tests/test_email_mirror.py +++ b/zerver/tests/test_email_mirror.py @@ -1577,7 +1577,7 @@ class TestEmailMirrorTornadoView(ZulipTestCase): "secret": settings.SHARED_SECRET, } - with mock_queue_publish("zerver.lib.email_mirror.queue_json_publish") as m: + with mock_queue_publish("zerver.lib.email_mirror.queue_json_publish_rollback_unsafe") as m: m.side_effect = check_queue_json_publish return self.client_post("/api/internal/email_mirror_message", post_data) diff --git a/zerver/tests/test_event_queue.py b/zerver/tests/test_event_queue.py index 99930afc4a..21cc48f8c7 100644 --- a/zerver/tests/test_event_queue.py +++ b/zerver/tests/test_event_queue.py @@ -41,13 +41,13 @@ class MaybeEnqueueNotificationsTest(ZulipTestCase): ) with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish" + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe" ) as mock_queue_json_publish: notified = maybe_enqueue_notifications(**params) mock_queue_json_publish.assert_not_called() with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish" + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe" ) as mock_queue_json_publish: params["user_notifications_data"] = self.create_user_notifications_data_object( user_id=1, dm_push_notify=True, dm_email_notify=True @@ -63,7 +63,7 @@ class MaybeEnqueueNotificationsTest(ZulipTestCase): self.assertTrue(notified["push_notified"]) with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish" + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe" ) as mock_queue_json_publish: params = self.get_maybe_enqueue_notifications_parameters( message_id=1, diff --git a/zerver/tests/test_link_embed.py b/zerver/tests/test_link_embed.py index c5e08b1486..bc7305f52d 100644 --- a/zerver/tests/test_link_embed.py +++ b/zerver/tests/test_link_embed.py @@ -14,7 +14,7 @@ from typing_extensions import override from zerver.actions.message_delete import do_delete_messages from zerver.lib.cache import cache_delete, cache_get, preview_url_cache_key from zerver.lib.camo import get_camo_url -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import mock_queue_publish from zerver.lib.url_preview.oembed import get_oembed_data, strip_cdata @@ -457,9 +457,9 @@ class PreviewTestCase(ZulipTestCase): self.assertTrue(responses.assert_call_count(edited_url, 0)) with self.settings(TEST_SUITE=False), self.assertLogs(level="INFO") as info_logs: - # Now proceed with the original queue_json_publish and call the - # up-to-date event for edited_url. - queue_json_publish(*args, **kwargs) + # Now proceed with the original queue_json_publish_rollback_unsafe + # and call the up-to-date event for edited_url. + queue_json_publish_rollback_unsafe(*args, **kwargs) msg = Message.objects.select_related("sender").get(id=msg_id) assert msg.rendered_content is not None self.assertIn( diff --git a/zerver/tests/test_message_delete.py b/zerver/tests/test_message_delete.py index 51bc55a234..f398db29f7 100644 --- a/zerver/tests/test_message_delete.py +++ b/zerver/tests/test_message_delete.py @@ -500,7 +500,7 @@ class DeleteMessageTest(ZulipTestCase): with ( self.capture_send_event_calls(expected_num_events=1), - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits." diff --git a/zerver/tests/test_message_edit_notifications.py b/zerver/tests/test_message_edit_notifications.py index 3841513e1b..661e0d0be8 100644 --- a/zerver/tests/test_message_edit_notifications.py +++ b/zerver/tests/test_message_edit_notifications.py @@ -133,7 +133,8 @@ class EditMessageSideEffectsTest(ZulipTestCase): ) with mock_queue_publish( - "zerver.tornado.event_queue.queue_json_publish", side_effect=fake_publish + "zerver.tornado.event_queue.queue_json_publish_rollback_unsafe", + side_effect=fake_publish, ) as m: maybe_enqueue_notifications(**enqueue_kwargs) diff --git a/zerver/tests/test_queue.py b/zerver/tests/test_queue.py index b6e2f75d87..544ae569ce 100644 --- a/zerver/tests/test_queue.py +++ b/zerver/tests/test_queue.py @@ -10,7 +10,7 @@ from zerver.lib.queue import ( SimpleQueueClient, TornadoQueueClient, get_queue_client, - queue_json_publish, + queue_json_publish_rollback_unsafe, ) from zerver.lib.test_classes import ZulipTestCase @@ -43,7 +43,7 @@ class TestQueueImplementation(ZulipTestCase): output.append(events[0]) queue_client.stop_consuming() - queue_json_publish("test_suite", {"event": "my_event"}) + queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"}) queue_client.start_json_consumer("test_suite", collect) @@ -67,7 +67,7 @@ class TestQueueImplementation(ZulipTestCase): raise Exception("Make me nack!") output.append(events[0]) - queue_json_publish("test_suite", {"event": "my_event"}) + queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"}) try: queue_client.start_json_consumer("test_suite", collect) @@ -97,7 +97,7 @@ class TestQueueImplementation(ZulipTestCase): mock.patch("zerver.lib.queue.SimpleQueueClient.publish", throw_connection_error_once), self.assertLogs("zulip.queue", level="WARN") as warn_logs, ): - queue_json_publish("test_suite", {"event": "my_event"}) + queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"}) self.assertEqual( warn_logs.output, ["WARNING:zulip.queue:Failed to send to rabbitmq, trying to reconnect and send again"], diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index a5cde9ab2e..1e0a4887f9 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -552,7 +552,8 @@ class WorkerTest(ZulipTestCase): with ( mock_queue_publish( - "zerver.lib.queue.queue_json_publish", side_effect=fake_publish + "zerver.lib.queue.queue_json_publish_rollback_unsafe", + side_effect=fake_publish, ), self.assertLogs( "zerver.worker.missedmessage_mobile_notifications", "WARNING" @@ -694,7 +695,9 @@ class WorkerTest(ZulipTestCase): worker.setup() with ( patch("zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError), - mock_queue_publish("zerver.lib.queue.queue_json_publish", side_effect=fake_publish), + mock_queue_publish( + "zerver.lib.queue.queue_json_publish_rollback_unsafe", side_effect=fake_publish + ), self.assertLogs(level="ERROR") as m, ): worker.start() diff --git a/zerver/tests/test_reactions.py b/zerver/tests/test_reactions.py index d69a00d107..911efb2b62 100644 --- a/zerver/tests/test_reactions.py +++ b/zerver/tests/test_reactions.py @@ -1056,7 +1056,7 @@ class ReactionAPIEventTest(EmojiReactionBase): } with ( self.capture_send_event_calls(expected_num_events=1) as events, - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits!" @@ -1141,7 +1141,7 @@ class ReactionAPIEventTest(EmojiReactionBase): with ( self.capture_send_event_calls(expected_num_events=1), - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits." diff --git a/zerver/tests/test_realm_export.py b/zerver/tests/test_realm_export.py index a1da2c8fa2..fd1a481e7e 100644 --- a/zerver/tests/test_realm_export.py +++ b/zerver/tests/test_realm_export.py @@ -9,7 +9,7 @@ from django.utils.timezone import now as timezone_now from analytics.models import RealmCount from zerver.actions.user_settings import do_change_user_setting from zerver.lib.exceptions import JsonableError -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import ( HostRequestMock, @@ -250,7 +250,7 @@ class RealmExportTest(ZulipTestCase): patch("zerver.lib.export.do_export_realm") as mock_export, self.assertLogs(level="INFO") as info_logs, ): - queue_json_publish( + queue_json_publish_rollback_unsafe( "deferred_work", { "type": "realm_export", diff --git a/zerver/tests/test_submessage.py b/zerver/tests/test_submessage.py index 182180900d..4defaac333 100644 --- a/zerver/tests/test_submessage.py +++ b/zerver/tests/test_submessage.py @@ -196,7 +196,7 @@ class TestBasics(ZulipTestCase): with ( self.capture_send_event_calls(expected_num_events=1), - mock.patch("zerver.tornado.django_api.queue_json_publish") as m, + mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m, ): m.side_effect = AssertionError( "Events should be sent only after the transaction commits." diff --git a/zerver/tornado/django_api.py b/zerver/tornado/django_api.py index d7043401d8..e3a8865af5 100644 --- a/zerver/tornado/django_api.py +++ b/zerver/tornado/django_api.py @@ -14,7 +14,7 @@ from typing_extensions import override from urllib3.util import Retry from zerver.lib.partial import partial -from zerver.lib.queue import queue_json_publish +from zerver.lib.queue import queue_json_publish_rollback_unsafe from zerver.models import Client, Realm, UserProfile from zerver.tornado.sharding import ( get_realm_tornado_ports, @@ -201,7 +201,7 @@ def send_event_rollback_unsafe( port_user_map[get_user_id_tornado_port(realm_ports, user_id)].append(user) for port, port_users in port_user_map.items(): - queue_json_publish( + queue_json_publish_rollback_unsafe( notify_tornado_queue_name(port), dict(event=event, users=port_users), partial(send_notification_http, port), diff --git a/zerver/tornado/event_queue.py b/zerver/tornado/event_queue.py index 84964eb044..f0bbc6f932 100644 --- a/zerver/tornado/event_queue.py +++ b/zerver/tornado/event_queue.py @@ -27,7 +27,7 @@ from zerver.lib.message_cache import MessageDict from zerver.lib.narrow_helpers import narrow_dataclasses_from_tuples from zerver.lib.narrow_predicate import build_narrow_predicate from zerver.lib.notification_data import UserMessageNotificationsData -from zerver.lib.queue import queue_json_publish, retry_event +from zerver.lib.queue import queue_json_publish_rollback_unsafe, retry_event from zerver.middleware import async_request_timer_restart from zerver.models import CustomProfileField from zerver.tornado.descriptors import clear_descriptor_by_handler_id, set_descriptor_by_handler_id @@ -964,9 +964,11 @@ def maybe_enqueue_notifications( shard_id = ( user_notifications_data.user_id % settings.MOBILE_NOTIFICATIONS_SHARDS + 1 ) - queue_json_publish(f"missedmessage_mobile_notifications_shard{shard_id}", notice) + queue_json_publish_rollback_unsafe( + f"missedmessage_mobile_notifications_shard{shard_id}", notice + ) else: - queue_json_publish("missedmessage_mobile_notifications", notice) + queue_json_publish_rollback_unsafe("missedmessage_mobile_notifications", notice) notified["push_notified"] = True # Send missed_message emails if a direct message or a @@ -980,7 +982,7 @@ def maybe_enqueue_notifications( ) notice["mentioned_user_group_id"] = mentioned_user_group_id if not already_notified.get("email_notified"): - queue_json_publish("missedmessage_emails", notice, lambda notice: None) + queue_json_publish_rollback_unsafe("missedmessage_emails", notice, lambda notice: None) notified["email_notified"] = True return notified diff --git a/zerver/worker/deferred_work.py b/zerver/worker/deferred_work.py index 3d91876a29..95c23ebf18 100644 --- a/zerver/worker/deferred_work.py +++ b/zerver/worker/deferred_work.py @@ -17,7 +17,7 @@ from zerver.actions.message_send import internal_send_private_message from zerver.actions.realm_export import notify_realm_export from zerver.lib.export import export_realm_wrapper from zerver.lib.push_notifications import clear_push_device_tokens -from zerver.lib.queue import queue_json_publish, retry_event +from zerver.lib.queue import queue_json_publish_rollback_unsafe, retry_event from zerver.lib.remote_server import ( PushNotificationBouncerRetryLaterError, send_server_data_to_push_bouncer, @@ -100,7 +100,7 @@ class DeferredWorker(QueueProcessingWorker): # 30s, we re-push the task onto the tail of the # queue, to allow other deferred work to complete; # this task is extremely low priority. - queue_json_publish("deferred_work", {**event, "min_id": min_id}) + queue_json_publish_rollback_unsafe("deferred_work", {**event, "min_id": min_id}) break logger.info( "Marked %s messages as read for all users, stream_recipient_id %s", diff --git a/zilencer/management/commands/queue_rate.py b/zilencer/management/commands/queue_rate.py index db47e6a798..3fb63b9da3 100644 --- a/zilencer/management/commands/queue_rate.py +++ b/zilencer/management/commands/queue_rate.py @@ -6,7 +6,7 @@ from django.core.management.base import CommandParser from typing_extensions import override from zerver.lib.management import ZulipBaseCommand -from zerver.lib.queue import SimpleQueueClient, queue_json_publish +from zerver.lib.queue import SimpleQueueClient, queue_json_publish_rollback_unsafe from zerver.worker.test import BatchNoopWorker, NoopWorker @@ -71,7 +71,7 @@ class Command(ZulipBaseCommand): for i in range(1, reps + 1): worker.consumed = 0 timeit( - lambda: queue_json_publish(queue_name, {}), + lambda: queue_json_publish_rollback_unsafe(queue_name, {}), number=count, ) duration = timeit(worker.start, number=1)