Getting "ValueError: too many file descriptors in select()" despite using asyncio.Semaphore

Problem description

I'm making a number of requests to Azure Maps. I have a subscription key (subscriptionKey) and a list of addresses that I want to look up (addresses):

query_template = 'https://atlas.microsoft.com/search/address/json?&subscription-key={}&api-version=1.0&language=en-US&query={}'
queries = [query_template.format(subscriptionKey, address) for address in addresses]
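Separately from the error, query_template inserts each address into the URL without escaping it, so an address that contains '&' or '#' can break the query string. The following is a minimal sketch that builds the same URL with the standard library's urllib.parse.urlencode. build_query and BASE_URL are hypothetical names, and using quote_via=quote (spaces become %20 rather than +) assumes the service accepts ordinary percent-encoding, which is standard for HTTP query strings.

```python
from urllib.parse import urlencode, quote

# Hypothetical constant: the endpoint taken from query_template above.
BASE_URL = "https://atlas.microsoft.com/search/address/json"

def build_query(subscription_key, address):
    """Hypothetical helper: same parameters as query_template, but percent-encoded."""
    params = {
        "subscription-key": subscription_key,
        "api-version": "1.0",
        "language": "en-US",
        "query": address,  # spaces, commas, '&', '#' are escaped here
    }
    # quote_via=quote encodes spaces as %20 instead of '+'
    return f"{BASE_URL}?{urlencode(params, quote_via=quote)}"

url = build_query("MY_KEY", "1 Microsoft Way, Redmond, WA")
print(url)
```

With this helper, the list comprehension becomes queries = [build_query(subscriptionKey, address) for address in addresses].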

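I'm coming here from an earlier question (you don't need to read it to follow what's below), and everything works fine with my 1k-query sample. However, when I try 10k queries, I get ValueError: too many file descriptors in select(). I added some of the suggestions from the answers here, and my code now looks like this: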

import asyncio
from aiohttp import ClientSession
from ssl import SSLContext
from sys import platform
import nest_asyncio
nest_asyncio.apply()

# Function to get a JSON from the result of a query
async def fetch(url, session):
    async with session.get(url, ssl=SSLContext()) as response:
        return await response.json()

# Function to run 'fetch()' with a Semaphore and check that the result is a dictionary (JSON)
async def fetch_sem(sem, attempts, url, session):
    semaphore = asyncio.Semaphore(sem)
    async with semaphore:
        for _ in range(attempts):
            result = await fetch(url, session)
            if isinstance(result, dict):
                break
        return result

# Function to search for all queries
async def fetch_all(sem, attempts, urls):
    async with ClientSession() as session:
        return await asyncio.gather(*[fetch_sem(sem, attempts, url, session) for url in urls], return_exceptions=True)

# Making the queries
if __name__ == '__main__':
    if platform == 'win32':
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(1000, 3, queries))

Note that I've included both asyncio.Semaphore and asyncio.ProactorEventLoop(). Despite these additions, I still get ValueError: too many file descriptors in select().

Could I get some help with this? Thanks!

Tags: python, asynchronous, python-asyncio, synchronous, aiohttp

Solution


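The purpose of a semaphore is to count how many fetch operations are currently running and to enforce an upper limit on that number. That is why you need exactly one semaphore, shared by all the fetch_sem calls. Your fetch_sem creates a new asyncio.Semaphore(sem) on every call, so each request gets its own counter, acquires it immediately, and nothing is actually limited.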

Create it once in fetch_all and pass it to fetch_sem:

async def fetch_sem(semaphore, attempts, url, session):
    async with semaphore:
        ... 
        return result

async def fetch_all(limit, attempts, urls):
    semaphore = asyncio.Semaphore(limit)
    async with ClientSession() as session:
        return await asyncio.gather(*[fetch_sem(semaphore, attempts, url, session) for url in urls], return_exceptions=True)

....
results = loop.run_until_complete(fetch_all(1000, 3, queries))
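To see the difference without calling Azure Maps, here is a stdlib-only sketch. fake_fetch is a hypothetical stand-in for the aiohttp request: it sleeps for 10 ms and records how many simulated requests are in flight at the same time. fetch_sem and fetch_all follow the structure of the answer (with the retry loop from the question filled in), and the shared=False flag reproduces the question's one-semaphore-per-call behavior for comparison.

```python
import asyncio

# Demo-only counters updated by the simulated requests.
in_flight = 0
peak = 0

async def fake_fetch(url):
    """Hypothetical stand-in for the aiohttp request; tracks concurrency."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    try:
        await asyncio.sleep(0.01)  # pretend network latency
        return {"url": url}        # a dict, like a parsed JSON response
    finally:
        in_flight -= 1

async def fetch_sem(semaphore, attempts, url):
    # Retry loop from the question, guarded by whatever semaphore is passed in.
    async with semaphore:
        result = None
        for _ in range(attempts):
            result = await fake_fetch(url)
            if isinstance(result, dict):
                break
        return result

async def fetch_all(limit, attempts, urls, shared=True):
    global peak
    peak = 0
    shared_sem = asyncio.Semaphore(limit)  # one counter for all requests
    tasks = [
        # shared=False mimics the question: a fresh semaphore per call
        fetch_sem(shared_sem if shared else asyncio.Semaphore(limit), attempts, url)
        for url in urls
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results, peak

urls = [f"https://example.invalid/{i}" for i in range(200)]
results_shared, peak_shared = asyncio.run(fetch_all(10, 3, urls, shared=True))
results_per_call, peak_per_call = asyncio.run(fetch_all(10, 3, urls, shared=False))
print(peak_shared, peak_per_call)  # → 10 200
```

With the shared semaphore, at most 10 simulated requests are ever in flight; with one semaphore per call, all 200 start at once. In the real program each in-flight request holds a socket, so the shared semaphore is what makes the limit passed to fetch_all actually take effect.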
