mirror of
https://github.com/zulip/zulip.git
synced 2025-11-13 18:36:36 +00:00
Recent changes to the API removed Client.register(), and this change restores the correct API call, although the codepath this affects is probably ready for eventual deprecation.
424 lines
17 KiB
Python
424 lines
17 KiB
Python
from __future__ import absolute_import
|
|
from typing import Any, Callable, Mapping
|
|
|
|
from django.conf import settings
|
|
from django.core.handlers.wsgi import WSGIRequest
|
|
from django.core.handlers.base import BaseHandler
|
|
from zerver.models import get_user_profile_by_email, \
|
|
get_user_profile_by_id, get_prereg_user_by_email, get_client, \
|
|
UserMessage, Message
|
|
from zerver.lib.context_managers import lockfile
|
|
from zerver.lib.queue import SimpleQueueClient, queue_json_publish
|
|
from zerver.lib.timestamp import timestamp_to_datetime
|
|
from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails, \
|
|
clear_followup_emails_queue, send_local_email_template_with_delay
|
|
from zerver.lib.actions import do_send_confirmation_email, \
|
|
do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \
|
|
internal_send_message, check_send_message, extract_recipients, \
|
|
handle_push_notification, render_incoming_message, do_update_embedded_data
|
|
from zerver.lib.url_preview import preview as url_preview
|
|
from zerver.lib.digest import handle_digest_email
|
|
from zerver.lib.email_mirror import process_message as mirror_email
|
|
from zerver.decorator import JsonableError
|
|
from zerver.tornado.socket import req_redis_key
|
|
from confirmation.models import Confirmation
|
|
from zerver.lib.db import reset_queries
|
|
from django.core.mail import EmailMessage
|
|
from zerver.lib.redis_utils import get_redis_client
|
|
from zerver.context_processors import common_context
|
|
|
|
import os
|
|
import sys
|
|
import ujson
|
|
from collections import defaultdict
|
|
import email
|
|
import time
|
|
import datetime
|
|
import logging
|
|
import simplejson
|
|
from six.moves import cStringIO as StringIO
|
|
|
|
class WorkerDeclarationException(Exception):
    """Raised when a queue worker class is declared or used incorrectly.

    Within this module it is raised for a worker that has no
    ``queue_name`` and by the base ``consume`` that subclasses are
    expected to override.
    """
|
|
|
|
def assign_queue(queue_name, enabled=True):
    # type: (str, bool) -> Callable[[QueueProcessingWorker], QueueProcessingWorker]
    """Build a class decorator that binds a worker class to ``queue_name``.

    The decorator always stores ``queue_name`` on the class.  The class is
    added to the worker registry only when ``enabled`` is true; either way
    the class itself is returned unchanged otherwise.
    """
    def decorate(clazz):
        # type: (QueueProcessingWorker) -> QueueProcessingWorker
        clazz.queue_name = queue_name
        if not enabled:
            # Disabled workers keep their queue name but stay unregistered.
            return clazz
        register_worker(queue_name, clazz)
        return clazz

    return decorate
|
|
|
|
# Registry of worker classes keyed by queue name.  It is filled in by
# register_worker() and read by get_worker() / get_active_worker_queues().
worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type
|
|
def register_worker(queue_name, clazz):
    # type: (str, QueueProcessingWorker) -> None
    """Record ``clazz`` as the worker class for ``queue_name``.

    Registering a second class under the same queue name replaces the
    earlier entry.
    """
    worker_classes.update({queue_name: clazz})
|
|
|
|
def get_worker(queue_name):
    # type: (str) -> QueueProcessingWorker
    """Instantiate and return the worker registered for ``queue_name``.

    Raises KeyError when nothing is registered under that name.
    """
    worker_class = worker_classes[queue_name]
    return worker_class()
|
|
|
|
def get_active_worker_queues():
    # type: () -> List[str]
    """Return a new list holding the name of every registered queue."""
    return [queue_name for queue_name in worker_classes]
