queue: Use a thread-local Pika connection.

According to the documentation: “Pika does not have any notion of
threading in the code. If you want to use Pika with threading, make
sure you have a Pika connection per thread, created in that thread. It
is not safe to share one Pika connection across threads, with one
exception: you may call the connection method add_callback_threadsafe
from another thread to schedule a callback within an active pika
connection.”

https://pika.readthedocs.io/en/stable/faq.html
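
As a loose illustration of that rule (this is not code from this
commit; the "example" queue name and the localhost broker are
placeholders): each thread opens, uses, and closes its own
BlockingConnection, and the only safe cross-thread call is
add_callback_threadsafe.

    import threading

    import pika

    def publish_from_this_thread() -> None:
        # The connection and channel are created in, and used only by,
        # the thread running this function; nothing is shared.
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        channel = connection.channel()
        channel.queue_declare(queue="example")
        channel.basic_publish(exchange="", routing_key="example", body=b"hello")
        # If another thread needed to reach this connection, the only
        # safe entry point would be connection.add_callback_threadsafe(...).
        connection.close()

    threading.Thread(target=publish_from_this_thread).start()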

This also means that synchronous Django code running in Tornado will
use its own synchronous SimpleQueueClient rather than sharing the
asynchronous TornadoQueueClient, which is unfortunate but necessary as
they’re about to be on different threads.
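
The mechanism behind this is Python's threading.local(): the first
call on each thread creates and caches that thread's own client, and
later calls on the same thread reuse it. A minimal standalone sketch
of the pattern (make_client is a placeholder, not Zulip code):

    import threading

    thread_data = threading.local()

    def make_client() -> object:
        # Placeholder for constructing a per-thread, Pika-backed client.
        return object()

    def get_client() -> object:
        # Attributes on a threading.local() are per-thread: each thread
        # that calls this builds its own client once and never sees the
        # clients belonging to other threads.
        if not hasattr(thread_data, "client"):
            thread_data.client = make_client()
        return thread_data.client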

Signed-off-by: Anders Kaseorg <anders@zulip.com>
(cherry picked from commit c263bfdb41)
Author: Anders Kaseorg, 2022-04-15 17:30:58 -07:00
Committer: Tim Abbott
parent ad9187d9f7
commit b4a0684201
3 changed files with 24 additions and 34 deletions

@@ -393,28 +393,20 @@ class TornadoQueueClient(QueueClient[Channel]):
         )
 
 
-queue_client: Optional[Union[SimpleQueueClient, TornadoQueueClient]] = None
+thread_data = threading.local()
 
 
 def get_queue_client() -> Union[SimpleQueueClient, TornadoQueueClient]:
-    global queue_client
-    if queue_client is None:
-        if settings.RUNNING_INSIDE_TORNADO and settings.USING_RABBITMQ:
-            queue_client = TornadoQueueClient()
-        elif settings.USING_RABBITMQ:
-            queue_client = SimpleQueueClient()
-        else:
+    if not hasattr(thread_data, "queue_client"):
+        if not settings.USING_RABBITMQ:
             raise RuntimeError("Cannot get a queue client without USING_RABBITMQ")
+        thread_data.queue_client = SimpleQueueClient()
 
-    return queue_client
+    return thread_data.queue_client
 
 
-# We using a simple lock to prevent multiple RabbitMQ messages being
-# sent to the SimpleQueueClient at the same time; this is a workaround
-# for an issue with the pika BlockingConnection where using
-# BlockingConnection for multiple queues causes the channel to
-# randomly close.
-queue_lock = threading.RLock()
+def set_queue_client(queue_client: Union[SimpleQueueClient, TornadoQueueClient]) -> None:
+    thread_data.queue_client = queue_client
 
 
 def queue_json_publish(
@@ -422,16 +414,15 @@ def queue_json_publish(
     event: Dict[str, Any],
     processor: Optional[Callable[[Any], None]] = None,
 ) -> None:
-    with queue_lock:
-        if settings.USING_RABBITMQ:
-            get_queue_client().json_publish(queue_name, event)
-        elif processor:
-            processor(event)
-        else:
-            # Must be imported here: A top section import leads to circular imports
-            from zerver.worker.queue_processors import get_worker
+    if settings.USING_RABBITMQ:
+        get_queue_client().json_publish(queue_name, event)
+    elif processor:
+        processor(event)
+    else:
+        # Must be imported here: A top section import leads to circular imports
+        from zerver.worker.queue_processors import get_worker
 
-            get_worker(queue_name).consume_single_event(event)
+        get_worker(queue_name).consume_single_event(event)
 
 
 def retry_event(