python - Python asyncio 从“call_soon_threadsafe”收集返回值
问题描述
我试图理解python asyncio的call_soon_threadsafe API,但是失败了,下面的示例代码,如果我的simple
协程想要返回一些东西,我应该如何从调用方获取返回值?
import time
import asyncio as aio
import uvloop
from threading import Thread
aio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def simple(a, fut:aio.Future):
await aio.sleep(a)
return fut.set_result(a)
def delegator(loop):
aio.set_event_loop(loop)
loop.run_forever()
loop_exec = aio.new_event_loop()
t = Thread(target=delegator, args=(loop_exec,))
t.start()
if __name__ == '__main__':
start_time = time.time()
fut = loop_exec.create_future() # tried to get back returned value by future
handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
res = aio.wait_for(fut, 10)
print('Time consumed: {}s'.format(time.time() - start_time))
print('>>>>>>>>>>', res)
# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>
如您所见,我试图通过将未来传递给在不同线程中运行的协程来取回返回值,但仍然不知道如何正确获取它。
基本上两个问题:
- 使用上面的示例代码,我怎样才能从调用方取回返回值?
- 这个的实际用例是什么
call_soon_threadsafe
,只是感觉run_coroutine_threadsafe
使用起来更方便,几乎可以涵盖我能想象到的这种不同线程协程交互的所有情况。
解决方案
使用上面的示例代码,我怎样才能从调用方取回返回值?
由于事件循环在主线程之外运行,因此需要使用线程感知同步设备。例如:
async def simple(a, event):
await asyncio.sleep(a)
event.simple_result = a
event.set()
done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
done.wait(10)
res = done.simple_result
或者,您可以使用 进行同步concurrent.futures.Future
,这就像带有对象有效负载的一次性事件。(请注意,您不能使用 asyncio 未来,因为它不是线程安全的。)
async def simple(a, fut):
await asyncio.sleep(a)
fut.set_result(a)
done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
res = done.result(10)
正如文森特在评论中指出的那样,这run_coroutine_threadsafe
将为您做:
async def simple(a):
await asyncio.sleep(a)
return a
fut = asyncio.run_coroutine_threadsafe(simple(3))
res = fut.result(10)
这个的实际用例是什么
call_soon_threadsafe
最简单的答案是,call_soon_threadsafe
当您只想告诉事件循环执行或开始执行某项操作时,您可以使用较低级别的 API。call_soon_threadsafe
是用于实现类似功能的构建块run_coroutine_threadsafe
,但还有许多其他功能。至于为什么您要自己使用该管道功能...
有时你想执行一个普通的函数,而不是一个协程。有时你的函数是一劳永逸的,你并不关心它的返回值。(或者该函数可能最终会通过某个侧通道通知您它的完成。)在这些情况下call_soon_threadsafe
,它是该工作的正确工具,因为它更轻量级,因为它不会尝试创建额外的concurrent.futures.Future
并将其附加到已执行的代码。例子:
loop.call_soon_threadsafe(loop.stop)
告诉事件循环停止运行loop.call_soon_threadsafe(queue.put_nowait, some_item)
向无界异步队列添加内容loop.call_soon_threadsafe(asyncio.create_task, coroutinefn())
将协程提交到事件循环而不等待它完成loop.call_soon_threadsafe(some_future.set_result, value)
从不同的线程设置 asyncio 未来的结果- 此答案中的低级代码