|
|
|
|
class QueueProcessingWorker(object):
    """Base class for workers that consume JSON events from a named queue.

    Subclasses provide ``queue_name`` (the ``assign_queue`` decorator sets
    it) and override ``consume``.  Call ``setup()`` before ``start()`` so
    that ``self.q`` holds a queue client.
    """
    queue_name = None # type: str

    def __init__(self):
        # type: () -> None
        """Reject worker classes that never received a ``queue_name``.

        Raises WorkerDeclarationException when ``queue_name`` is None.
        """
        # The queue client is created later, in setup().
        self.q = None # type: SimpleQueueClient
        if self.queue_name is None:
            raise WorkerDeclarationException("Queue worker declared without queue_name")

    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Handle a single event; subclasses must override this."""
        raise WorkerDeclarationException("No consumer defined!")

    def consume_wrapper(self, data):
        # type: (Mapping[str, Any]) -> None
        """Run ``consume`` on ``data`` and record the event if it fails.

        An exception raised by ``consume`` is logged and the event is
        appended, as a timestamped JSON line, to
        ``<QUEUE_ERROR_DIR>/<queue_name>.errors``; the exception itself is
        not re-raised.  ``reset_queries()`` runs after every event,
        whether it succeeded or failed.
        """
        try:
            self.consume(data)
        except Exception:
            self._log_problem()
            self._ensure_error_dir()
            fname = '%s.errors' % (self.queue_name,)
            fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
            line = u'%s\t%s\n' % (time.asctime(), ujson.dumps(data))
            lock_fn = fn + '.lock'
            # Hold the lock file while appending to the shared errors file.
            with lockfile(lock_fn):
                with open(fn, 'ab') as f:
                    f.write(line.encode('utf-8'))
        reset_queries()

    @staticmethod
    def _ensure_error_dir():
        # type: () -> None
        """Create ``settings.QUEUE_ERROR_DIR`` unless it already exists.

        Creating first and tolerating EEXIST avoids the window between an
        existence check and ``os.mkdir`` in which another worker process
        could create the directory and make this call fail.
        """
        import errno
        try:
            os.mkdir(settings.QUEUE_ERROR_DIR)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def _log_problem(self):
        # type: () -> None
        """Log the exception currently being handled, tagged with the queue name."""
        logging.exception("Problem handling data on queue %s" % (self.queue_name,))

    def setup(self):
        # type: () -> None
        """Create the queue client used by ``start`` and ``stop``."""
        self.q = SimpleQueueClient()

    def start(self):
        # type: () -> None
        """Register ``consume_wrapper`` for this queue and begin consuming."""
        self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
        self.q.start_consuming()

    def stop(self):
        # type: () -> None
        """Ask the queue client to stop consuming."""
        self.q.stop_consuming()
|
|
|
|
# postmonkey is imported only when a MailChimp API key is configured, so
# PostMonkey / MailChimpException may be undefined otherwise.
if settings.MAILCHIMP_API_KEY:
    from postmonkey import PostMonkey, MailChimpException
|
|
|
|
@assign_queue('signups')
class SignupWorker(QueueProcessingWorker):
    """Worker for the 'signups' queue.

    For each event it clears pending follow-up emails for the address,
    optionally subscribes the address to the MailChimp list, and enqueues
    the welcome emails.
    """

    def __init__(self):
        # type: () -> None
        super(SignupWorker, self).__init__()
        # PostMonkey is imported only when MAILCHIMP_API_KEY is set, so the
        # client is created under the same condition.
        if settings.MAILCHIMP_API_KEY:
            self.pm = PostMonkey(settings.MAILCHIMP_API_KEY, timeout=10)

    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Process one signup event.

        ``data`` must carry ``'merge_vars'`` and ``'EMAIL'`` (KeyError
        otherwise).  A MailChimpException other than code 214 propagates
        to the caller.
        """
        merge_vars = data['merge_vars']
        # Named email_address so the module-level `email` import is not shadowed.
        email_address = data["EMAIL"]
        # This should clear out any invitation reminder emails
        clear_followup_emails_queue(email_address)
        if settings.MAILCHIMP_API_KEY and settings.PRODUCTION:
            try:
                self.pm.listSubscribe(
                    id=settings.ZULIP_FRIENDS_LIST_ID,
                    email_address=email_address,
                    merge_vars=merge_vars,
                    double_optin=False,
                    send_welcome=False)
            except MailChimpException as e:
                if e.code != 214:
                    # Bare raise keeps the original traceback intact.
                    raise
                # Code 214 is treated as "already on the list": warn and go on.
                logging.warning("Attempted to sign up already existing email to list: %s" % (email_address,))

        name = merge_vars.get("NAME")
        enqueue_welcome_emails(email_address, name)
