mirror of
https://github.com/zulip/zulip.git
synced 2025-11-11 09:27:43 +00:00
Only clear_scheduled_emails previously took a lock on the users before removing them; make deliver_scheduled_emails do so as well, by using prefetch_related to ensure that the table appears in the SELECT. This is not necessary for correctness, since all accesses of ScheduledEmailUser first access the ScheduledEmail and lock it; it is merely for consistency. Since SELECT ... FOR UPDATE takes an UPDATE lock on all tables mentioned in the SELECT, merely doing the prefetch is sufficient to lock both tables; no `on=(...)` is needed to `select_for_update`. This also does not address the pre-existing potential deadlock from these two use cases, where both try to lock the same ScheduledEmail rows in opposite orders.
57 lines
1.8 KiB
Python
57 lines
1.8 KiB
Python
"""\
|
|
Send email messages that have been queued for later delivery by
|
|
various things (at this time invitation reminders and day1/day2
|
|
followup emails).
|
|
|
|
This management command is run via supervisor.
|
|
"""
|
|
import logging
|
|
import time
|
|
from typing import Any
|
|
|
|
from django.conf import settings
|
|
from django.core.management.base import BaseCommand
|
|
from django.db import transaction
|
|
from django.utils.timezone import now as timezone_now
|
|
|
|
from zerver.lib.logging_util import log_to_file
|
|
from zerver.lib.send_email import EmailNotDeliveredException, deliver_scheduled_emails
|
|
from zerver.models import ScheduledEmail
|
|
|
|
## Setup ##
|
|
logger = logging.getLogger(__name__)
|
|
log_to_file(logger, settings.EMAIL_DELIVERER_LOG_PATH)
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = """Send emails queued by various parts of Zulip
|
|
for later delivery.
|
|
|
|
Run this command under supervisor.
|
|
|
|
Usage: ./manage.py deliver_scheduled_emails
|
|
"""
|
|
|
|
def handle(self, *args: Any, **options: Any) -> None:
|
|
while True:
|
|
found_rows = False
|
|
with transaction.atomic():
|
|
email_jobs_to_deliver = (
|
|
ScheduledEmail.objects.filter(scheduled_timestamp__lte=timezone_now())
|
|
.prefetch_related("users")
|
|
.select_for_update()
|
|
)
|
|
if email_jobs_to_deliver:
|
|
found_rows = True
|
|
for job in email_jobs_to_deliver:
|
|
try:
|
|
deliver_scheduled_emails(job)
|
|
except EmailNotDeliveredException:
|
|
logger.warning("%r not delivered", job)
|
|
# Less load on the db during times of activity,
|
|
# and more responsiveness when the load is low
|
|
if found_rows:
|
|
time.sleep(10)
|
|
else:
|
|
time.sleep(2)
|