python - 尽管使用了 asyncio.Semaphore,但仍获得“ValueError:select() 中的文件描述符过多”
问题描述
我正在向 Azure Maps 提出一些请求。我有一个订阅密钥 ( subscriptionKey
) 和一个我要查找的地址列表 ( addresses
):
query_template = 'https://atlas.microsoft.com/search/address/json?&subscription-key={}&api-version=1.0&language=en-US&query={}'
queries = [query_template.format(subscriptionKey, address) for address in addresses]
我来自这个问题(不必阅读它来理解以下内容)并且在我的 1k 查询示例中一切正常。但是,当我尝试 10k 查询时,我得到了ValueError: too many file descriptors in select()
. 我从这里添加了一些答案,现在我的代码如下所示:
import asyncio
from aiohttp import ClientSession
from ssl import SSLContext
from sys import platform
import nest_asyncio
nest_asyncio.apply()
# Function to get a JSON from the result of a query
async def fetch(url, session):
async with session.get(url, ssl=SSLContext()) as response:
return await response.json()
# Function to run 'fetch()' with a Semaphore and check that the result is a dictionary (JSON)
async def fetch_sem(sem, attempts, url, session):
semaphore = asyncio.Semaphore(sem)
async with semaphore:
for _ in range(attempts):
result = await fetch(url, session)
if isinstance(result, dict):
break
return result
# Function to search for all queries
async def fetch_all(sem, attempts, urls):
async with ClientSession() as session:
return await asyncio.gather(*[fetch_sem(sem, attempts, url, session) for url in urls], return_exceptions=True)
# Making the queries
if __name__ == '__main__':
if platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
results = loop.run_until_complete(fetch_all(1000, 3, queries))
请注意,我同时包含了asyncio.Semaphore
和asyncio.ProactorEventLoop()
。但尽管有这些补充,我仍然得到ValueError: too many file descriptors in select()
.
我能在这个问题上得到一些帮助吗?谢谢!
解决方案
信号量的目的是计算fetch
当前正在运行的操作数量并强制执行上限。这就是为什么你需要一个信号量:
您可以在其中创建它fetch_all
并传递给fetch_sem
:
async def fetch_sem(semaphore, attempts, url, session):
async with semaphore:
...
return result
async def fetch_all(limit, attempts, urls):
semaphore = asyncio.Semaphore(limit)
async with ClientSession() as session:
return await asyncio.gather(*[fetch_sem(semaphore, attempts, url, session) for url in urls], return_exceptions=True)
....
results = loop.run_until_complete(fetch_all(1000, 3, queries))
推荐阅读
- kubernetes - 是否可以在 gcp 永久磁盘上编辑文件?
- azure - 在可能存在多个选项的情况下,如何创建策略以强制标记?
- java - ArrayDeque<>(int capacity) - Scanner.nextInt() == 4 和 (int) 4 有什么区别?
- python - button.when_pressed 中引用函数的返回值
- javascript - 无法使用 JSON 数据作为图表的输入在 Codeigniter 页面上显示 Morris.JS 图表
- javascript - 使用 Fetch API 调用带有 Auth Token 的 URL
- java - 有没有办法通过硒自动化在gmail中撰写邮件?
- swift - 如何将空输入值更改为空数组?
- sql - 在 postgres 中转换时间戳格式
- azure - 从 azure 存储容器下载多个小文件时速度慢、延迟高