mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	queue: Take advantage of ABC for defining abstract worker base classes.
QueueProcessingWorker and LoopQueueProcessingWorker are abstract classes meant to be subclassed by a class that will define its own consume() or consume_batch() method. ABCs are suited for that and we can tag consume/consume_batch with the @abstractmethod wrapper which will prevent subclasses that don't define these methods properly to be impossible to even instantiate (as opposed to only crashing once consume() is called). It's also nicely detected by mypy, which will throw errors such as this on invalid use: error: Only concrete class can be given where "Type[TestWorker]" is expected error: Cannot instantiate abstract class 'TestWorker' with abstract attribute 'consume' Due to it being detected by mypy, we can remove the test test_worker_noconsume which just tested the old version of this - raising an exception when the unimplemented consume() gets called. Now it can be handled already on the linter level.
This commit is contained in:
		
				
					committed by
					
						
						Tim Abbott
					
				
			
			
				
	
			
			
			
						parent
						
							ec209a9bc9
						
					
				
				
					commit
					e90866876c
				
			@@ -1,4 +1,5 @@
 | 
			
		||||
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
 | 
			
		||||
from abc import ABC, abstractmethod
 | 
			
		||||
from typing import Any, Callable, Dict, List, Mapping, Optional, cast, Tuple, TypeVar, Type
 | 
			
		||||
 | 
			
		||||
import copy
 | 
			
		||||
@@ -119,7 +120,7 @@ def retry_send_email_failures(
 | 
			
		||||
 | 
			
		||||
    return wrapper
 | 
			
		||||
 | 
			
		||||
class QueueProcessingWorker:
 | 
			
		||||
class QueueProcessingWorker(ABC):
 | 
			
		||||
    queue_name = None  # type: str
 | 
			
		||||
 | 
			
		||||
    def __init__(self) -> None:
 | 
			
		||||
@@ -127,8 +128,9 @@ class QueueProcessingWorker:
 | 
			
		||||
        if self.queue_name is None:
 | 
			
		||||
            raise WorkerDeclarationException("Queue worker declared without queue_name")
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def consume(self, data: Dict[str, Any]) -> None:
 | 
			
		||||
        raise WorkerDeclarationException("No consumer defined!")
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def consume_wrapper(self, data: Dict[str, Any]) -> None:
 | 
			
		||||
        try:
 | 
			
		||||
@@ -184,8 +186,9 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
 | 
			
		||||
            if not self.sleep_only_if_empty or len(events) == 0:
 | 
			
		||||
                time.sleep(self.sleep_delay)
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def consume_batch(self, events: List[Dict[str, Any]]) -> None:
 | 
			
		||||
        raise NotImplementedError
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def consume(self, event: Dict[str, Any]) -> None:
 | 
			
		||||
        """In LoopQueueProcessingWorker, consume is used just for automated tests"""
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user