Asynchronous RabbitMQ consumer with aioamqp
I'm trying to write an asynchronous consumer using asyncio/aioamqp. My problem is, the callback coroutine (below) is blocking. I set the channel to do a basic_consume(), and assign the callback as callback(). The callback has a "yield from asyncio.sleep" statement (to simulate "work"), which takes an integer from the publisher and sleeps for that amount of time before printing the message.
If I published two messages, one with a time of "10", immediately followed by one with a time of "1", I expected the second message would print first, since it has a shorter sleep time. Instead, the callback blocks for 10 seconds, prints the first message, and then prints the second.
It appears either basic_consume, or the callback, is blocking somewhere. Is there another way this could be handled?
@asyncio.coroutine
def callback(body, envelope, properties):
yield from asyncio.sleep(int(body))
print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))
@asyncio.coroutine
def receive_log():
try:
transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
except:
print("closed connections")
return
channel = yield from protocol.channel()
exchange_name = 'cloudstack-events'
exchange_name = 'test-async-exchange'
queue_name = 'async-queue-%s' % random.randint(0, 10000)
yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)
binding_keys = ['mykey']
for binding_key in binding_keys:
print("binding", binding_key)
yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,
queue_name=queue_name,
routing_key=binding_key), timeout=10)
print(' [*] Waiting for logs. To exit press CTRL+C')
yield from channel.basic_consume(queue_name, callback=callback)
loop = asyncio.get_event_loop()
loop.create_task(receive_log())
loop.run_forever()
For those interested, I figured out a way to do this. I'm not sure if it's best practice, but it's accomplishing what I need.
Rather than do the "work" (in this case, async.sleep) inside the callback, I create a new task on the loop, and schedule a separate co-routine to run do_work(). Presumably this is working, because it's freeing up callback() to return immediately.
I loaded up a few hundred events in Rabbit with different sleep timers, and they were interleaved when printed by the code below. So it seems to be working. Hope this helps someone!
@asyncio.coroutine
def do_work(envelope, body):
yield from asyncio.sleep(int(body))
print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))
@asyncio.coroutine
def callback(body, envelope, properties):
loop = asyncio.get_event_loop()
loop.create_task(do_work(envelope, body))
@asyncio.coroutine
def receive_log():
try:
transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
except:
print("closed connections")
return
channel = yield from protocol.channel()
exchange_name = 'cloudstack-events'
exchange_name = 'test-async-exchange'
queue_name = 'async-queue-%s' % random.randint(0, 10000)
yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)
binding_keys = ['mykey']
for binding_key in binding_keys:
print("binding", binding_key)
yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,
queue_name=queue_name,
routing_key=binding_key), timeout=10)
print(' [*] Waiting for logs. To exit press CTRL+C')
yield from channel.basic_consume(queue_name, callback=callback)
loop = asyncio.get_event_loop()
loop.create_task(receive_log())
loop.run_forever()
链接地址: http://www.djcxy.com/p/86572.html