首页 > 解决方案 > 如何正确使用 run_in_executor?

问题描述

我尝试使用run_in_executor并有一些问题。这是代码(基本上是从文档中复制过来的)

import asyncio
import concurrent.futures


def cpu_bound(val):
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    print(f'Start task: {val}')
    sum(i * i for i in range(10 ** 7))
    print(f'End task: {val}')


async def async_task(val):
    print(f'Start async task: {val}')
    while True:
        print(f'Tick: {val}')
        await asyncio.sleep(1)


async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    for i in range(5):
        loop.create_task(async_task(i))

    # 1. Run in the default loop's executor:
    # for i in range(10):
    #     loop.run_in_executor(
    #         None, cpu_bound, i)
    # print('default thread pool')

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    #     for i in range(10):
    #         loop.run_in_executor(
    #             pool, cpu_bound, i)
    #     print('custom thread pool')

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor(max_workers = 10) as pool:
        for i in range(10):
            loop.run_in_executor(
                pool, cpu_bound, i)
        print('custom process pool')

    while True:
        await asyncio.sleep(1)


asyncio.run(main())

情况 1:run_in_executorwhere executoris None: async_task与 'execute 同时cpu_bound执行。

在其他情况下async_task, ' 将在cpu_bound' 完成后执行。我认为当我们使用ProcessPoolExecutor任务时不应该阻塞循环。我哪里错了?

标签: pythonpython-asyncio

解决方案


在其他情况下async_task, ' 将在cpu_bound' 完成后执行。我认为当我们使用ProcessPoolExecutor任务时不应该阻塞循环。我哪里错了?

问题是with XXXPoolExecutor()在块的末尾关闭了池with。池关闭等待挂起的任务完成,这会阻塞事件循环并且与 asyncio 不兼容。由于您的第一个变体不涉及with语句,因此不存在此问题。

解决方案是简单地删除with语句并创建一次池(例如在顶级或 in main()),然后在函数中使用它。pool.shutdown()如果您愿意,您可以通过在完成后调用来显式关闭池asyncio.run()

另请注意,您永远不会等待loop.run_in_executor. 这是一个错误,asyncio 可能会警告你;您可能应该将返回的值收集在一个列表中,并使用类似results = await asyncio.gather(*tasks). 这不仅会收集结果,还会确保发生在线程外函数中的异常正确地传播到您的代码中,而不是被丢弃。


推荐阅读