首页 > 解决方案 > 如何使用 asyncio 迭代阻塞迭代器?

问题描述

第 3 方 API 库提供了一个迭代器,用于列出项目和内置分页功能。它是阻塞的,我想并行进行多个列表。

async def list_multiple(params_list):
    async_tasks = []
    for params in params_list:
        async_tasks.append(list_one(**params))
    await asyncio.gather(*async_tasks)


async def list_one(**kwargs):
    blocking_iterator = some_library.get_api_list_iterator(**kwargs)
    async for item in iterate_blocking(blocking_iterator):
        pass  # do things


async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    while True:
        try:
            yield await loop.run_in_executor(None, iterator.next)
        except StopIteration:
            break

但是这样做会提高

TypeError: StopIteration interacts badly with generators and cannot be raised into a Future

并阻塞所有线程。如何正确迭代阻塞迭代器?

标签: pythoniteratorpython-asyncio

解决方案


请注意,用于迭代的方法__next__在 Python 3 中调用,而不是next. next可能是因为库设置了一些 Python 2 兼容性代码。

您可以通过在仍处于辅助线程中时捕获StopIteration并使用另一个异常(或另一种信号)来指示迭代结束来解决此问题。例如,这段代码使用了一个哨兵对象:

async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()
    def get_next():
        try:
            return iterator.__next__()
        except StopIteration:
            return DONE

    while True:
        obj = await loop.run_in_executor(None, get_next)
        if obj is DONE:
            break
        yield obj

这可以使用next内置的两个参数形式进一步简化,它与以下内容基本相同get_next

async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()
    while True:
        obj = await loop.run_in_executor(None, next, iterator, DONE)
        if obj is DONE:
            break
        yield obj

(以上两个示例都未经测试,因此可能存在拼写错误。)


推荐阅读