scheduled_email: Consistently lock users table.

Only clear_scheduled_emails previously took a lock on the users before
removing them; make deliver_scheduled_emails do so as well, by using
prefetch_related to ensure that the table appears in the SELECT.  This
is not necessary for correctness, since all accesses of
ScheduledEmailUser first access the ScheduledEmail and lock it; it is
merely for consistency.

Since SELECT ... FOR UPDATE locks the selected rows in every table
mentioned in the SELECT, merely doing the prefetch is sufficient to
lock both tables; no `of=(...)` argument to `select_for_update` is
needed.

This commit also does not address the pre-existing potential deadlock
between these two code paths, which can try to lock the same
ScheduledEmail rows in opposite orders.
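
The opposite-order deadlock mentioned above is the classic lock-ordering problem. The following is only a minimal sketch of the usual remedy, not code from this commit: `threading.Lock` objects stand in for row locks, and `row_locks`, `lock_rows_in_order`, and `run_two_workers` are hypothetical names introduced for illustration. Each worker sorts the row ids before locking, so both acquire locks in the same order even though they name the rows in opposite orders.

```python
import threading

# Hypothetical stand-ins for row locks on two ScheduledEmail rows, keyed by id.
row_locks = {1: threading.Lock(), 2: threading.Lock()}


def lock_rows_in_order(row_ids, work):
    """Acquire the locks for row_ids in ascending id order, run work, release."""
    ordered = sorted(row_ids)
    for row_id in ordered:
        row_locks[row_id].acquire()
    try:
        return work()
    finally:
        # Release in reverse acquisition order.
        for row_id in reversed(ordered):
            row_locks[row_id].release()


def run_two_workers():
    """Run two workers that name the same rows in opposite orders."""
    results = []
    t1 = threading.Thread(
        target=lock_rows_in_order, args=([1, 2], lambda: results.append("clear"))
    )
    t2 = threading.Thread(
        target=lock_rows_in_order, args=([2, 1], lambda: results.append("deliver"))
    )
    t1.start()
    t2.start()
    t1.join(timeout=5)
    t2.join(timeout=5)
    return sorted(results)
```

In SQL the analogous idea would be to have both queries lock rows in one agreed order, for example by adding an explicit ordering to the locking query; whether that fits these two code paths is not something this commit settles.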
Alex Vandiver
2021-08-14 01:19:07 +00:00
committed by Tim Abbott
parent ebaafb32f3
commit 4c518c2bba
2 changed files with 10 additions and 8 deletions

@@ -420,15 +420,15 @@ def clear_scheduled_emails(user_id: int, email_type: Optional[int] = None) -> No
     # We need to obtain a FOR UPDATE lock on the selected rows to keep a concurrent
     # execution of this function (or something else) from deleting them before we access
     # the .users attribute.
-    items = ScheduledEmail.objects.filter(users__in=[user_id]).select_for_update()
+    items = (
+        ScheduledEmail.objects.filter(users__in=[user_id])
+        .prefetch_related("users")
+        .select_for_update()
+    )
     if email_type is not None:
         items = items.filter(type=email_type)
     for item in items:
-        # Now we want a FOR UPDATE lock on the item.users rows
-        # to prevent a concurrent transaction from mutating them
-        # simultaneously.
-        item.users.all().select_for_update()
         item.users.remove(user_id)
         if item.users.all().count() == 0:
             # Due to our transaction holding the row lock we have a guarantee

@@ -36,9 +36,11 @@ Usage: ./manage.py deliver_scheduled_emails
         while True:
             found_rows = False
             with transaction.atomic():
-                email_jobs_to_deliver = ScheduledEmail.objects.filter(
-                    scheduled_timestamp__lte=timezone_now()
-                ).select_for_update()
+                email_jobs_to_deliver = (
+                    ScheduledEmail.objects.filter(scheduled_timestamp__lte=timezone_now())
+                    .prefetch_related("users")
+                    .select_for_update()
+                )
                 if email_jobs_to_deliver:
                     found_rows = True
                     for job in email_jobs_to_deliver: