首页 > 解决方案 > 当 aiohttp.ClientSession 异步失败时如何重试任务

问题描述

我正在努力理解这种行为,因为我是 Python 中异步函数的新手。

我正在尝试创建这个简单的下载工具并且我有这个功能

async def download_all_pages(sites):
    print('Running download all pages')
    try:
        async with aiohttp.ClientSession() as session:
            tasks = [asyncio.ensure_future(safe_download_page(session,url)) for url in sites]
            await asyncio.gather(*tasks, return_exceptions = True)
            try:
                await asyncio.sleep(0.25)
            except asyncio.CancelledError:
                print("Got CancelledError")
    except (aiohttp.ServerDisconnectedError, aiohttp.ClientResponseError,aiohttp.ClientConnectorError) as s:
        print("Oops, the server connection was dropped before we finished.")
        print(s)

我像下面这样初始化这个函数:

try:
    loop.run_until_complete(download_all_pages([url+'/'+str(i) for i in range(1, nb_pages+1)]))
    loop.run_until_complete(download_all_sites([result['href'] for result in results]))
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
print('Finished at '+str(datetime.timestamp(datetime.now())))

每当我遇到错误时,在这个例子中主要是 aiohttp.ServerDisconnectedError; 输出显示

Oops, the server connection was dropped before we finished.
Server disconnected
Finished at 1606440807.007339
Task was destroyed but it is pending!
Task was destroyed but it is pending!
Task was destroyed but it is pending!
Task was destroyed but it is pending!
Task was destroyed but it is pending!
Task was destroyed but it is pending!

......只有一百万Task was destroyed but it is pending!

所以当这个错误发生时,我不希望函数完成,因为还有很多任务要完成;因此错误任务被破坏,但它正在等待处理!.

正如您所看到的,它甚至在调用loop.run_until_complete(download_all_sites([result['href'])之前调用了print('Finished at' ) ;它似乎完全退出了整个脚本。(编辑:我认为我发现了为什么会发生这种情况。由于上面的 try:,因为它失败了,所以它直接进入 finally: 子句,因此破坏了待处理的任务。仍然存在如何避免整个断开连接问题的问题)

您知道如何安全地重试出现 aiohttp.ServerDisconnectedError 错误的任务吗?

这与不使用有关if __name__ == "__main__":吗?

标签: pythonasynchronousasync-awaitpython-asyncioaiohttp

解决方案


这与不使用有关if __name__ == "__main__":吗?

这与不使用if __name__ == "__main__". 它与没有在正确的地方处理异常有关。asyncio.gather()启动给定任务并返回其结果的元组。如果这些任务中的任何一个引发异常,gather()则立即引发相同的异常,而无需等待其余任务完成。

您应该在未显示的函数中处理异常,safe_download_page. 在那里使用try,捕获您可以从中恢复的与 aiohttp 相关的异常,然后重试并重试(必要时使用循环,在迭代之间休眠)以防出错。像这样的东西(未经测试):

async def download_all_pages(sites):
    print('Running download all pages')
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(safe_download_page(session,url)) for url in sites]
        await asyncio.gather(*tasks)
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")

async def safe_download_page(session, url):
    while True:
        try:
            async with sem:
                await download_page(session, url)
                break
        except (aiohttp.ServerDisconnectedError, aiohttp.ClientResponseError,aiohttp.ClientConnectorError) as s:
            print("Oops, the server connection was dropped on ", url, ": ", s)
            await asyncio.sleep(1)  # don't hammer the server

推荐阅读