首页 > 解决方案 > Python AsyncIO 中的异步生成器的产量

问题描述

我有一个简单的类,它利用异步生成器来检索 URL 列表:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

当我执行代码的主要部分时:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

日志打印出来:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

由于responses是异步生成器,我希望它从异步生成器产生一个响应(它应该只在实际产生时发送请求),向没有x参数的端点发送一个单独的请求,然后从异步生成器产生下一个响应. x这应该在带参数的请求和不带参数的请求之间来回切换。相反,它产生来自带有x参数的异步生成器的所有响应,然后是所有没有参数的 https 请求。

当我这样做时会发生类似的事情:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

日志打印:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

相反,我想要的是:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

有时我想在做任何其他事情之前先检索所有响应。但是,有时我想在生成器产生下一个项目之前插入并发出中间请求(即,生成器从分页搜索结果返回结果,并且我想在进入下一页之前处理每个页面的进一步链接)。

我需要改变什么才能达到所需的结果?

标签: pythonpython-3.xpython-asyncioaiohttp

解决方案


撇开是否responses是异步生成器的技术问题(它不是,因为 Python使用术语),您的问题在于as_completed. 并行as_completed启动一堆协程,并提供在它们完成时获取结果的方法。从文档中看,future 并行运行并不是很明显(在以后的版本中进行了改进),但是如果您认为原来的工作是围绕并行执行的基于线程的futures,那么它是有道理的。从概念上讲,异步期货也是如此。concurrent.futures.as_completed

您的代码仅获得第一个(最快到达)结果,然后开始执行其他操作,也使用 asyncio。传递给的剩余协程as_completed不会仅仅因为没有人收集它们的结果而被冻结 - 它们在后台完成它们的工作,一旦完成就可以被await编辑(在你的情况下as_completed,你可以使用里面的代码访问它loop.run_until_complete())。我冒昧地猜测,没有参数的 URL 比只有参数的 URL 需要更长的时间来检索x,这就是为什么它在所有其他协程之后被打印的原因。

换句话说,正在打印的那些日志行意味着它asyncio正在完成它的工作并提供您请求的并行执行!如果您不想要并行执行,那么不要要求它,串行执行它们:

def get_routes(self, urls):
    for url in urls:
        yield loop.run_until_complete(self._get_url(url))

但这是使用 asyncio 的一种糟糕方式——它的主循环是不可重入的,因此为了确保可组合性,您几乎肯定希望循环在顶层只旋转一​​次。这通常使用类似loop.run_until_complete(main())or的结构来完成loop.run_forever()。正如 Martijn 指出的那样,您可以通过制作get_routes一个实际的异步生成器来实现这一点,同时保留不错的生成器 API:

async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result

现在您可以拥有一个main()如下所示的协程:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

loop.run_until_complete(main())

推荐阅读