|
|
|
|
@assign_queue('invites')
class ConfirmationEmailWorker(QueueProcessingWorker):
    """Worker for the 'invites' queue: sends the confirmation email and
    schedules a reminder email."""

    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Send the invitation for ``data["email"]`` on behalf of
        ``data["referrer_email"]`` and queue the reminder."""
        invitee_email = data["email"]
        invitee = get_prereg_user_by_email(invitee_email)
        referrer = get_user_profile_by_email(data["referrer_email"])
        do_send_confirmation_email(invitee, referrer)

        # queue invitation reminder for two days from now.
        activate_url = Confirmation.objects.get_link_for_object(
            invitee, host=referrer.realm.host)
        context = common_context(referrer)
        reminder_fields = {
            'activate_url': activate_url,
            'referrer': referrer,
            'verbose_support_offers': settings.VERBOSE_SUPPORT_OFFERS,
            'support_email': settings.ZULIP_ADMINISTRATOR
        }
        context.update(reminder_fields)

        recipients = [{'email': invitee_email, 'name': ""}]
        reminder_delay = datetime.timedelta(days=2)
        send_local_email_template_with_delay(
            recipients,
            "zerver/emails/invitation/invitation_reminder_email",
            context,
            reminder_delay,
            tags=["invitation-reminders"],
            sender={'email': settings.ZULIP_ADMINISTRATOR, 'name': 'Zulip'})
|
|
|
|
@assign_queue('user_activity')
class UserActivityWorker(QueueProcessingWorker):
    """Worker for the 'user_activity' queue."""

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Record one activity event (user, client, query, time)."""
        profile = get_user_profile_by_id(event["user_profile_id"])
        activity_client = get_client(event["client"])
        occurred_at = timestamp_to_datetime(event["time"])
        activity_query = event["query"]
        do_update_user_activity(profile, activity_client, activity_query, occurred_at)
|
|
|
|
@assign_queue('user_activity_interval')
class UserActivityIntervalWorker(QueueProcessingWorker):
    """Worker for the 'user_activity_interval' queue."""

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Update the activity interval for the event's user at the event's time."""
        do_update_user_activity_interval(
            get_user_profile_by_id(event["user_profile_id"]),
            timestamp_to_datetime(event["time"]))
|
|
|
|
@assign_queue('user_presence')
class UserPresenceWorker(QueueProcessingWorker):
    """Worker for the 'user_presence' queue."""

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Log the event and record the user's presence status.

        Reads ``user_profile_id``, ``client``, ``time`` and ``status``
        from ``event`` (KeyError if any is missing).
        """
        # One-element tuple, so the event is always formatted as a single
        # value whatever its type.
        logging.info("Received event: %s" % (event,))
        user_profile = get_user_profile_by_id(event["user_profile_id"])
        client = get_client(event["client"])
        log_time = timestamp_to_datetime(event["time"])
        status = event["status"]
        do_update_user_presence(user_profile, client, log_time, status)
|
|
|
|
@assign_queue('missedmessage_emails')
class MissedMessageWorker(QueueProcessingWorker):
    """Worker for the 'missedmessage_emails' queue; polls in batches
    instead of registering a consumer callback."""

    def start(self):
        # type: () -> None
        """Loop forever: drain the queue, group events per user, hand each
        group to the missed-message email handler, then sleep."""
        while True:
            by_recipient = {} # type: Dict[int, List[Dict[str, Any]]]

            for event in self.q.drain_queue("missedmessage_emails", json=True):
                logging.info("Received event: %s" % (event,))
                by_recipient.setdefault(event['user_profile_id'], []).append(event)

            for user_profile_id, events in by_recipient.items():
                handle_missedmessage_emails(user_profile_id, events)

            reset_queries()
            # Aggregate all messages received every 2 minutes to let someone finish sending a batch
            # of messages
            time.sleep(2 * 60)
