From fed51666d6dcd524ff8d016509daf9ef1d1134b5 Mon Sep 17 00:00:00 2001 From: "neiljp (Neil Pilgrim)" Date: Sat, 10 Mar 2018 07:29:46 +0000 Subject: [PATCH] mypy: Migrate queue_processors.py to python3 syntax. Forward-declarations of QueueProcessingWorker allow code order to remain unchanged. Note added re use of Dict vs Mapping. --- zerver/worker/queue_processors.py | 108 +++++++++++------------------- 1 file changed, 39 insertions(+), 69 deletions(-) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 9e2331897c..fa7048debf 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -65,10 +65,10 @@ class WorkerDeclarationException(Exception): ConcreteQueueWorker = TypeVar('ConcreteQueueWorker', bound='QueueProcessingWorker') -def assign_queue(queue_name, enabled=True, queue_type="consumer"): - # type: (str, bool, str) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]] - def decorate(clazz): - # type: (Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker] +def assign_queue( + queue_name: str, enabled: bool=True, queue_type: str="consumer" +) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]: + def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]: clazz.queue_name = queue_name if enabled: register_worker(queue_name, clazz, queue_type) @@ -77,26 +77,22 @@ def assign_queue(queue_name, enabled=True, queue_type="consumer"): worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type queues = {} # type: Dict[str, Dict[str, Type[QueueProcessingWorker]]] -def register_worker(queue_name, clazz, queue_type): - # type: (str, Type[QueueProcessingWorker], str) -> None +def register_worker(queue_name: str, clazz: Type['QueueProcessingWorker'], queue_type: str) -> None: if queue_type not in queues: queues[queue_type] = {} queues[queue_type][queue_name] = clazz worker_classes[queue_name] = clazz -def get_worker(queue_name): - # type: (str) -> QueueProcessingWorker +def get_worker(queue_name: str) -> 'QueueProcessingWorker': return worker_classes[queue_name]() -def get_active_worker_queues(queue_type=None): - # type: (Optional[str]) -> List[str] +def get_active_worker_queues(queue_type: Optional[str]=None) -> List[str]: """Returns all the non-test worker queues.""" if queue_type is None: return list(worker_classes.keys()) return list(queues[queue_type].keys()) -def check_and_send_restart_signal(): - # type: () -> None +def check_and_send_restart_signal() -> None: try: if not connection.is_usable(): logging.warning("*** Sending self SIGUSR1 to trigger a restart.") @@ -104,20 +100,19 @@ def check_and_send_restart_signal(): except Exception: pass -def retry_send_email_failures(func): - # type: (Callable[[Any, Dict[str, Any]], None]) -> Callable[[QueueProcessingWorker, Dict[str, Any]], None] +def retry_send_email_failures( + func: Callable[[Any, Dict[str, Any]], None] +) -> Callable[['QueueProcessingWorker', Dict[str, Any]], None]: # If we don't use cast() and use QueueProcessingWorker instead of Any in # function type annotation then mypy complains. func = cast(Callable[[QueueProcessingWorker, Dict[str, Any]], None], func) @wraps(func) - def wrapper(worker, data): - # type: (QueueProcessingWorker, Dict[str, Any]) -> None + def wrapper(worker: 'QueueProcessingWorker', data: Dict[str, Any]) -> None: try: func(worker, data) except (smtplib.SMTPServerDisconnected, socket.gaierror, EmailNotDeliveredException): - def on_failure(event): - # type: (Dict[str, Any]) -> None + def on_failure(event: Dict[str, Any]) -> None: logging.exception("Event {} failed".format(event)) retry_event(worker.queue_name, data, on_failure) @@ -127,18 +122,15 @@ def retry_send_email_failures(func): class QueueProcessingWorker: queue_name = None # type: str - def __init__(self): - # type: () -> None + def __init__(self) -> None: self.q = None # type: SimpleQueueClient if self.queue_name is None: raise WorkerDeclarationException("Queue worker declared without queue_name") - def consume(self, data): - # type: (Dict[str, Any]) -> None + def consume(self, data: Dict[str, Any]) -> None: raise WorkerDeclarationException("No consumer defined!") - def consume_wrapper(self, data): - # type: (Dict[str, Any]) -> None + def consume_wrapper(self, data: Dict[str, Any]) -> None: try: self.consume(data) except Exception: @@ -156,21 +148,17 @@ class QueueProcessingWorker: finally: reset_queries() - def _log_problem(self): - # type: () -> None + def _log_problem(self) -> None: logging.exception("Problem handling data on queue %s" % (self.queue_name,)) - def setup(self): - # type: () -> None + def setup(self) -> None: self.q = SimpleQueueClient() - def start(self): - # type: () -> None + def start(self) -> None: self.q.register_json_consumer(self.queue_name, self.consume_wrapper) self.q.start_consuming() - def stop(self): # nocoverage - # type: () -> None + def stop(self) -> None: # nocoverage self.q.stop_consuming() class LoopQueueProcessingWorker(QueueProcessingWorker): @@ -195,8 +183,8 @@ class LoopQueueProcessingWorker(QueueProcessingWorker): @assign_queue('signups') class SignupWorker(QueueProcessingWorker): - def consume(self, data): - # type: (Dict[str, Any]) -> None + def consume(self, data: Dict[str, Any]) -> None: + # TODO: This is the only implementation with Dict cf Mapping; should we simplify? user_profile = get_user_profile_by_id(data['user_id']) logging.info("Processing signup for user %s in realm %s" % ( user_profile.email, user_profile.realm.string_id)) @@ -218,9 +206,7 @@ class SignupWorker(QueueProcessingWorker): @assign_queue('invites') class ConfirmationEmailWorker(QueueProcessingWorker): - def consume(self, data): - # type: (Mapping[str, Any]) -> None - + def consume(self, data: Mapping[str, Any]) -> None: if "email" in data: # When upgrading from a version up through 1.7.1, there may be # existing items in the queue with `email` instead of `prereg_id`. @@ -255,8 +241,7 @@ class ConfirmationEmailWorker(QueueProcessingWorker): @assign_queue('user_activity') class UserActivityWorker(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: user_profile = get_user_profile_by_id(event["user_profile_id"]) client = get_client(event["client"]) log_time = timestamp_to_datetime(event["time"]) @@ -265,16 +250,14 @@ class UserActivityWorker(QueueProcessingWorker): @assign_queue('user_activity_interval') class UserActivityIntervalWorker(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: user_profile = get_user_profile_by_id(event["user_profile_id"]) log_time = timestamp_to_datetime(event["time"]) do_update_user_activity_interval(user_profile, log_time) @assign_queue('user_presence') class UserPresenceWorker(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: logging.debug("Received presence event: %s" % (event),) user_profile = get_user_profile_by_id(event["user_profile_id"]) client = get_client(event["client"]) @@ -325,22 +308,19 @@ class MissedMessageSendingWorker(EmailSendingWorker): # nocoverage @assign_queue('missedmessage_mobile_notifications') class PushNotificationsWorker(QueueProcessingWorker): # nocoverage - def consume(self, data): - # type: (Mapping[str, Any]) -> None + def consume(self, data: Mapping[str, Any]) -> None: handle_push_notification(data['user_profile_id'], data) # We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False @assign_queue('feedback_messages') class FeedbackBot(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: logging.info("Received feedback from %s" % (event["sender_email"],)) handle_feedback(event) @assign_queue('error_reports') class ErrorReporter(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: logging.info("Processing traceback with type %s for %s" % (event['type'], event.get('user_email'))) if settings.ERROR_REPORTING: do_report_error(event['report']['host'], event['type'], event['report']) @@ -350,8 +330,7 @@ class SlowQueryWorker(LoopQueueProcessingWorker): # Sleep 1 minute between checking the queue sleep_delay = 60 * 1 - def consume_batch(self, slow_queries): - # type: (List[Dict[str, Any]]) -> None + def consume_batch(self, slow_queries: List[Dict[str, Any]]) -> None: for query in slow_queries: logging.info("Slow query: %s" % (query)) @@ -371,15 +350,13 @@ class SlowQueryWorker(LoopQueueProcessingWorker): @assign_queue("message_sender") class MessageSenderWorker(QueueProcessingWorker): - def __init__(self): - # type: () -> None + def __init__(self) -> None: super().__init__() self.redis_client = get_redis_client() self.handler = BaseHandler() self.handler.load_middleware() - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: server_meta = event['server_meta'] environ = { @@ -435,8 +412,7 @@ class MessageSenderWorker(QueueProcessingWorker): class DigestWorker(QueueProcessingWorker): # nocoverage # Who gets a digest is entirely determined by the enqueue_digest_emails # management command, not here. - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: logging.info("Received digest event: %s" % (event,)) handle_digest_email(event["user_profile_id"], event["cutoff"]) @@ -444,8 +420,7 @@ class DigestWorker(QueueProcessingWorker): # nocoverage class MirrorWorker(QueueProcessingWorker): # who gets a digest is entirely determined by the enqueue_digest_emails # management command, not here. - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: message = force_str(event["message"]) mirror_email(email.message_from_string(message), rcpt_to=event["rcpt_to"], pre_checked=True) @@ -456,8 +431,7 @@ class TestWorker(QueueProcessingWorker): # creating significant side effects. It can be useful in development or # for troubleshooting prod/staging. It pulls a message off the test queue # and appends it to a file in /tmp. - def consume(self, event): # nocoverage - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: # nocoverage fn = settings.ZULIP_WORKER_TEST_FILE message = ujson.dumps(event) logging.info("TestWorker should append this message to %s: %s" % (fn, message)) @@ -466,8 +440,7 @@ class TestWorker(QueueProcessingWorker): @assign_queue('embed_links') class FetchLinksEmbedData(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: for url in event['urls']: url_preview.get_link_embed_data(url) @@ -496,8 +469,7 @@ class FetchLinksEmbedData(QueueProcessingWorker): @assign_queue('outgoing_webhooks') class OutgoingWebhookWorker(QueueProcessingWorker): - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: message = event['message'] dup_event = cast(Dict[str, Any], event) dup_event['command'] = message['content'] @@ -512,12 +484,10 @@ class OutgoingWebhookWorker(QueueProcessingWorker): @assign_queue('embedded_bots') class EmbeddedBotWorker(QueueProcessingWorker): - def get_bot_api_client(self, user_profile): - # type: (UserProfile) -> EmbeddedBotHandler + def get_bot_api_client(self, user_profile: UserProfile) -> EmbeddedBotHandler: return EmbeddedBotHandler(user_profile) - def consume(self, event): - # type: (Mapping[str, Any]) -> None + def consume(self, event: Mapping[str, Any]) -> None: user_profile_id = event['user_profile_id'] user_profile = get_user_profile_by_id(user_profile_id)