mypy: Migrate queue_processors.py to python3 syntax.

Forward-declarations of QueueProcessingWorker allow code order to remain
unchanged.

Note added re use of Dict vs Mapping.
This commit is contained in:
neiljp (Neil Pilgrim)
2018-03-10 07:29:46 +00:00
committed by Tim Abbott
parent 6e048c5d3f
commit fed51666d6

View File

@@ -65,10 +65,10 @@ class WorkerDeclarationException(Exception):
ConcreteQueueWorker = TypeVar('ConcreteQueueWorker', bound='QueueProcessingWorker') ConcreteQueueWorker = TypeVar('ConcreteQueueWorker', bound='QueueProcessingWorker')
def assign_queue(queue_name, enabled=True, queue_type="consumer"): def assign_queue(
# type: (str, bool, str) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]] queue_name: str, enabled: bool=True, queue_type: str="consumer"
def decorate(clazz): ) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]:
# type: (Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker] def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]:
clazz.queue_name = queue_name clazz.queue_name = queue_name
if enabled: if enabled:
register_worker(queue_name, clazz, queue_type) register_worker(queue_name, clazz, queue_type)
@@ -77,26 +77,22 @@ def assign_queue(queue_name, enabled=True, queue_type="consumer"):
worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type
queues = {} # type: Dict[str, Dict[str, Type[QueueProcessingWorker]]] queues = {} # type: Dict[str, Dict[str, Type[QueueProcessingWorker]]]
def register_worker(queue_name, clazz, queue_type): def register_worker(queue_name: str, clazz: Type['QueueProcessingWorker'], queue_type: str) -> None:
# type: (str, Type[QueueProcessingWorker], str) -> None
if queue_type not in queues: if queue_type not in queues:
queues[queue_type] = {} queues[queue_type] = {}
queues[queue_type][queue_name] = clazz queues[queue_type][queue_name] = clazz
worker_classes[queue_name] = clazz worker_classes[queue_name] = clazz
def get_worker(queue_name): def get_worker(queue_name: str) -> 'QueueProcessingWorker':
# type: (str) -> QueueProcessingWorker
return worker_classes[queue_name]() return worker_classes[queue_name]()
def get_active_worker_queues(queue_type=None): def get_active_worker_queues(queue_type: Optional[str]=None) -> List[str]:
# type: (Optional[str]) -> List[str]
"""Returns all the non-test worker queues.""" """Returns all the non-test worker queues."""
if queue_type is None: if queue_type is None:
return list(worker_classes.keys()) return list(worker_classes.keys())
return list(queues[queue_type].keys()) return list(queues[queue_type].keys())
def check_and_send_restart_signal(): def check_and_send_restart_signal() -> None:
# type: () -> None
try: try:
if not connection.is_usable(): if not connection.is_usable():
logging.warning("*** Sending self SIGUSR1 to trigger a restart.") logging.warning("*** Sending self SIGUSR1 to trigger a restart.")
@@ -104,20 +100,19 @@ def check_and_send_restart_signal():
except Exception: except Exception:
pass pass
def retry_send_email_failures(func): def retry_send_email_failures(
# type: (Callable[[Any, Dict[str, Any]], None]) -> Callable[[QueueProcessingWorker, Dict[str, Any]], None] func: Callable[[Any, Dict[str, Any]], None]
) -> Callable[['QueueProcessingWorker', Dict[str, Any]], None]:
# If we don't use cast() and use QueueProcessingWorker instead of Any in # If we don't use cast() and use QueueProcessingWorker instead of Any in
# function type annotation then mypy complains. # function type annotation then mypy complains.
func = cast(Callable[[QueueProcessingWorker, Dict[str, Any]], None], func) func = cast(Callable[[QueueProcessingWorker, Dict[str, Any]], None], func)
@wraps(func) @wraps(func)
def wrapper(worker, data): def wrapper(worker: 'QueueProcessingWorker', data: Dict[str, Any]) -> None:
# type: (QueueProcessingWorker, Dict[str, Any]) -> None
try: try:
func(worker, data) func(worker, data)
except (smtplib.SMTPServerDisconnected, socket.gaierror, EmailNotDeliveredException): except (smtplib.SMTPServerDisconnected, socket.gaierror, EmailNotDeliveredException):
def on_failure(event): def on_failure(event: Dict[str, Any]) -> None:
# type: (Dict[str, Any]) -> None
logging.exception("Event {} failed".format(event)) logging.exception("Event {} failed".format(event))
retry_event(worker.queue_name, data, on_failure) retry_event(worker.queue_name, data, on_failure)
@@ -127,18 +122,15 @@ def retry_send_email_failures(func):
class QueueProcessingWorker: class QueueProcessingWorker:
queue_name = None # type: str queue_name = None # type: str
def __init__(self): def __init__(self) -> None:
# type: () -> None
self.q = None # type: SimpleQueueClient self.q = None # type: SimpleQueueClient
if self.queue_name is None: if self.queue_name is None:
raise WorkerDeclarationException("Queue worker declared without queue_name") raise WorkerDeclarationException("Queue worker declared without queue_name")
def consume(self, data): def consume(self, data: Dict[str, Any]) -> None:
# type: (Dict[str, Any]) -> None
raise WorkerDeclarationException("No consumer defined!") raise WorkerDeclarationException("No consumer defined!")
def consume_wrapper(self, data): def consume_wrapper(self, data: Dict[str, Any]) -> None:
# type: (Dict[str, Any]) -> None
try: try:
self.consume(data) self.consume(data)
except Exception: except Exception:
@@ -156,21 +148,17 @@ class QueueProcessingWorker:
finally: finally:
reset_queries() reset_queries()
def _log_problem(self): def _log_problem(self) -> None:
# type: () -> None
logging.exception("Problem handling data on queue %s" % (self.queue_name,)) logging.exception("Problem handling data on queue %s" % (self.queue_name,))
def setup(self): def setup(self) -> None:
# type: () -> None
self.q = SimpleQueueClient() self.q = SimpleQueueClient()
def start(self): def start(self) -> None:
# type: () -> None
self.q.register_json_consumer(self.queue_name, self.consume_wrapper) self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
self.q.start_consuming() self.q.start_consuming()
def stop(self): # nocoverage def stop(self) -> None: # nocoverage
# type: () -> None
self.q.stop_consuming() self.q.stop_consuming()
class LoopQueueProcessingWorker(QueueProcessingWorker): class LoopQueueProcessingWorker(QueueProcessingWorker):
@@ -195,8 +183,8 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
@assign_queue('signups') @assign_queue('signups')
class SignupWorker(QueueProcessingWorker): class SignupWorker(QueueProcessingWorker):
def consume(self, data): def consume(self, data: Dict[str, Any]) -> None:
# type: (Dict[str, Any]) -> None # TODO: This is the only implementation with Dict cf Mapping; should we simplify?
user_profile = get_user_profile_by_id(data['user_id']) user_profile = get_user_profile_by_id(data['user_id'])
logging.info("Processing signup for user %s in realm %s" % ( logging.info("Processing signup for user %s in realm %s" % (
user_profile.email, user_profile.realm.string_id)) user_profile.email, user_profile.realm.string_id))
@@ -218,9 +206,7 @@ class SignupWorker(QueueProcessingWorker):
@assign_queue('invites') @assign_queue('invites')
class ConfirmationEmailWorker(QueueProcessingWorker): class ConfirmationEmailWorker(QueueProcessingWorker):
def consume(self, data): def consume(self, data: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
if "email" in data: if "email" in data:
# When upgrading from a version up through 1.7.1, there may be # When upgrading from a version up through 1.7.1, there may be
# existing items in the queue with `email` instead of `prereg_id`. # existing items in the queue with `email` instead of `prereg_id`.
@@ -255,8 +241,7 @@ class ConfirmationEmailWorker(QueueProcessingWorker):
@assign_queue('user_activity') @assign_queue('user_activity')
class UserActivityWorker(QueueProcessingWorker): class UserActivityWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
user_profile = get_user_profile_by_id(event["user_profile_id"]) user_profile = get_user_profile_by_id(event["user_profile_id"])
client = get_client(event["client"]) client = get_client(event["client"])
log_time = timestamp_to_datetime(event["time"]) log_time = timestamp_to_datetime(event["time"])
@@ -265,16 +250,14 @@ class UserActivityWorker(QueueProcessingWorker):
@assign_queue('user_activity_interval') @assign_queue('user_activity_interval')
class UserActivityIntervalWorker(QueueProcessingWorker): class UserActivityIntervalWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
user_profile = get_user_profile_by_id(event["user_profile_id"]) user_profile = get_user_profile_by_id(event["user_profile_id"])
log_time = timestamp_to_datetime(event["time"]) log_time = timestamp_to_datetime(event["time"])
do_update_user_activity_interval(user_profile, log_time) do_update_user_activity_interval(user_profile, log_time)
@assign_queue('user_presence') @assign_queue('user_presence')
class UserPresenceWorker(QueueProcessingWorker): class UserPresenceWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
logging.debug("Received presence event: %s" % (event),) logging.debug("Received presence event: %s" % (event),)
user_profile = get_user_profile_by_id(event["user_profile_id"]) user_profile = get_user_profile_by_id(event["user_profile_id"])
client = get_client(event["client"]) client = get_client(event["client"])
@@ -325,22 +308,19 @@ class MissedMessageSendingWorker(EmailSendingWorker): # nocoverage
@assign_queue('missedmessage_mobile_notifications') @assign_queue('missedmessage_mobile_notifications')
class PushNotificationsWorker(QueueProcessingWorker): # nocoverage class PushNotificationsWorker(QueueProcessingWorker): # nocoverage
def consume(self, data): def consume(self, data: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
handle_push_notification(data['user_profile_id'], data) handle_push_notification(data['user_profile_id'], data)
# We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False # We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False
@assign_queue('feedback_messages') @assign_queue('feedback_messages')
class FeedbackBot(QueueProcessingWorker): class FeedbackBot(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
logging.info("Received feedback from %s" % (event["sender_email"],)) logging.info("Received feedback from %s" % (event["sender_email"],))
handle_feedback(event) handle_feedback(event)
@assign_queue('error_reports') @assign_queue('error_reports')
class ErrorReporter(QueueProcessingWorker): class ErrorReporter(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
logging.info("Processing traceback with type %s for %s" % (event['type'], event.get('user_email'))) logging.info("Processing traceback with type %s for %s" % (event['type'], event.get('user_email')))
if settings.ERROR_REPORTING: if settings.ERROR_REPORTING:
do_report_error(event['report']['host'], event['type'], event['report']) do_report_error(event['report']['host'], event['type'], event['report'])
@@ -350,8 +330,7 @@ class SlowQueryWorker(LoopQueueProcessingWorker):
# Sleep 1 minute between checking the queue # Sleep 1 minute between checking the queue
sleep_delay = 60 * 1 sleep_delay = 60 * 1
def consume_batch(self, slow_queries): def consume_batch(self, slow_queries: List[Dict[str, Any]]) -> None:
# type: (List[Dict[str, Any]]) -> None
for query in slow_queries: for query in slow_queries:
logging.info("Slow query: %s" % (query)) logging.info("Slow query: %s" % (query))
@@ -371,15 +350,13 @@ class SlowQueryWorker(LoopQueueProcessingWorker):
@assign_queue("message_sender") @assign_queue("message_sender")
class MessageSenderWorker(QueueProcessingWorker): class MessageSenderWorker(QueueProcessingWorker):
def __init__(self): def __init__(self) -> None:
# type: () -> None
super().__init__() super().__init__()
self.redis_client = get_redis_client() self.redis_client = get_redis_client()
self.handler = BaseHandler() self.handler = BaseHandler()
self.handler.load_middleware() self.handler.load_middleware()
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
server_meta = event['server_meta'] server_meta = event['server_meta']
environ = { environ = {
@@ -435,8 +412,7 @@ class MessageSenderWorker(QueueProcessingWorker):
class DigestWorker(QueueProcessingWorker): # nocoverage class DigestWorker(QueueProcessingWorker): # nocoverage
# Who gets a digest is entirely determined by the enqueue_digest_emails # Who gets a digest is entirely determined by the enqueue_digest_emails
# management command, not here. # management command, not here.
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
logging.info("Received digest event: %s" % (event,)) logging.info("Received digest event: %s" % (event,))
handle_digest_email(event["user_profile_id"], event["cutoff"]) handle_digest_email(event["user_profile_id"], event["cutoff"])
@@ -444,8 +420,7 @@ class DigestWorker(QueueProcessingWorker): # nocoverage
class MirrorWorker(QueueProcessingWorker): class MirrorWorker(QueueProcessingWorker):
# who gets a digest is entirely determined by the enqueue_digest_emails # who gets a digest is entirely determined by the enqueue_digest_emails
# management command, not here. # management command, not here.
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
message = force_str(event["message"]) message = force_str(event["message"])
mirror_email(email.message_from_string(message), mirror_email(email.message_from_string(message),
rcpt_to=event["rcpt_to"], pre_checked=True) rcpt_to=event["rcpt_to"], pre_checked=True)
@@ -456,8 +431,7 @@ class TestWorker(QueueProcessingWorker):
# creating significant side effects. It can be useful in development or # creating significant side effects. It can be useful in development or
# for troubleshooting prod/staging. It pulls a message off the test queue # for troubleshooting prod/staging. It pulls a message off the test queue
# and appends it to a file in /tmp. # and appends it to a file in /tmp.
def consume(self, event): # nocoverage def consume(self, event: Mapping[str, Any]) -> None: # nocoverage
# type: (Mapping[str, Any]) -> None
fn = settings.ZULIP_WORKER_TEST_FILE fn = settings.ZULIP_WORKER_TEST_FILE
message = ujson.dumps(event) message = ujson.dumps(event)
logging.info("TestWorker should append this message to %s: %s" % (fn, message)) logging.info("TestWorker should append this message to %s: %s" % (fn, message))
@@ -466,8 +440,7 @@ class TestWorker(QueueProcessingWorker):
@assign_queue('embed_links') @assign_queue('embed_links')
class FetchLinksEmbedData(QueueProcessingWorker): class FetchLinksEmbedData(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
for url in event['urls']: for url in event['urls']:
url_preview.get_link_embed_data(url) url_preview.get_link_embed_data(url)
@@ -496,8 +469,7 @@ class FetchLinksEmbedData(QueueProcessingWorker):
@assign_queue('outgoing_webhooks') @assign_queue('outgoing_webhooks')
class OutgoingWebhookWorker(QueueProcessingWorker): class OutgoingWebhookWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
message = event['message'] message = event['message']
dup_event = cast(Dict[str, Any], event) dup_event = cast(Dict[str, Any], event)
dup_event['command'] = message['content'] dup_event['command'] = message['content']
@@ -512,12 +484,10 @@ class OutgoingWebhookWorker(QueueProcessingWorker):
@assign_queue('embedded_bots') @assign_queue('embedded_bots')
class EmbeddedBotWorker(QueueProcessingWorker): class EmbeddedBotWorker(QueueProcessingWorker):
def get_bot_api_client(self, user_profile): def get_bot_api_client(self, user_profile: UserProfile) -> EmbeddedBotHandler:
# type: (UserProfile) -> EmbeddedBotHandler
return EmbeddedBotHandler(user_profile) return EmbeddedBotHandler(user_profile)
def consume(self, event): def consume(self, event: Mapping[str, Any]) -> None:
# type: (Mapping[str, Any]) -> None
user_profile_id = event['user_profile_id'] user_profile_id = event['user_profile_id']
user_profile = get_user_profile_by_id(user_profile_id) user_profile = get_user_profile_by_id(user_profile_id)