asyncio: how to pause a coroutine until send is called

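Let's say I have a bus that receives messages from somewhere. Each message has a target and a msg, and I want to implement a subscription mechanism so that other coroutines can subscribe to a specific target.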

import asyncio

subscriptions = {}

async def subscribe(target):
    subscriptions[target] = wait_for_messages()

async def proc_msg(target, msg):
    subscriptions[target].send(msg)

async def wait_for_messages():
    while True:
        await asyncio.sleep(1)

async def subscribe(target):
    async for msg in subscribe(target):
        print(msg)

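My question is about wait_for_messages. Even though sleeping kind of works (the code is for demonstration only), how can I wait for the send call without sleeping and repeatedly waking up?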


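A subscription mechanism is a concept from the callback world, and asyncio.Future is a kind of bridge from the callback-based world to the coroutine-based one: it lets you await something that will happen later. Call set_result(msg) on the Future to deliver a message, and await the Future inside a coroutine to receive it.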
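In isolation, the bridge can be sketched like this. It is only a minimal illustration: on_message is a hypothetical callback name, and loop.call_later stands in for whatever callback-based source actually delivers the message.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # Callback side: a plain function (not a coroutine) delivers the value.
    # `on_message` is a hypothetical name used only for this sketch.
    def on_message(msg):
        fut.set_result(msg)

    # Simulate the bus invoking the callback 0.1 s from now.
    loop.call_later(0.1, on_message, "hello")

    # Coroutine side: suspended here, without polling, until set_result runs.
    return await fut


result = asyncio.run(main())
print(result)
```

The coroutine is resumed by the event loop once set_result has been called, so no sleep-and-check loop is needed.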

For example, a complete subscribe/send API with a small test driver:

import asyncio
from random import randint
from collections import defaultdict
from contextlib import suppress


# MSG SUBSCRIBE/SEND API:
_futures = defaultdict(asyncio.Future)


async def msg_for_target(target):
    """Wait until a message for this target arrives, then return it."""
    return await _futures[target]


def send_msg(target, msg):
    _futures[target].set_result(msg)
    del _futures[target]  # Later subscribers need a fresh Future


# TEST:
async def random_sender():
    """Send random messages."""
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.5)

        target = randint(0, 3)
        msg = f'msg {i}'
        print(f'Sending msg "{msg}" for target "{target}" ...')
        send_msg(target, msg)


async def main():
    task = asyncio.ensure_future(random_sender())

    for target in (2, 0, 3, 1):
        print(f'> Subscribed for target "{target}"')
        msg = await msg_for_target(target)
        print(f'> Recieved "{msg}"')
        print()

    # Cleanup, see https://stackoverflow.com/a/43810272/1113207
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


if __name__ == '__main__':
    # asyncio.run() (Python 3.7+) creates the loop, runs main(),
    # shuts down async generators, and closes the loop.
    asyncio.run(main())

Output:

> Subscribed for target "2"
Sending msg "msg 1" for target "2" ...
> Recieved "msg 1"

> Subscribed for target "0"
Sending msg "msg 2" for target "3" ...
Sending msg "msg 3" for target "0" ...
> Recieved "msg 3"

> Subscribed for target "3"
Sending msg "msg 4" for target "2" ...
Sending msg "msg 5" for target "2" ...
Sending msg "msg 6" for target "2" ...
Sending msg "msg 7" for target "1" ...
Sending msg "msg 8" for target "3" ...
> Recieved "msg 8"

> Subscribed for target "1"
Sending msg "msg 9" for target "0" ...
Sending msg "msg 10" for target "0" ...
Sending msg "msg 11" for target "2" ...
Sending msg "msg 12" for target "2" ...
Sending msg "msg 13" for target "1" ...
> Recieved "msg 13"
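
The Future-based API above hands over one message per await. If you want the `async for msg in subscribe(target)` shape from the question, one possible variation (not part of the code above, just a sketch) is to keep an asyncio.Queue per target and wrap it in an async generator. The names `_queues` and `consumer` are assumptions made for illustration.

```python
import asyncio
from collections import defaultdict

# Hypothetical registry: one queue per target, created lazily on first use.
_queues = defaultdict(asyncio.Queue)


async def subscribe(target):
    """Async generator that yields every message sent to `target`."""
    queue = _queues[target]
    while True:
        # Suspends without polling until send_msg puts something in the queue.
        yield await queue.get()


def send_msg(target, msg):
    # The queue is unbounded by default, so put_nowait does not raise QueueFull.
    _queues[target].put_nowait(msg)


async def main():
    received = []

    async def consumer():
        async for msg in subscribe("a"):
            received.append(msg)
            if len(received) == 3:
                break

    task = asyncio.create_task(consumer())
    for i in range(3):
        await asyncio.sleep(0.01)
        send_msg("a", f"msg {i}")
    await task
    return received


received = asyncio.run(main())
print(received)
```

Two differences from the Future version are worth keeping in mind. Messages sent before anyone subscribes are buffered in the queue instead of being dropped, and if several coroutines iterate over the same target, each message goes to only one of them, whereas every coroutine awaiting the same Future receives its result.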