message delete: Select Message FOR UPDATE when archiving.

Further commits will start locking the message rows while
adding related fields like reactions or submessages,
to handle races caused by deleting the message itself at the
same time.

The message locking implemented then will create a possibility
of deadlocks, where the related field transaction holds a lock
on the message row, and the message-delete transaction holds a
lock on the database row of the related field (which will also
need to be deleted when the message is deleted), and both
transactions wait for each other.

To prevent such a deadlock, we lock the message itself while
it is being deleted, so that the message-delete transaction
will have to wait till the other transaction (which is about
to delete the related field, and also holds a lock on the
message row) commits.

https://chat.zulip.org/#narrow/near/1185943 has more details.
This commit is contained in:
Abhijeet Prasad Bodas
2021-06-03 19:16:13 +05:30
committed by Tim Abbott
parent 1a9f385e17
commit 5f4113cf60
5 changed files with 31 additions and 5 deletions

View File

@@ -6156,7 +6156,7 @@ def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
move_messages_to_archive(message_ids, realm=realm, chunk_size=archiving_chunk_size)
event["message_type"] = message_type
send_event(realm, event, users_to_notify)
transaction.on_commit(lambda: send_event(realm, event, users_to_notify))
def do_delete_messages_by_sender(user: UserProfile) -> None:

View File

@@ -676,7 +676,7 @@ def access_message(
"""
try:
base_query = Message.objects.select_related()
if lock_message: # nocoverage
if lock_message:
# We want to lock only the `Message` row, and not the related fields
# because the `Message` row only has a possibility of races.
base_query = base_query.select_for_update(of=("self",))

View File

@@ -10,6 +10,7 @@ from django.http import HttpResponse
from zerver.lib.actions import (
do_change_stream_post_policy,
do_change_user_role,
do_delete_messages,
do_set_realm_property,
do_update_message,
get_topic_messages,
@@ -2035,3 +2036,20 @@ class DeleteMessageTest(ZulipTestCase):
m.side_effect = Message.DoesNotExist()
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_error(result, "Message already deleted")
def test_delete_event_sent_after_transaction_commits(self) -> None:
"""
Tests that `send_event` is hooked to `transaction.on_commit`. This is important, because
we don't want to end up holding locks on message rows for too long if the event queue runs
into a problem.
"""
hamlet = self.example_user("hamlet")
self.send_stream_message(hamlet, "Scotland")
message = self.get_last_message()
with self.tornado_redirected_to_list([], expected_num_events=1):
with mock.patch("zerver.lib.actions.send_event") as m:
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits."
)
do_delete_messages(hamlet.realm, [message])

View File

@@ -2,7 +2,7 @@ import datetime
from typing import Any, Dict, List, Optional
import orjson
from django.db import IntegrityError
from django.db import IntegrityError, transaction
from django.http import HttpRequest, HttpResponse
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
@@ -143,13 +143,18 @@ def validate_can_delete_message(user_profile: UserProfile, message: Message) ->
return
@transaction.atomic
@has_request_variables
def delete_message_backend(
request: HttpRequest,
user_profile: UserProfile,
message_id: int = REQ(converter=to_non_negative_int, path_only=True),
) -> HttpResponse:
message, ignored_user_message = access_message(user_profile, message_id)
# We lock the `Message` object to ensure that any transactions modifying the `Message` object
# concurrently are serialized properly with deleting the message; this prevents a deadlock
# that would otherwise happen because of the other transaction holding a lock on the `Message`
# row.
message, ignored_user_message = access_message(user_profile, message_id, lock_message=True)
validate_can_delete_message(user_profile, message)
try:
do_delete_messages(user_profile.realm, [message])

View File

@@ -740,6 +740,7 @@ def get_topics_backend(
return json_success(dict(topics=result))
@transaction.atomic
@require_realm_admin
@has_request_variables
def delete_in_topic(
@@ -756,7 +757,9 @@ def delete_in_topic(
deletable_message_ids = UserMessage.objects.filter(
user_profile=user_profile, message_id__in=messages
).values_list("message_id", flat=True)
messages = [message for message in messages if message.id in deletable_message_ids]
messages = messages.filter(id__in=deletable_message_ids)
messages = messages.select_for_update(of=("self",))
do_delete_messages(user_profile.realm, messages)