|
|
|
|
@assign_queue('missedmessage_mobile_notifications')
class PushNotificationsWorker(QueueProcessingWorker):
    """Worker for the 'missedmessage_mobile_notifications' queue."""

    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Pass the event, with its user id, to the push-notification handler."""
        recipient_id = data['user_profile_id']
        handle_push_notification(recipient_id, data)
|
|
|
|
def make_feedback_client():
    # type: () -> Any # Should be zulip.Client, but not necessarily importable
    """Return a ``zulip.Client`` pointed at ``settings.FEEDBACK_TARGET``.

    The bundled ``api`` directory (two levels above this file) is appended
    to ``sys.path`` so that ``zulip`` can be imported; credentials come
    from the DEPLOYMENT_ROLE_NAME / DEPLOYMENT_ROLE_KEY settings.
    """
    api_dir = os.path.join(os.path.dirname(__file__), '../../api')
    sys.path.append(api_dir)
    import zulip

    client_options = dict(
        client="ZulipFeedback/0.1",
        email=settings.DEPLOYMENT_ROLE_NAME,
        api_key=settings.DEPLOYMENT_ROLE_KEY,
        verbose=True,
        site=settings.FEEDBACK_TARGET,
    )
    return zulip.Client(**client_options)
|
|
|
|
# We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False
@assign_queue('feedback_messages')
class FeedbackBot(QueueProcessingWorker):
    """Worker for the 'feedback_messages' queue.

    Feedback is emailed to ``settings.FEEDBACK_EMAIL`` when that is set;
    otherwise it is posted through a client from ``make_feedback_client``.
    """

    def start(self):
        # type: () -> None
        """Create the API client if it will be needed, then start consuming."""
        needs_api_client = settings.ENABLE_FEEDBACK and settings.FEEDBACK_EMAIL is None
        if needs_api_client:
            self.staging_client = make_feedback_client()
        super(FeedbackBot, self).start()

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Deliver one feedback event; does nothing when feedback is disabled."""
        if not settings.ENABLE_FEEDBACK:
            return

        to_email = settings.FEEDBACK_EMAIL
        if to_email is None:
            # This code has been untested with the new API, and
            # the endpoint it hits also uses a home-grown ticketing
            # system that was from early days of Zulip, pre-open-source.
            self.staging_client.call_endpoint(
                method='POST',
                url='deployments/feedback',
                request=dict(message=simplejson.dumps(event))
            )
            return

        sender_email = event["sender_email"]
        subject = "Zulip feedback from %s" % (sender_email,)
        content = event["content"]
        sender_name = event["sender_full_name"]
        # Sent from the administrator address; Reply-To points at the
        # person who submitted the feedback.
        from_email = '"%s" <%s>' % (sender_name, settings.ZULIP_ADMINISTRATOR)
        reply_to = '"%s" <%s>' % (sender_name, sender_email)
        msg = EmailMessage(subject, content, from_email, [to_email],
                           headers={'Reply-To': reply_to})
        msg.send()
|
|
|
|
@assign_queue('error_reports')
class ErrorReporter(QueueProcessingWorker):
    """Worker for the 'error_reports' queue.

    Reports are forwarded through an API client when
    ``settings.DEPLOYMENT_ROLE_KEY`` is set, or handed to zilencer's
    ``do_report_error`` when ``settings.ZILENCER_ENABLED`` is set.
    """

    def start(self):
        # type: () -> None
        """Set up the forwarding client if configured, then start consuming."""
        if settings.DEPLOYMENT_ROLE_KEY:
            def build_request(type, report):
                # type: (Any, Any) -> Dict[str, Any]
                # The report itself is sent JSON-encoded.
                return {'type': type, 'report': simplejson.dumps(report)}

            self.staging_client = make_feedback_client()
            # Registers the `forward_error` call that consume() uses.
            self.staging_client._register(
                'forward_error',
                method='POST',
                url='deployments/report_error',
                make_request=build_request,
            )
        super(ErrorReporter, self).start()

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Forward one error report; a no-op when neither route is configured."""
        if settings.DEPLOYMENT_ROLE_KEY:
            self.staging_client.forward_error(event['type'], event['report'])
            return
        if settings.ZILENCER_ENABLED:
            from zilencer.views import do_report_error
            do_report_error(settings.DEPLOYMENT_ROLE_NAME, event['type'], event['report'])
|
|
|
|
@assign_queue('slow_queries')
class SlowQueryWorker(QueueProcessingWorker):
    """Worker for the 'slow_queries' queue; polls in batches."""

    def start(self):
        # type: () -> None
        """Loop forever, processing one batch per minute."""
        while True:
            self.process_one_batch()
            # Aggregate all slow query messages in 1-minute chunks to avoid message spam
            time.sleep(1 * 60)

    def process_one_batch(self):
        # type: () -> None
        """Drain the queue and post the batch as one message from ERROR_BOT.

        The queue is drained even when ``settings.ERROR_BOT`` is None; in
        that case the drained entries are dropped and nothing is sent.
        """
        slow_queries = self.q.drain_queue("slow_queries", json=True)

        if settings.ERROR_BOT is None:
            return

        if slow_queries:
            topic = "%s: slow queries" % (settings.EXTERNAL_HOST,)
            content = "".join(" %s\n" % (query,) for query in slow_queries)
            internal_send_message(settings.ERROR_BOT, "stream", "logs", topic, content)

        reset_queries()
|
|
|
|
@assign_queue("message_sender")
class MessageSenderWorker(QueueProcessingWorker):
    """Worker for the 'message_sender' queue.

    Each event is replayed through a Django handler as an emulated POST
    to /json/messages; the response is stored in redis and published to
    the event's return queue.
    """

    def __init__(self):
        # type: () -> None
        super(MessageSenderWorker, self).__init__()
        self.redis_client = get_redis_client()
        self.handler = BaseHandler()
        self.handler.load_middleware()

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Process one queued request and report its response."""
        server_meta = event['server_meta']
        req_id = event['req_id']

        # Defaults for the synthetic request; anything supplied in
        # server_meta['request_environ'] overrides them below.
        environ = {
            'REQUEST_METHOD': 'SOCKET',
            'SCRIPT_NAME': '',
            'PATH_INFO': '/json/messages',
            'SERVER_NAME': '127.0.0.1',
            'SERVER_PORT': 9993,
            'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
            'wsgi.version': (1, 0),
            'wsgi.input': StringIO(),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': False,
            'wsgi.multiprocess': True,
            'wsgi.run_once': False,
            'zulip.emulated_method': 'POST',
        }
        # We're mostly using a WSGIRequest for convenience
        environ.update(server_meta['request_environ'])
        request = WSGIRequest(environ)
        # Note: If we ever support non-POST methods, we'll need to change this.
        request._post = event['request']
        request.csrf_processing_done = True

        # Attach the user directly to the request as its cached user.
        request._cached_user = get_user_profile_by_id(server_meta['user_id'])

        resp = self.handler.get_response(request)
        server_meta['time_request_finished'] = time.time()
        server_meta['worker_log_data'] = request._log_data

        resp_content = resp.content.decode('utf-8')
        result = {
            'response': ujson.loads(resp_content),
            'req_id': req_id,
            'server_meta': server_meta,
        }

        # Store the raw response in redis under the request's key.
        completion = {'status': 'complete', 'response': resp_content}
        self.redis_client.hmset(req_redis_key(req_id), completion)

        queue_json_publish(server_meta['return_queue'], result, lambda e: None)
