mirror of
https://github.com/zulip/zulip.git
synced 2025-11-16 20:02:15 +00:00
email: Fix race conditions with concurrent ScheduledEmail handling.
The main race condition, which actually happened in production, was with concurrent execution of deliver_email and clear_scheduled_emails. clear_scheduled_emails could delete all email.users in the middle of deliver_email execution, causing it to pass an empty to_user_ids list to send_email. We mitigate this by getting the list of user ids in a single query and moving forward with that snapshot, so we no longer have to worry about the database data being mutated. clear_scheduled_emails also had potential race conditions with concurrent executions of itself, because it did not lock the appropriate rows upon selecting them for the purpose of potentially deleting them. FOR UPDATE locks need to be acquired to prevent simultaneous mutation. Tested manually with some print+sleep debugging to make some races happen. Fixes #zulip-2k (sentry).
This commit is contained in:
committed by
Tim Abbott
parent
b7b7475672
commit
f95dd628bd
@@ -12,6 +12,7 @@ import orjson
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.mail import EmailMultiAlternatives
|
from django.core.mail import EmailMultiAlternatives
|
||||||
from django.core.management import CommandError
|
from django.core.management import CommandError
|
||||||
|
from django.db import transaction
|
||||||
from django.template import loader
|
from django.template import loader
|
||||||
from django.template.exceptions import TemplateDoesNotExist
|
from django.template.exceptions import TemplateDoesNotExist
|
||||||
from django.utils.timezone import now as timezone_now
|
from django.utils.timezone import now as timezone_now
|
||||||
@@ -219,13 +220,28 @@ def clear_scheduled_invitation_emails(email: str) -> None:
|
|||||||
type=ScheduledEmail.INVITATION_REMINDER)
|
type=ScheduledEmail.INVITATION_REMINDER)
|
||||||
items.delete()
|
items.delete()
|
||||||
|
|
||||||
|
@transaction.atomic()
|
||||||
def clear_scheduled_emails(user_ids: List[int], email_type: Optional[int]=None) -> None:
|
def clear_scheduled_emails(user_ids: List[int], email_type: Optional[int]=None) -> None:
|
||||||
items = ScheduledEmail.objects.filter(users__in=user_ids).distinct()
|
# We need to obtain a FOR UPDATE lock on the selected rows to keep a concurrent
|
||||||
|
# execution of this function (or something else) from deleting them before we access
|
||||||
|
# the .users attribute.
|
||||||
|
items = ScheduledEmail.objects.filter(users__in=user_ids).select_for_update()
|
||||||
if email_type is not None:
|
if email_type is not None:
|
||||||
items = items.filter(type=email_type)
|
items = items.filter(type=email_type)
|
||||||
|
|
||||||
|
deduplicated_items = {}
|
||||||
for item in items:
|
for item in items:
|
||||||
|
deduplicated_items[item.id] = item
|
||||||
|
for item in deduplicated_items.values():
|
||||||
|
# Now we want a FOR UPDATE lock on the item.users rows
|
||||||
|
# to prevent a concurrent transaction from mutating them
|
||||||
|
# simultaneously.
|
||||||
|
item.users.all().select_for_update()
|
||||||
item.users.remove(*user_ids)
|
item.users.remove(*user_ids)
|
||||||
if item.users.all().count() == 0:
|
if item.users.all().count() == 0:
|
||||||
|
# Due to our transaction holding the row lock we have a guarantee
|
||||||
|
# that the obtained COUNT is accurate, thus we can reliably use it
|
||||||
|
# to decide whether to delete the ScheduledEmail row.
|
||||||
item.delete()
|
item.delete()
|
||||||
|
|
||||||
def handle_send_email_format_changes(job: Dict[str, Any]) -> None:
|
def handle_send_email_format_changes(job: Dict[str, Any]) -> None:
|
||||||
@@ -242,8 +258,16 @@ def handle_send_email_format_changes(job: Dict[str, Any]) -> None:
|
|||||||
|
|
||||||
def deliver_email(email: ScheduledEmail) -> None:
|
def deliver_email(email: ScheduledEmail) -> None:
|
||||||
data = orjson.loads(email.data)
|
data = orjson.loads(email.data)
|
||||||
if email.users.exists():
|
user_ids = list(email.users.values_list('id', flat=True))
|
||||||
data['to_user_ids'] = [user.id for user in email.users.all()]
|
if not user_ids and not email.address:
|
||||||
|
# This state doesn't make sense, so something must be mutating,
|
||||||
|
# or in the process of deleting, the object. We assume it will bring
|
||||||
|
# things to a correct state, and we just do nothing except logging this event.
|
||||||
|
logger.warning("ScheduledEmail id %s has empty users and address attributes.", email.id)
|
||||||
|
return
|
||||||
|
|
||||||
|
if user_ids:
|
||||||
|
data['to_user_ids'] = user_ids
|
||||||
if email.address is not None:
|
if email.address is not None:
|
||||||
data['to_emails'] = [email.address]
|
data['to_emails'] = [email.address]
|
||||||
handle_send_email_format_changes(data)
|
handle_send_email_format_changes(data)
|
||||||
|
|||||||
@@ -1284,6 +1284,25 @@ class ActivateTest(ZulipTestCase):
|
|||||||
)
|
)
|
||||||
self.assertEqual(ScheduledEmail.objects.count(), 0)
|
self.assertEqual(ScheduledEmail.objects.count(), 0)
|
||||||
|
|
||||||
|
def test_deliver_email_no_addressees(self) -> None:
|
||||||
|
iago = self.example_user('iago')
|
||||||
|
hamlet = self.example_user('hamlet')
|
||||||
|
to_user_ids = [hamlet.id, iago.id]
|
||||||
|
send_future_email('zerver/emails/followup_day1', iago.realm,
|
||||||
|
to_user_ids=to_user_ids, delay=datetime.timedelta(hours=1))
|
||||||
|
self.assertEqual(ScheduledEmail.objects.count(), 1)
|
||||||
|
email = ScheduledEmail.objects.all().first()
|
||||||
|
email.users.remove(*to_user_ids)
|
||||||
|
|
||||||
|
with self.assertLogs('zulip.send_email', level='INFO') as info_log:
|
||||||
|
deliver_email(email)
|
||||||
|
from django.core.mail import outbox
|
||||||
|
self.assertEqual(len(outbox), 0)
|
||||||
|
self.assertEqual(ScheduledEmail.objects.count(), 1)
|
||||||
|
self.assertEqual(info_log.output, [
|
||||||
|
f'WARNING:zulip.send_email:ScheduledEmail id {email.id} has empty users and address attributes.'
|
||||||
|
])
|
||||||
|
|
||||||
class RecipientInfoTest(ZulipTestCase):
|
class RecipientInfoTest(ZulipTestCase):
|
||||||
def test_stream_recipient_info(self) -> None:
|
def test_stream_recipient_info(self) -> None:
|
||||||
hamlet = self.example_user('hamlet')
|
hamlet = self.example_user('hamlet')
|
||||||
|
|||||||
Reference in New Issue
Block a user