首页 > 解决方案 > 将 asyncio.ensure_future 和 loop.run_until_complete 组合在一个 api 调用中?

问题描述

我编写了一个异步函数,它收集多个文本数据并批量处理数据。之后,它返回输出,如下所示:

import sys
import asyncio

Model_runner():
    '''
    The model runner combines all the input coming to it and combines in a batch of 10 or 1 sec, which ever duration is less. 
    After combining, it does processing and returns the output 
    '''


loop = asyncio.get_event_loop()
model_obj = ModelRunner(loop)
loop.create_task(model_obj.model_runner())


async def process_text(text):
        out_ = await model_obj.process_input(text)
        return out_

为了获得输出,我正在运行以下代码:

task1 = asyncio.ensure_future(process_text(text1))
task2 = asyncio.ensure_future(process_text(text2))
task3 = asyncio.ensure_future(process_text(text3))
task4 = asyncio.ensure_future(process_text(text4))
async_tasks = [task1, task2, task3, task4]
out1, out2 ,out3 ,out4 = loop.run_until_complete(asyncio.gather(*async_tasks))

这里,out1、out2、out3、out4是处理文本数据后的输出。

在这里,我不想像[task1,task2,task3,task4]这样组合任务然后调用loop.run_until_complete来获取输出。相反,我正在寻找这样的功能:

out1 = func(text1)
out2 = func(text2) 
etc..

但是,它们应该像 asyncio.ensure_future 那样以非阻塞方式工作。我怎样才能做到这一点。提前致谢。

标签: pythonpython-asyncio

解决方案


两个明显的选择:

  • 如果您已经有多个线程,为什么还要使用 asyncio 呢?只需制作process_text一个常规的阻塞函数并从这些线程中调用它。
  • 相反,如果您使用的是 asyncio,为什么还要使用多个线程呢?使您的顶级任务异步并在一个线程中运行它们。

如果你真的必须使用多线程和异步函数:

  • 让一个线程运行您的 asyncio 循环和您已经提到的工作线程,并loop.call_soon_threadsafe在线程中使用以强制 asyncs 函数在异步线程中运行。如果您想将结果返回给线程,您可以使用 aqueue.Queue将结果(或结果)发回。
  • 最后一个选项可能是最糟糕的,几乎可以肯定不是你想要的,但我提到它是为了完整性:从每个需要它的线程启动一个单独的 asyncio 事件循环,并使用它们直接在工作线程中运行你的异步函数。

推荐阅读