asyncio how to pause coroutine until send is called
Lets say that I have a bus that receives messages from somewhere. Every message has target
and msg
, and I want to implement subscription mecahnism, so other coroutines can subscribe
to specific target
subscriptions = {}
async def subscribe(target):
subscriptions[target]= wait_for_messages()
async def proc_msg(target,msg);
subscriptions[target].send(msg)
async def wait_for_messages():
while True:
asyncio.sleep(1)
async def subscribe(target)
async for msg in subscribe(target):
print(msg)
My question is regarding the wait_for_messages
, Even though sleep kind of works (the code is for presentation only), How can I await for the send command without sleeping and awaking constantly.
Subscription mechanism is from callback world and asyncio.Future - is some sort of bridge from callback-based to coroutine-based world: with it you can await of something happened. Use set_result
on msg coming and await
for Future to recieve msg inside coroutine.
For example:
import asyncio
from random import randint
from collections import defaultdict
from contextlib import suppress
# MSG SUBSCRIBE/SEND API:
_futures = defaultdict(asyncio.Future)
async def msg_for_target(target):
"""Await for this coroutine to recieve msg."""
return await _futures[target]
def send_msg(target, msg):
_futures[target].set_result(msg)
del _futures[target] # We would need new Future for future subscribers
# TEST:
async def random_sender():
"""Send random messages."""
i = 0
while True:
i += 1
await asyncio.sleep(0.5)
target = randint(0, 3)
msg = f'msg {i}'
print(f'Sending msg "{msg}" for target "{target}" ...')
send_msg(target, msg)
async def main():
task = asyncio.ensure_future(random_sender())
for target in (2, 0, 3, 1):
print(f'> Subscribed for target "{target}"')
msg = await msg_for_target(target)
print(f'> Recieved "{msg}"')
print()
# Cleanup, see https://stackoverflow.com/a/43810272/1113207
task.cancel()
with suppress(asyncio.CancelledError):
await task
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Output:
> Subscribed for target "2"
Sending msg "msg 1" for target "2" ...
> Recieved "msg 1"
> Subscribed for target "0"
Sending msg "msg 2" for target "3" ...
Sending msg "msg 3" for target "0" ...
> Recieved "msg 3"
> Subscribed for target "3"
Sending msg "msg 4" for target "2" ...
Sending msg "msg 5" for target "2" ...
Sending msg "msg 6" for target "2" ...
Sending msg "msg 7" for target "1" ...
Sending msg "msg 8" for target "3" ...
> Recieved "msg 8"
> Subscribed for target "1"
Sending msg "msg 9" for target "0" ...
Sending msg "msg 10" for target "0" ...
Sending msg "msg 11" for target "2" ...
Sending msg "msg 12" for target "2" ...
Sending msg "msg 13" for target "1" ...
> Recieved "msg 13"
链接地址: http://www.djcxy.com/p/53234.html
下一篇: asyncio如何暂停协程直到发送被调用