Is there a way to manually switch on asyncio event loop

I want to use the event loop to monitor any inserting data into my asyncio.Queue(you can find its source code here https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py), but I run into some problems. Here is the following code:

import asyncio
import threading

async def recv(q):
    while True:
        msg = await q.get()
        print(msg)

async def checking_task():
    while True:
        await asyncio.sleep(0.1)

def loop_in_thread(loop,q):
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(recv(q))
    asyncio.ensure_future(insert(q))
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended
    loop.run_forever()

async def insert(q):
    print('invoked')
    await q.put('hello')

q = asyncio.Queue() 
loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop, q,))
t.start()

The program has started and we can see the following result

invoked
hello
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>}

But now if we manually add data into q by using q.put_nowait('test') , we would get the following result:

q.put_nowait('test') # a non-async way to add data into queue
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
 wait_for=<Future finished result=None>>}

As you can see, the future is already finished, yet we still haven't print out the newly added string 'test' . In other words, msg = await q.get() is still waiting even though the Future related to q.get() is done and there are no other tasks running. This confuses me because in the official documentation(https://docs.python.org/3/library/asyncio-task.html), it says

result = await future or result = yield from future – suspends the coroutine until the future is done, then returns the future's result

It seemed that even though the Future is done, we still need some sort of await in other async function to make the event loop keep processing tasks.

I found a workaround to this problem, which is adding a checking_task() , and also add that coroutine into the event loop; then it will work as intended.

But adding a checking_task() coroutine is very costly for CPU since it just runs a while loop. I am wondering if there is some manual way for us to trigger that await event without using a async function. For example, something magical like

q.put_nowait('test')
loop.ok_you_can_start_running_other_pending_tasks()

Helps will be greatly appreciated! Thanks.


So I ended up with using

loop.call_soon_threadsafe(q.put_nowait, 'test')

and it will work as intended. After figure this out, I searched some information about . It turned out this post (Scheduling an asyncio coroutine from another thread) has the same problem. And @kfx's answer would also work, which is

loop.call_soon_threadsafe(loop.create_task, q.put('test'))

Notice asyncio.Queue.put() is a coroutine but asyncio.Queue.put_nowait() is a normal function.

链接地址: http://www.djcxy.com/p/53216.html

上一篇: 检测一个空闲的asyncio事件循环

下一篇: 有没有办法手动打开asyncio事件循环