mirror of
https://github.com/zulip/zulip.git
synced 2025-11-06 23:13:25 +00:00
Add queue support for draining a queue completely
(imported from commit 50cc52f8b5b74b274024222596c6f2bd27832c89)
This commit is contained in:
@@ -106,6 +106,24 @@ class SimpleQueueClient(object):
|
||||
return callback(ch, method, properties, simplejson.loads(body))
|
||||
return self.register_consumer(queue_name, wrapped_callback)
|
||||
|
||||
def drain_queue(self, queue_name, json=False):
|
||||
"Returns all messages in the desired queue"
|
||||
messages =[]
|
||||
def opened():
|
||||
while True:
|
||||
(meta, _, message) = self.channel.basic_get(queue_name)
|
||||
|
||||
if not message:
|
||||
break;
|
||||
|
||||
self.channel.basic_ack(meta.delivery_tag)
|
||||
if json:
|
||||
message = simplejson.loads(message)
|
||||
messages.append(message)
|
||||
|
||||
self.ensure_queue(queue_name, opened)
|
||||
return messages
|
||||
|
||||
def start_consuming(self):
|
||||
self.channel.start_consuming()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user