From f95dd628bdaf5cc72e77f0e2d0e52a20e4e5cf7c Mon Sep 17 00:00:00 2001 From: Mateusz Mandera Date: Sat, 5 Sep 2020 19:57:28 +0200 Subject: [PATCH] email: Fix race conditions with concurrent ScheduledEmail handling. The main race condition, which actually happened in production, was with concurrent execution of deliver_email and clear_scheduled_emails. clear_scheduled_emails could delete all email.users in the middle of deliver_email execution, causing it to pass an empty to_user_ids list to send_email. We mitigate this by getting the list of user ids in a single query and moving forward with that snapshot, not having to worry about database data being mutated anymore. clear_scheduled_emails had potential race conditions with concurrent execution of itself due to not locking the appropriate rows upon selecting them for the purpose of potentially deleting them. FOR UPDATE locks need to be acquired to prevent simultaneous mutation. Tested manually with some print+sleep debugging to make some races happen. 
fixes #zulip-2k (sentry) --- zerver/lib/send_email.py | 30 +++++++++++++++++++++++++++--- zerver/tests/test_users.py | 19 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/zerver/lib/send_email.py b/zerver/lib/send_email.py index 8818e11751..76049d9537 100644 --- a/zerver/lib/send_email.py +++ b/zerver/lib/send_email.py @@ -12,6 +12,7 @@ import orjson from django.conf import settings from django.core.mail import EmailMultiAlternatives from django.core.management import CommandError +from django.db import transaction from django.template import loader from django.template.exceptions import TemplateDoesNotExist from django.utils.timezone import now as timezone_now @@ -219,13 +220,28 @@ def clear_scheduled_invitation_emails(email: str) -> None: type=ScheduledEmail.INVITATION_REMINDER) items.delete() +@transaction.atomic() def clear_scheduled_emails(user_ids: List[int], email_type: Optional[int]=None) -> None: - items = ScheduledEmail.objects.filter(users__in=user_ids).distinct() + # We need to obtain a FOR UPDATE lock on the selected rows to keep a concurrent + # execution of this function (or something else) from deleting them before we access + # the .users attribute. + items = ScheduledEmail.objects.filter(users__in=user_ids).select_for_update() if email_type is not None: items = items.filter(type=email_type) + + deduplicated_items = {} for item in items: + deduplicated_items[item.id] = item + for item in deduplicated_items.values(): + # Now we want a FOR UPDATE lock on the item.users rows + # to prevent a concurrent transaction from mutating them + # simultanously. + item.users.all().select_for_update() item.users.remove(*user_ids) if item.users.all().count() == 0: + # Due to our transaction holding the row lock we have a guarantee + # that the obtained COUNT is accurate, thus we can reliably use it + # to decide whether to delete the ScheduledEmail row. 
item.delete() def handle_send_email_format_changes(job: Dict[str, Any]) -> None: @@ -242,8 +258,16 @@ def handle_send_email_format_changes(job: Dict[str, Any]) -> None: def deliver_email(email: ScheduledEmail) -> None: data = orjson.loads(email.data) - if email.users.exists(): - data['to_user_ids'] = [user.id for user in email.users.all()] + user_ids = list(email.users.values_list('id', flat=True)) + if not user_ids and not email.address: + # This state doesn't make sense, so something must be mutating, + # or in the process of deleting, the object. We assume it will bring + # things to a correct state, and we just do nothing except logging this event. + logger.warning("ScheduledEmail id %s has empty users and address attributes.", email.id) + return + + if user_ids: + data['to_user_ids'] = user_ids if email.address is not None: data['to_emails'] = [email.address] handle_send_email_format_changes(data) diff --git a/zerver/tests/test_users.py b/zerver/tests/test_users.py index d86363fa56..547646610e 100644 --- a/zerver/tests/test_users.py +++ b/zerver/tests/test_users.py @@ -1284,6 +1284,25 @@ class ActivateTest(ZulipTestCase): ) self.assertEqual(ScheduledEmail.objects.count(), 0) + def test_deliver_email_no_addressees(self) -> None: + iago = self.example_user('iago') + hamlet = self.example_user('hamlet') + to_user_ids = [hamlet.id, iago.id] + send_future_email('zerver/emails/followup_day1', iago.realm, + to_user_ids=to_user_ids, delay=datetime.timedelta(hours=1)) + self.assertEqual(ScheduledEmail.objects.count(), 1) + email = ScheduledEmail.objects.all().first() + email.users.remove(*to_user_ids) + + with self.assertLogs('zulip.send_email', level='INFO') as info_log: + deliver_email(email) + from django.core.mail import outbox + self.assertEqual(len(outbox), 0) + self.assertEqual(ScheduledEmail.objects.count(), 1) + self.assertEqual(info_log.output, [ + f'WARNING:zulip.send_email:ScheduledEmail id {email.id} has empty users and address attributes.' 
+ ]) + class RecipientInfoTest(ZulipTestCase): def test_stream_recipient_info(self) -> None: hamlet = self.example_user('hamlet')