mirror of
https://github.com/zulip/zulip.git
synced 2025-10-25 09:03:57 +00:00
We've had a bug for a while that if any ScheduledEmail objects get created with the wrong email sender address, even after the sysadmin corrects the problem, they'll still get errors because of the objects stored with the wrong format. We solve this by using FromAddress placeholders strings in send_future_email function, so that ScheduledEmail objects end up setting the final `from_address` value when mail is actually sent using the setting in effect at that time. Fixes #11008.
226 lines
9.7 KiB
Python
226 lines
9.7 KiB
Python
from django.conf import settings
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from django.template import loader
|
|
from django.utils.timezone import now as timezone_now
|
|
from django.utils.translation import override as override_language
|
|
from django.utils.translation import ugettext as _
|
|
from django.template.exceptions import TemplateDoesNotExist
|
|
|
|
from zerver.models import ScheduledEmail, get_user_profile_by_id, \
|
|
EMAIL_TYPES, Realm, UserProfile
|
|
|
|
import datetime
|
|
from email.utils import parseaddr, formataddr
|
|
import logging
|
|
import ujson
|
|
|
|
import os
|
|
from typing import Any, Dict, List, Mapping, Optional, Tuple
|
|
|
|
from zerver.lib.logging_util import log_to_file
|
|
from confirmation.models import generate_key
|
|
|
|
## Logging setup ##
|
|
|
|
logger = logging.getLogger('zulip.send_email')
|
|
log_to_file(logger, settings.EMAIL_LOG_PATH)
|
|
|
|
class FromAddress:
|
|
SUPPORT = parseaddr(settings.ZULIP_ADMINISTRATOR)[1]
|
|
NOREPLY = parseaddr(settings.NOREPLY_EMAIL_ADDRESS)[1]
|
|
|
|
support_placeholder = "SUPPORT"
|
|
no_reply_placeholder = 'NO_REPLY'
|
|
tokenized_no_reply_placeholder = 'TOKENIZED_NO_REPLY'
|
|
|
|
# Generates an unpredictable noreply address.
|
|
@staticmethod
|
|
def tokenized_no_reply_address() -> str:
|
|
if settings.ADD_TOKENS_TO_NOREPLY_ADDRESS:
|
|
return parseaddr(settings.TOKENIZED_NOREPLY_EMAIL_ADDRESS)[1].format(token=generate_key())
|
|
return FromAddress.NOREPLY
|
|
|
|
@staticmethod
|
|
def security_email_from_name(language: Optional[str]=None,
|
|
user_profile: Optional[UserProfile]=None) -> str:
|
|
if language is None:
|
|
assert user_profile is not None
|
|
language = user_profile.default_language
|
|
|
|
with override_language(language):
|
|
return _("Zulip Account Security")
|
|
|
|
def build_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
|
|
to_emails: Optional[List[str]]=None, from_name: Optional[str]=None,
|
|
from_address: Optional[str]=None, reply_to_email: Optional[str]=None,
|
|
language: Optional[str]=None, context: Optional[Dict[str, Any]]=None
|
|
) -> EmailMultiAlternatives:
|
|
# Callers should pass exactly one of to_user_id and to_email.
|
|
assert (to_user_ids is None) ^ (to_emails is None)
|
|
if to_user_ids is not None:
|
|
to_users = [get_user_profile_by_id(to_user_id) for to_user_id in to_user_ids]
|
|
to_emails = [formataddr((to_user.full_name, to_user.delivery_email)) for to_user in to_users]
|
|
|
|
if context is None:
|
|
context = {}
|
|
|
|
context.update({
|
|
'support_email': FromAddress.SUPPORT,
|
|
'email_images_base_uri': settings.ROOT_DOMAIN_URI + '/static/images/emails',
|
|
'physical_address': settings.PHYSICAL_ADDRESS,
|
|
})
|
|
|
|
def render_templates() -> Tuple[str, str, str]:
|
|
email_subject = loader.render_to_string(template_prefix + '.subject.txt',
|
|
context=context,
|
|
using='Jinja2_plaintext').strip().replace('\n', '')
|
|
message = loader.render_to_string(template_prefix + '.txt',
|
|
context=context, using='Jinja2_plaintext')
|
|
|
|
try:
|
|
html_message = loader.render_to_string(template_prefix + '.html', context)
|
|
except TemplateDoesNotExist:
|
|
emails_dir = os.path.dirname(template_prefix)
|
|
template = os.path.basename(template_prefix)
|
|
compiled_template_prefix = os.path.join(emails_dir, "compiled", template)
|
|
html_message = loader.render_to_string(compiled_template_prefix + '.html', context)
|
|
return (html_message, message, email_subject)
|
|
|
|
if not language and to_user_ids is not None:
|
|
language = to_users[0].default_language
|
|
if language:
|
|
with override_language(language):
|
|
# Make sure that we render the email using the target's native language
|
|
(html_message, message, email_subject) = render_templates()
|
|
else:
|
|
(html_message, message, email_subject) = render_templates()
|
|
logger.warning("Missing language for email template '{}'".format(template_prefix))
|
|
|
|
if from_name is None:
|
|
from_name = "Zulip"
|
|
if from_address is None:
|
|
from_address = FromAddress.NOREPLY
|
|
if from_address == FromAddress.tokenized_no_reply_placeholder:
|
|
from_address = FromAddress.tokenized_no_reply_address()
|
|
if from_address == FromAddress.no_reply_placeholder:
|
|
from_address = FromAddress.NOREPLY
|
|
if from_address == FromAddress.support_placeholder:
|
|
from_address = FromAddress.SUPPORT
|
|
|
|
from_email = formataddr((from_name, from_address))
|
|
reply_to = None
|
|
if reply_to_email is not None:
|
|
reply_to = [reply_to_email]
|
|
# Remove the from_name in the reply-to for noreply emails, so that users
|
|
# see "noreply@..." rather than "Zulip" or whatever the from_name is
|
|
# when they reply in their email client.
|
|
elif from_address == FromAddress.NOREPLY:
|
|
reply_to = [FromAddress.NOREPLY]
|
|
|
|
mail = EmailMultiAlternatives(email_subject, message, from_email, to_emails, reply_to=reply_to)
|
|
if html_message is not None:
|
|
mail.attach_alternative(html_message, 'text/html')
|
|
return mail
|
|
|
|
class EmailNotDeliveredException(Exception):
|
|
pass
|
|
|
|
# When changing the arguments to this function, you may need to write a
|
|
# migration to change or remove any emails in ScheduledEmail.
|
|
def send_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
|
|
to_emails: Optional[List[str]]=None, from_name: Optional[str]=None,
|
|
from_address: Optional[str]=None, reply_to_email: Optional[str]=None,
|
|
language: Optional[str]=None, context: Dict[str, Any]={}) -> None:
|
|
mail = build_email(template_prefix, to_user_ids=to_user_ids, to_emails=to_emails,
|
|
from_name=from_name, from_address=from_address,
|
|
reply_to_email=reply_to_email, language=language, context=context)
|
|
template = template_prefix.split("/")[-1]
|
|
logger.info("Sending %s email to %s" % (template, mail.to))
|
|
|
|
if mail.send() == 0:
|
|
logger.error("Error sending %s email to %s" % (template, mail.to))
|
|
raise EmailNotDeliveredException
|
|
|
|
def send_email_from_dict(email_dict: Mapping[str, Any]) -> None:
|
|
send_email(**dict(email_dict))
|
|
|
|
def send_future_email(template_prefix: str, realm: Realm, to_user_ids: Optional[List[int]]=None,
|
|
to_emails: Optional[List[str]]=None, from_name: Optional[str]=None,
|
|
from_address: Optional[str]=None, language: Optional[str]=None,
|
|
context: Dict[str, Any]={}, delay: datetime.timedelta=datetime.timedelta(0)) -> None:
|
|
template_name = template_prefix.split('/')[-1]
|
|
email_fields = {'template_prefix': template_prefix, 'from_name': from_name, 'from_address': from_address,
|
|
'language': language, 'context': context}
|
|
|
|
if settings.DEVELOPMENT_LOG_EMAILS:
|
|
send_email(template_prefix, to_user_ids=to_user_ids, to_emails=to_emails, from_name=from_name,
|
|
from_address=from_address, language=language, context=context)
|
|
# For logging the email
|
|
|
|
assert (to_user_ids is None) ^ (to_emails is None)
|
|
email = ScheduledEmail.objects.create(
|
|
type=EMAIL_TYPES[template_name],
|
|
scheduled_timestamp=timezone_now() + delay,
|
|
realm=realm,
|
|
data=ujson.dumps(email_fields))
|
|
|
|
# We store the recipients in the ScheduledEmail object itself,
|
|
# rather than the JSON data object, so that we can find and clear
|
|
# them using clear_scheduled_emails.
|
|
try:
|
|
if to_user_ids is not None:
|
|
email.users.add(*to_user_ids)
|
|
else:
|
|
assert to_emails is not None
|
|
assert(len(to_emails) == 1)
|
|
email.address = parseaddr(to_emails[0])[1]
|
|
email.save()
|
|
except Exception as e:
|
|
email.delete()
|
|
raise e
|
|
|
|
def send_email_to_admins(template_prefix: str, realm: Realm, from_name: Optional[str]=None,
|
|
from_address: Optional[str]=None, context: Dict[str, Any]={}) -> None:
|
|
admins = realm.get_human_admin_users()
|
|
admin_user_ids = [admin.id for admin in admins]
|
|
send_email(template_prefix, to_user_ids=admin_user_ids, from_name=from_name,
|
|
from_address=from_address, context=context)
|
|
|
|
def clear_scheduled_invitation_emails(email: str) -> None:
|
|
"""Unlike most scheduled emails, invitation emails don't have an
|
|
existing user object to key off of, so we filter by address here."""
|
|
items = ScheduledEmail.objects.filter(address__iexact=email,
|
|
type=ScheduledEmail.INVITATION_REMINDER)
|
|
items.delete()
|
|
|
|
def clear_scheduled_emails(user_ids: List[int], email_type: Optional[int]=None) -> None:
|
|
items = ScheduledEmail.objects.filter(users__in=user_ids).distinct()
|
|
if email_type is not None:
|
|
items = items.filter(type=email_type)
|
|
for item in items:
|
|
item.users.remove(*user_ids)
|
|
if item.users.all().count() == 0:
|
|
item.delete()
|
|
|
|
def handle_send_email_format_changes(job: Dict[str, Any]) -> None:
|
|
# Reformat any jobs that used the old to_email
|
|
# and to_user_ids argument formats.
|
|
if 'to_email' in job:
|
|
if job['to_email'] is not None:
|
|
job['to_emails'] = [job['to_email']]
|
|
del job['to_email']
|
|
if 'to_user_id' in job:
|
|
if job['to_user_id'] is not None:
|
|
job['to_user_ids'] = [job['to_user_id']]
|
|
del job['to_user_id']
|
|
|
|
def deliver_email(email: ScheduledEmail) -> None:
|
|
data = ujson.loads(email.data)
|
|
if email.users.exists():
|
|
data['to_user_ids'] = [user.id for user in email.users.all()]
|
|
if email.address is not None:
|
|
data['to_emails'] = [email.address]
|
|
handle_send_email_format_changes(data)
|
|
send_email(**data)
|
|
email.delete()
|