|
|
|
|
@assign_queue('digest_emails')
class DigestWorker(QueueProcessingWorker):
    """Worker for the 'digest_emails' queue."""

    # Who gets a digest is entirely determined by the enqueue_digest_emails
    # management command, not here.
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Log the event and handle the digest email for its user and cutoff."""
        logging.info("Received digest event: %s" % (event,))
        recipient_id = event["user_profile_id"]
        cutoff = event["cutoff"]
        handle_digest_email(recipient_id, cutoff)
|
|
|
|
@assign_queue('email_mirror')
class MirrorWorker(QueueProcessingWorker):
    """Worker for the 'email_mirror' queue."""

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Parse ``event["message"]`` into an email message object and pass
        it, with ``event["rcpt_to"]``, to the email mirror as pre-checked."""
        parsed_message = email.message_from_string(event["message"])
        mirror_email(parsed_message, rcpt_to=event["rcpt_to"], pre_checked=True)
|
|
|
|
@assign_queue('test')
class TestWorker(QueueProcessingWorker):
    # This worker allows you to test the queue worker infrastructure without
    # creating significant side effects.  It can be useful in development or
    # for troubleshooting prod/staging.  It pulls a message off the test queue
    # and appends it to the file named by settings.ZULIP_WORKER_TEST_FILE.
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Append the JSON-encoded event, as one line, to the test file."""
        target_path = settings.ZULIP_WORKER_TEST_FILE
        serialized = ujson.dumps(event)
        logging.info("TestWorker should append this message to %s: %s" % (target_path, serialized))
        with open(target_path, 'a') as out:
            out.write("%s\n" % (serialized,))
|
|
|
|
@assign_queue('embed_links')
class FetchLinksEmbedData(QueueProcessingWorker):
    """Worker for the 'embed_links' queue."""

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Fetch embed data for the event's URLs, then re-render the
        message if its content is still what the event recorded."""
        for url in event['urls']:
            url_preview.get_link_embed_data(url)

        message = Message.objects.get(id=event['message_id'])
        # If the message changed, we will run this task after updating the message
        # in zerver.views.messages.update_message_backend
        if message.content != event['message_content']:
            return
        if message.content is None:
            return

        user_messages = UserMessage.objects.filter(
            message=message.id).select_related("user_profile")
        message_users = set(um.user_profile for um in user_messages)
        # If rendering fails, the called code will raise a JsonableError.
        rendered_content = render_incoming_message(
            message,
            content=message.content,
            message_users=message_users)
        do_update_embedded_data(
            message.sender, message, message.content, rendered_content)
|