mirror of
https://github.com/zulip/zulip.git
synced 2025-10-25 09:03:57 +00:00
deliver_scheduled_*: SELECT FOR UPDATE the relevant rows.
`deliver_scheduled_emails` and `deliver_scheduled_messages` use their respective tables like a queue, but do not have guarantees that there was only one consumer (besides the EMAIL_DELIVERER_DISABLED setting), and could send duplicate messages if multiple consumers raced in reading rows. Use database locking to ensure that the database only feeds a given ScheduledMessage or ScheduledEmail row to a single consumer. A second consumer, if it exists, will block until the first consumer commits the transaction.
This commit is contained in:
committed by
Tim Abbott
parent
82797dd53c
commit
1e67e0f218
@@ -15,6 +15,7 @@ from typing import Any
|
|||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
from django.db import transaction
|
||||||
from django.utils.timezone import now as timezone_now
|
from django.utils.timezone import now as timezone_now
|
||||||
|
|
||||||
from zerver.lib.logging_util import log_to_file
|
from zerver.lib.logging_util import log_to_file
|
||||||
@@ -42,17 +43,20 @@ Usage: ./manage.py deliver_scheduled_emails
|
|||||||
sleep_forever()
|
sleep_forever()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
email_jobs_to_deliver = ScheduledEmail.objects.filter(
|
found_rows = False
|
||||||
scheduled_timestamp__lte=timezone_now()
|
with transaction.atomic():
|
||||||
)
|
email_jobs_to_deliver = ScheduledEmail.objects.filter(
|
||||||
if email_jobs_to_deliver:
|
scheduled_timestamp__lte=timezone_now()
|
||||||
for job in email_jobs_to_deliver:
|
).select_for_update()
|
||||||
try:
|
if email_jobs_to_deliver:
|
||||||
deliver_scheduled_emails(job)
|
for job in email_jobs_to_deliver:
|
||||||
except EmailNotDeliveredException:
|
try:
|
||||||
logger.warning("%r not delivered", job)
|
deliver_scheduled_emails(job)
|
||||||
|
except EmailNotDeliveredException:
|
||||||
|
logger.warning("%r not delivered", job)
|
||||||
|
# Less load on the db during times of activity,
|
||||||
|
# and more responsiveness when the load is low
|
||||||
|
if found_rows:
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
else:
|
else:
|
||||||
# Less load on the db during times of activity,
|
|
||||||
# and more responsiveness when the load is low
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from typing import Any
|
|||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
from django.db import transaction
|
||||||
from django.utils.timezone import now as timezone_now
|
from django.utils.timezone import now as timezone_now
|
||||||
|
|
||||||
from zerver.lib.actions import build_message_send_dict, do_send_messages
|
from zerver.lib.actions import build_message_send_dict, do_send_messages
|
||||||
@@ -65,13 +66,14 @@ Usage: ./manage.py deliver_scheduled_messages
|
|||||||
sleep_forever()
|
sleep_forever()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
messages_to_deliver = ScheduledMessage.objects.filter(
|
with transaction.atomic():
|
||||||
scheduled_timestamp__lte=timezone_now(), delivered=False
|
messages_to_deliver = ScheduledMessage.objects.filter(
|
||||||
)
|
scheduled_timestamp__lte=timezone_now(), delivered=False
|
||||||
for message in messages_to_deliver:
|
).select_for_update()
|
||||||
do_send_messages([self.construct_message(message)])
|
for message in messages_to_deliver:
|
||||||
message.delivered = True
|
do_send_messages([self.construct_message(message)])
|
||||||
message.save(update_fields=["delivered"])
|
message.delivered = True
|
||||||
|
message.save(update_fields=["delivered"])
|
||||||
|
|
||||||
cur_time = timezone_now()
|
cur_time = timezone_now()
|
||||||
time_next_min = (cur_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
|
time_next_min = (cur_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
|
||||||
|
|||||||
Reference in New Issue
Block a user