mypy: Correct annotations in queue_processors.py.

This commit is contained in:
neiljp (Neil Pilgrim)
2018-03-09 18:29:20 +00:00
committed by Tim Abbott
parent 87f1516f5d
commit 6fc4d5bf40

View File

@@ -1,5 +1,5 @@
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
from typing import Any, Callable, Dict, List, Mapping, Optional, cast
from typing import Any, Callable, Dict, List, Mapping, Optional, cast, TypeVar, Type
import copy
import signal
@@ -63,10 +63,12 @@ logger = logging.getLogger(__name__)
class WorkerDeclarationException(Exception):
pass
ConcreteQueueWorker = TypeVar('ConcreteQueueWorker', bound='QueueProcessingWorker')
def assign_queue(queue_name, enabled=True, queue_type="consumer"):
# type: (str, bool, str) -> Callable[[QueueProcessingWorker], QueueProcessingWorker]
# type: (str, bool, str) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]
def decorate(clazz):
# type: (QueueProcessingWorker) -> QueueProcessingWorker
# type: (Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]
clazz.queue_name = queue_name
if enabled:
register_worker(queue_name, clazz, queue_type)
@@ -74,9 +76,9 @@ def assign_queue(queue_name, enabled=True, queue_type="consumer"):
return decorate
worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type
queues = {} # type: Dict[str, Dict[str, QueueProcessingWorker]]
queues = {} # type: Dict[str, Dict[str, Type[QueueProcessingWorker]]]
def register_worker(queue_name, clazz, queue_type):
# type: (str, QueueProcessingWorker, str) -> None
# type: (str, Type[QueueProcessingWorker], str) -> None
if queue_type not in queues:
queues[queue_type] = {}
queues[queue_type][queue_name] = clazz