首页 > 解决方案 > 如何将 AsyncIterator 转换为 Iterator?

问题描述

是否可以在 AsyncIterator 周围包装一个函数以获取(同步)迭代器?

我能够将 AsyncIterator 转换为 List 并将其用作 Iterator,但希望不转换为 Int

class AsyncToSyncIterableUtils(Generic[T]):

    @staticmethod
    def sync_all(iterable: AsyncIterable[T]) -> Iterable[T]:
        results:List[T] = []
        async def _wrapper():
            async for m in iterable:
                results.append(m)
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(_wrapper())
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens()) # see: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens
            loop.close()
        return results

标签: pythonpython-3.x

解决方案


推荐阅读