python - 如何使用 asyncio 迭代阻塞迭代器?
问题描述
第 3 方 API 库提供了一个迭代器,用于列出项目和内置分页功能。它是阻塞的,我想并行进行多个列表。
async def list_multiple(params_list):
async_tasks = []
for params in params_list:
async_tasks.append(list_one(**params))
await asyncio.gather(*async_tasks)
async def list_one(**kwargs):
blocking_iterator = some_library.get_api_list_iterator(**kwargs)
async for item in iterate_blocking(blocking_iterator):
pass # do things
async def iterate_blocking(iterator):
loop = asyncio.get_running_loop()
while True:
try:
yield await loop.run_in_executor(None, iterator.next)
except StopIteration:
break
但是这样做会提高
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
并阻塞所有线程。如何正确迭代阻塞迭代器?
解决方案
请注意,用于迭代的方法__next__
在 Python 3 中调用,而不是next
. next
可能是因为库设置了一些 Python 2 兼容性代码。
您可以通过在仍处于辅助线程中时捕获StopIteration
并使用另一个异常(或另一种信号)来指示迭代结束来解决此问题。例如,这段代码使用了一个哨兵对象:
async def iterate_blocking(iterator):
loop = asyncio.get_running_loop()
DONE = object()
def get_next():
try:
return iterator.__next__()
except StopIteration:
return DONE
while True:
obj = await loop.run_in_executor(None, get_next)
if obj is DONE:
break
yield obj
这可以使用next
内置的两个参数形式进一步简化,它与以下内容基本相同get_next
:
async def iterate_blocking(iterator):
loop = asyncio.get_running_loop()
DONE = object()
while True:
obj = await loop.run_in_executor(None, next, iterator, DONE)
if obj is DONE:
break
yield obj
(以上两个示例都未经测试,因此可能存在拼写错误。)
推荐阅读
- c# - 如果字符串为空,我的 Azure 函数应引发异常
- ruby-on-rails - 在 gem 更新后运行 rails s 会返回警告列表 - 警告:已初始化常量 Etc::SC_AIO_LISTIO_MAX
- ios - 如何在 Swift5 中使用逗号(,)设置字符串变量的值?
- json - Go : 如何在 json.Unmarshal 到 struct 时忽略类型不匹配错误?
- docker - Docker 入口点脚本按下标退出
- sql-server - 根据非 NULL 且早于行日期的最接近日期将值插入 NULL 行
- c++ - 字符串中的最小长度单词 (C++)
- facebook - 无法获得 Facebook API 调用的权限
- javascript - 未捕获的 RangeError:无效的数组长度 - JS 引擎错误?查看代码示例
- javascript - 从自定义 Javascript 数组中提取一个值,当它是数组中字符串值的一个组件时