mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	Delay connecting to RabbitMQ until it's necessary
Previously we were connecting at import-time. (imported from commit 9a638f0d238f3b6b00feb4aa524098a64953cb92)
This commit is contained in:
		@@ -230,10 +230,16 @@ class TornadoQueueClient(SimpleQueueClient):
 | 
				
			|||||||
            lambda: self.channel.basic_consume(wrapped_consumer, queue=queue_name,
 | 
					            lambda: self.channel.basic_consume(wrapped_consumer, queue=queue_name,
 | 
				
			||||||
                consumer_tag=self._generate_ctag(queue_name)))
 | 
					                consumer_tag=self._generate_ctag(queue_name)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if settings.RUNNING_INSIDE_TORNADO and settings.USING_RABBITMQ:
 | 
					queue_client = None
 | 
				
			||||||
    queue_client = TornadoQueueClient()
 | 
					def get_queue_client():
 | 
				
			||||||
elif settings.USING_RABBITMQ:
 | 
					    global queue_client
 | 
				
			||||||
    queue_client = SimpleQueueClient()
 | 
					    if queue_client is None:
 | 
				
			||||||
 | 
					        if settings.RUNNING_INSIDE_TORNADO and settings.USING_RABBITMQ:
 | 
				
			||||||
 | 
					            queue_client = TornadoQueueClient()
 | 
				
			||||||
 | 
					        elif settings.USING_RABBITMQ:
 | 
				
			||||||
 | 
					            queue_client = SimpleQueueClient()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return queue_client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def setup_tornado_rabbitmq():
 | 
					def setup_tornado_rabbitmq():
 | 
				
			||||||
    # When tornado is shut down, disconnect cleanly from rabbitmq
 | 
					    # When tornado is shut down, disconnect cleanly from rabbitmq
 | 
				
			||||||
@@ -250,7 +256,7 @@ queue_lock = threading.RLock()
 | 
				
			|||||||
def queue_json_publish(queue_name, event, processor):
 | 
					def queue_json_publish(queue_name, event, processor):
 | 
				
			||||||
    with queue_lock:
 | 
					    with queue_lock:
 | 
				
			||||||
        if settings.USING_RABBITMQ:
 | 
					        if settings.USING_RABBITMQ:
 | 
				
			||||||
            queue_client.json_publish(queue_name, event)
 | 
					            get_queue_client().json_publish(queue_name, event)
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            processor(event)
 | 
					            processor(event)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -26,7 +26,7 @@ from zerver.lib.socket import get_sockjs_router, respond_send_message
 | 
				
			|||||||
from zerver.middleware import async_request_stop
 | 
					from zerver.middleware import async_request_stop
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if settings.USING_RABBITMQ:
 | 
					if settings.USING_RABBITMQ:
 | 
				
			||||||
    from zerver.lib.queue import queue_client
 | 
					    from zerver.lib.queue import get_queue_client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Command(BaseCommand):
 | 
					class Command(BaseCommand):
 | 
				
			||||||
    option_list = BaseCommand.option_list + (
 | 
					    option_list = BaseCommand.option_list + (
 | 
				
			||||||
@@ -80,6 +80,7 @@ class Command(BaseCommand):
 | 
				
			|||||||
            print "Quit the server with %s." % (quit_command,)
 | 
					            print "Quit the server with %s." % (quit_command,)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if settings.USING_RABBITMQ:
 | 
					            if settings.USING_RABBITMQ:
 | 
				
			||||||
 | 
					                queue_client = get_queue_client()
 | 
				
			||||||
                # Process notifications received via RabbitMQ
 | 
					                # Process notifications received via RabbitMQ
 | 
				
			||||||
                def process_notification(chan, method, props, data):
 | 
					                def process_notification(chan, method, props, data):
 | 
				
			||||||
                    tornado_callbacks.process_notification(data)
 | 
					                    tornado_callbacks.process_notification(data)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user