首页 > 解决方案 > 异步 python itertools 链多个生成器

问题描述

更新后的问题:

假设我有 2 个处理生成器函数:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

我可以用 itertools 链接它们

from itertools import chain

mix = chain(gen1(), gen2())

然后我可以用它创建另一个生成器函数对象,

def mix_yield():
   for item in mix:
      yield item

或者只是如果我只是想next(mix),它就在那里。

我的问题是,我怎样才能在异步代码中做同样的事情?

因为我需要它:

上一篇 更新:

经过试验和研究,我发现了 aiostream库,它声明为 itertools 的异步版本,所以我做了什么:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

但我还是做不到next(a_mix)

TypeError: 'merge' object is not an iterator

或者next(await a_mix)

raise StreamEmpty()

虽然我仍然可以将其列入列表:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

所以一个目标完成了,还有一个目标:

标签: pythonpython-3.xasynchronouspython-asynciosequence-generators

解决方案


Python 的next内置函数只是一种__next__在对象上调用底层方法的便捷方式。的异步等价物__next____anext__异步迭代器上的方法。没有anext全局函数,但可以很容易地编写它:

async def anext(aiterator):
    return await aiterator.__anext__()

但是节省的费用是如此之少,以至于在极少数情况下需要这样做时,不妨__anext__直接调用。异步迭代器又是通过调用(类似于常规迭代器提供的)从异步迭代器中获得的。手动驱动的异步迭代如下所示:__aiter____iter__

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

__anext__StopAsyncIteration当没有更多元素可用时将引发。要遍历异步迭代器,应该使用async for.

这是一个可运行的示例,基于您的代码,同时使用__anext__async for耗尽设置的流aiostream.stream.combine.merge

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())

推荐阅读