首页 > 解决方案 > Python asyncio 从“call_soon_threadsafe”收集返回值

问题描述

我试图理解python asyncio的call_soon_threadsafe API,但是失败了,下面的示例代码,如果我的simple协程想要返回一些东西,我应该如何从调用方获取返回值?

import time
import asyncio as aio
import uvloop

from threading import Thread

aio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def simple(a, fut:aio.Future):
  await aio.sleep(a)
  return fut.set_result(a)

def delegator(loop):
  aio.set_event_loop(loop)
  loop.run_forever()

loop_exec = aio.new_event_loop()

t = Thread(target=delegator, args=(loop_exec,))
t.start()


if __name__ == '__main__':
  start_time = time.time()

  fut = loop_exec.create_future() # tried to get back returned value by future
  handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
  res = aio.wait_for(fut, 10)

  print('Time consumed: {}s'.format(time.time() - start_time))
  print('>>>>>>>>>>', res)

# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>

如您所见,我试图通过将未来传递给在不同线程中运行的协程来取回返回值,但仍然不知道如何正确获取它。

基本上两个问题:

  1. 使用上面的示例代码,我怎样才能从调用方取回返回值?
  2. 这个的实际用例是什么call_soon_threadsafe,只是感觉run_coroutine_threadsafe使用起来更方便,几乎可以涵盖我能想象到的这种不同线程协程交互的所有情况。

标签: pythonpython-3.xpython-asynciocoroutine

解决方案


使用上面的示例代码,我怎样才能从调用方取回返回值?

由于事件循环在主线程之外运行,因此需要使用线程感知同步设备。例如:

async def simple(a, event):
    await asyncio.sleep(a)
    event.simple_result = a
    event.set()

done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
done.wait(10)
res = done.simple_result

或者,您可以使用 进行同步concurrent.futures.Future,这就像带有对象有效负载的一次性事件。(请注意,您不能使用 asyncio 未来,因为它不是线程安全的。)

async def simple(a, fut):
    await asyncio.sleep(a)
    fut.set_result(a)

done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
res = done.result(10)

正如文森特在评论中指出的那样,这run_coroutine_threadsafe将为您做:

async def simple(a):
    await asyncio.sleep(a)
    return a

fut = asyncio.run_coroutine_threadsafe(simple(3))
res = fut.result(10)

这个的实际用例是什么call_soon_threadsafe

最简单的答案是,call_soon_threadsafe当您只想告诉事件循环执行或开始执行某项操作时,您可以使用较低级别的 API。call_soon_threadsafe是用于实现类似功能的构建块run_coroutine_threadsafe,但还有许多其他功能。至于为什么您要自己使用该管道功能.​​..

有时你想执行一个普通的函数,而不是一个协程。有时你的函数是一劳永逸的,你并不关心它的返回值。(或者该函数可能最终会通过某个侧通道通知您它的完成。)在这些情况下call_soon_threadsafe,它是该工作的正确工具,因为它更轻量级,因为它不会尝试创建额外的concurrent.futures.Future并将其附加到已执行的代码。例子:

  • loop.call_soon_threadsafe(loop.stop)告诉事件循环停止运行
  • loop.call_soon_threadsafe(queue.put_nowait, some_item)向无界异步队列添加内容
  • loop.call_soon_threadsafe(asyncio.create_task, coroutinefn())将协程提交到事件循环而不等待它完成
  • loop.call_soon_threadsafe(some_future.set_result, value)从不同的线程设置 asyncio 未来的结果
  • 此答案中的低级代码

推荐阅读