python - 异步 python itertools 链多个生成器
问题描述
更新后的问题:
假设我有 2 个处理生成器函数:
def gen1(): # just for examples,
yield 1 # yields actually carry
yield 2 # different computation weight
yield 3 # in my case
def gen2():
yield 4
yield 5
yield 6
我可以用 itertools 链接它们
from itertools import chain
mix = chain(gen1(), gen2())
然后我可以用它创建另一个生成器函数对象,
def mix_yield():
for item in mix:
yield item
或者只是如果我只是想next(mix)
,它就在那里。
我的问题是,我怎样才能在异步代码中做同样的事情?
因为我需要它:
- 以产量(一个接一个)或使用
next
迭代器返回 - 最快解决的产量优先(异步)
上一篇 更新:
经过试验和研究,我发现了 aiostream库,它声明为 itertools 的异步版本,所以我做了什么:
import asyncio
from aiostream import stream
async def gen1():
await asyncio.sleep(0)
yield 1
await asyncio.sleep(0)
yield 2
await asyncio.sleep(0)
yield 3
async def gen2():
await asyncio.sleep(0)
yield 4
await asyncio.sleep(0)
yield 5
await asyncio.sleep(0)
yield 6
a_mix = stream.combine.merge(gen1(),gen2())
async def a_mix_yield():
for item in a_mix:
yield item
但我还是做不到next(a_mix)
TypeError: 'merge' object is not an iterator
或者next(await a_mix)
raise StreamEmpty()
虽然我仍然可以将其列入列表:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
所以一个目标完成了,还有一个目标:
以产量(一个接一个)或使用
next
迭代器返回- 最快解决的产量优先(异步)
解决方案
Python 的next
内置函数只是一种__next__
在对象上调用底层方法的便捷方式。的异步等价物__next__
是__anext__
异步迭代器上的方法。没有anext
全局函数,但可以很容易地编写它:
async def anext(aiterator):
return await aiterator.__anext__()
但是节省的费用是如此之少,以至于在极少数情况下需要这样做时,不妨__anext__
直接调用。异步迭代器又是通过调用(类似于常规迭代器提供的)从异步迭代器中获得的。手动驱动的异步迭代如下所示:__aiter__
__iter__
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
__anext__
StopAsyncIteration
当没有更多元素可用时将引发。要遍历异步迭代器,应该使用async for
.
这是一个可运行的示例,基于您的代码,同时使用__anext__
和async for
耗尽设置的流aiostream.stream.combine.merge
:
async def main():
a_mix = stream.combine.merge(gen1(), gen2())
async with a_mix.stream() as streamer:
mix_iter = streamer.__aiter__()
print(await mix_iter.__anext__())
print(await mix_iter.__anext__())
print('remaining:')
async for x in mix_iter:
print(x)
asyncio.get_event_loop().run_until_complete(main())
推荐阅读
- wordpress - 帖子不会出现在 WP Admin 中
- vim - 如何使用 BufWritePre 更新我的 cpp 评论
- php - 如何在 youtube 直播中使用自定义石板图像?
- apache - Heroku 上的 SSL 重定向配置在访问根域时导致错误 403 (Forbidden)
- php - 限制对 php ajax 脚本的调用
- java - 我需要帮助来创建带有结果集的可观察列表吗?
- python-3.x - 将 Python3 转换为 Python2
- r - 我正在尝试识别数据集行中缺失值的模式
- c# - 将 2D 图像作为输入的 3D 地图映射器
- javascript - 在 ES6 类中添加上下文/私有变量(不引用 this。)