mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 14:03:30 +00:00 
			
		
		
		
	process_queue: For threaded workers, create them when they start.
Creating the QueueProcessingWorker objects when the ThreadedWorker is
created can lead to a race which caused confusing error messages:
1. A thread tries to call `self.worker = get_worker()`
2. This call raises an exception, which is caught by
   `log_and_exit_if_exception`
3. `log_and_exit_if_exception` sends our process a SIGUSR1, _but
    otherwise swallows the error_.
4. The thread's `.run()` is called, which tries to access
   `self.worker`, which was never set, and throws another exception.
5. The process handles the SIGUSR1, restarting.
Move the creation of the worker to when it is started, so the worker
object does not need to be stored, and possibly have a decoupled
failure.
			
			
This commit is contained in:
		
				
					committed by
					
						
						Tim Abbott
					
				
			
			
				
	
			
			
			
						parent
						
							be13557ead
						
					
				
				
					commit
					e1acd7b974
				
			@@ -119,15 +119,13 @@ class ThreadedWorker(threading.Thread):
 | 
				
			|||||||
        self.logger = logger
 | 
					        self.logger = logger
 | 
				
			||||||
        self.queue_name = queue_name
 | 
					        self.queue_name = queue_name
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        with log_and_exit_if_exception(logger, queue_name, threaded=True):
 | 
					 | 
				
			||||||
            self.worker = get_worker(queue_name, threaded=True)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @override
 | 
					    @override
 | 
				
			||||||
    def run(self) -> None:
 | 
					    def run(self) -> None:
 | 
				
			||||||
        with configure_scope() as scope, log_and_exit_if_exception(
 | 
					        with configure_scope() as scope, log_and_exit_if_exception(
 | 
				
			||||||
            self.logger, self.queue_name, threaded=True
 | 
					            self.logger, self.queue_name, threaded=True
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            scope.set_tag("queue_worker", self.worker.queue_name)
 | 
					            scope.set_tag("queue_worker", self.queue_name)
 | 
				
			||||||
            self.worker.setup()
 | 
					            worker = get_worker(self.queue_name, threaded=True)
 | 
				
			||||||
            logging.debug("starting consuming %s", self.worker.queue_name)
 | 
					            worker.setup()
 | 
				
			||||||
            self.worker.start()
 | 
					            logging.debug("starting consuming %s", self.queue_name)
 | 
				
			||||||
 | 
					            worker.start()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user