python - Yields from an asynchronous generator in Python AsyncIO
Problem description
I have a simple class that utilizes an asynchronous generator to retrieve a list of URLs:
import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)


class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """
        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()
                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """
        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()
When I execute the main part of the code:
if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()
the log prints out:
[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]
Since responses is an asynchronous generator, I expect it to yield one response from the asynchronous generator (which should only send the request when it is actually yielded), send a separate request to the endpoint without the x parameter, and then yield the next response from the asynchronous generator. It should flip back and forth between a request with the x parameter and a request without it. Instead, it yields all of the responses from the asynchronous generator with the x parameter, followed by all of the https requests without a parameter.
Something similar happens when I do:
ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()
The log prints:
[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]
Whereas what I want is:
[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]
There are times when I want to retrieve all of the responses before doing anything else. However, there are also times when I want to interject and make intermediate requests before the generator yields its next item (i.e., the generator returns results from paginated search results, and I want to process further links from each page before moving on to the next page).

What do I need to change to achieve the desired result?
Solution
Leaving aside the technical question of whether responses is an async generator (it isn't, as Python uses the term), your problem lies in as_completed. as_completed starts a bunch of coroutines in parallel and provides the means to obtain their results as they complete. That the futures run in parallel is not quite obvious from the docs (improved in later versions), but it makes sense if you consider that the original concurrent.futures.as_completed works on thread-based futures, which run in parallel. Conceptually, the same is true of asyncio futures.
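A minimal, self-contained sketch of that eager scheduling, using hypothetical sleep-based coroutines in place of real HTTP requests (the names job, fast, etc. are invented for illustration). Even though only the first as_completed result is collected, all three coroutines run to completion in the background:

```python
import asyncio

finished = []


async def job(name, delay):
    # Stand-in for an HTTP request: sleep, then record completion.
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def main():
    coros = [job('fast', 0.01), job('medium', 0.05), job('slow', 0.1)]
    it = asyncio.as_completed(coros)
    # Collect only the FIRST (fastest) result...
    first = await next(iter(it))
    # ...then "do something else" long enough for the rest to finish.
    await asyncio.sleep(0.5)
    return first


first = asyncio.run(main())
# `finished` now contains all three names: nobody collected the other
# results, but the coroutines were scheduled and completed anyway.
```

This mirrors what the question's log shows: the remaining requests with the x parameter keep running even while the loop body issues its intermediate request.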
Your code obtains only the first (fastest-arriving) result and then starts doing something else, also using asyncio. The remaining coroutines passed to as_completed are not frozen merely because no one is collecting their results - they do their jobs in the background and, once done, are ready to be awaited (in your case by the code inside as_completed, which you access with loop.run_until_complete()). I would venture a guess that the URL without parameters takes longer to retrieve than the one with just an x parameter, which is why it gets printed after all the other coroutines.
In other words, those log lines being printed mean that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, then don't ask for it - execute them serially:
def get_routes(self, urls):
    for url in urls:
        yield loop.run_until_complete(self._get_url(url))
But that is a poor way of using asyncio - its main loop is non-reentrant, so to ensure composability, you almost certainly want the loop to be spun just once at top-level. This is usually done with a construct like loop.run_until_complete(main()) or loop.run_forever(). As Martijn pointed out, you could achieve that, while retaining the nice generator API, by making get_routes an actual async generator:
async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result
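The key property of this version is laziness: each URL is only fetched when the consumer actually requests the next item. A small sketch (not from the original answer, using a dummy fetch coroutine in place of _get_url) makes that observable by recording when each "request" starts:

```python
import asyncio

started = []


async def fetch(url):
    # Stand-in for _get_url: record that this request was started.
    started.append(url)
    await asyncio.sleep(0)
    return url


async def get_routes(urls):
    # Same shape as the answer's async generator.
    for url in urls:
        yield await fetch(url)


async def main():
    agen = get_routes(['a', 'b', 'c'])
    first = await agen.__anext__()
    # At this point only 'a' has been requested; 'b' and 'c' are untouched.
    snapshot = list(started)
    async for _ in agen:   # consume the rest
        pass
    return first, snapshot


first, snapshot = asyncio.run(main())
```

This is exactly the back-and-forth behavior the question asks for: nothing runs ahead of the consumer.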
Now you can have a main() coroutine that looks like this:
async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

loop.run_until_complete(main())
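The same async-generator API also covers both use cases from the question: draining everything up front, or interleaving extra requests per item (the paginated-search scenario). A hedged sketch with a dummy fetch coroutine (the page names and the '/detail' suffix are invented for illustration):

```python
import asyncio


async def fetch(url):
    # Stand-in for a real HTTP request.
    await asyncio.sleep(0)
    return url


async def get_routes(urls):
    for url in urls:
        yield await fetch(url)


async def main():
    # Pattern 1: retrieve all responses before doing anything else,
    # via an async comprehension.
    all_at_once = [r async for r in get_routes(['p1', 'p2', 'p3'])]

    # Pattern 2: interject an intermediate request before the generator
    # yields its next item, as with paginated search results.
    interleaved = []
    async for page in get_routes(['p1', 'p2', 'p3']):
        interleaved.append(page)
        extra = await fetch(f'{page}/detail')
        interleaved.append(extra)
    return all_at_once, interleaved


all_at_once, interleaved = asyncio.run(main())
```

If you later want the all-at-once case to run the requests concurrently again, asyncio.gather on the underlying coroutines is the natural fit; the generator form is for when ordering and laziness matter.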