message_send: Update do_send_messages codepath to send event on commit.

Earlier, we were using 'send_event' & 'queue_json_publish' in
'do_send_messages' which can lead to a situation where we enqueue
events but the transaction fails at a later stage.

Events should not be sent until we know we're not rolling back.
This commit is contained in:
Prakhar Pratyush
2024-05-15 22:54:37 +05:30
committed by Tim Abbott
parent 27c4e46b30
commit c798d192dc
15 changed files with 473 additions and 270 deletions

View File

@@ -41,7 +41,7 @@ from zerver.models.streams import (
get_stream_by_id_in_realm,
)
from zerver.models.users import active_non_guest_user_ids, active_user_ids, is_cross_realm_bot_email
from zerver.tornado.django_api import send_event
from zerver.tornado.django_api import send_event_on_commit
class StreamDict(TypedDict, total=False):
@@ -123,9 +123,10 @@ def send_stream_creation_event(
recent_traffic: Optional[Dict[int, int]] = None,
) -> None:
event = dict(type="stream", op="create", streams=[stream_to_dict(stream, recent_traffic)])
send_event(realm, event, user_ids)
send_event_on_commit(realm, event, user_ids)
@transaction.atomic(savepoint=False)
def create_stream_if_needed(
realm: Realm,
stream_name: str,
@@ -149,40 +150,39 @@ def create_stream_if_needed(
)
assert can_remove_subscribers_group is not None
with transaction.atomic():
(stream, created) = Stream.objects.get_or_create(
(stream, created) = Stream.objects.get_or_create(
realm=realm,
name__iexact=stream_name,
defaults=dict(
name=stream_name,
creator=acting_user,
description=stream_description,
invite_only=invite_only,
is_web_public=is_web_public,
stream_post_policy=stream_post_policy,
history_public_to_subscribers=history_public_to_subscribers,
is_in_zephyr_realm=realm.is_zephyr_mirror_realm,
message_retention_days=message_retention_days,
can_remove_subscribers_group=can_remove_subscribers_group,
),
)
if created:
recipient = Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM)
stream.recipient = recipient
stream.rendered_description = render_stream_description(stream_description, realm)
stream.save(update_fields=["recipient", "rendered_description"])
event_time = timezone_now()
RealmAuditLog.objects.create(
realm=realm,
name__iexact=stream_name,
defaults=dict(
name=stream_name,
creator=acting_user,
description=stream_description,
invite_only=invite_only,
is_web_public=is_web_public,
stream_post_policy=stream_post_policy,
history_public_to_subscribers=history_public_to_subscribers,
is_in_zephyr_realm=realm.is_zephyr_mirror_realm,
message_retention_days=message_retention_days,
can_remove_subscribers_group=can_remove_subscribers_group,
),
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_CREATED,
event_time=event_time,
)
if created:
recipient = Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM)
stream.recipient = recipient
stream.rendered_description = render_stream_description(stream_description, realm)
stream.save(update_fields=["recipient", "rendered_description"])
event_time = timezone_now()
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_CREATED,
event_time=event_time,
)
if created:
if stream.is_public():
if stream.is_web_public:
notify_user_ids = active_user_ids(stream.realm_id)