python-3.x - 如何即时处理大量协程返回的值(异步问题)?
问题描述
我有一个非常庞大的 URL 列表,我想抓取它们。起初,我正在考虑做这样的事情:
async def main(username, password):
sem = asyncio.Semaphore(100)
async with aiohttp.ClientSession() as session:
await login(session, username, password)
tasks = [asyncio.create_task(r_search(sem, session, url)) for url in hugenumberofurls]
texts = await asyncio.gather(*tasks)
但是当然这花费了太长时间,我想实时处理结果(解析+写入文件)。实现这一目标的最有效方法是什么?
我是否应该分块 hugenumberofurls,然后执行以下操作:
async def main(username, password):
async with aiohttp.ClientSession() as session:
await login(session, username, password)
sem = asyncio.Semaphore(100)
for chunk in chunk(hugenumberofurls):
tasks = [asyncio.create_task(r_search(sem, session, url)) for url in chunk]
texts = await asyncio.gather(*tasks)
process(texts)
我相信这可能是浪费时间,因为 process() 可能需要几秒钟来继续下载 url...
我很乐意接受任何建议,asyncio 和 aiohttp 对于新手来说非常困难!
解决方案
让我们使用两个模拟操作构建一个工作示例:
import random
import asyncio
async def fetch(url):
"""Simulate a network request"""
await asyncio.sleep(random.random() / 10)
return int(url.split("/")[-1])
def process(chunk):
"""Simulate a CPU-intensive task"""
all(range(1, 10**8))
return sum(chunk)
首先,您需要一种在执行程序中运行 CPU 密集型处理的方法,以避免阻塞程序的其余部分。例如,可以使用ProcessPoolExecutor来并行化计算:
from concurrent.futures import ProcessPoolExecutor
async def process_in_executor(chunk, executor=ProcessPoolExecutor()):
"""Run `process` in parallel using a process pool executor"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, process, chunk)
然后,您需要一种方法来同时运行 fetch 请求,同时限制连接数并将结果收集到块中。这可以使用asyncio.gather和/或asyncio.Semaphore来完成,但这确实很难做到。
相反,您可以考虑将aiostream库与专为此目的设计的stream.map和stream.chunks运算符一起使用:
from aiostream import stream, pipe
async def main():
"""Perform the following operations:
1. Fetch a bunch of urls concurrently, with a limit of 100 tasks
2. Gather the results into chunks of size 1000
3. Process the chunks in parallel using 4 different processes
4. Sum the results
"""
urls = [f"http://my.url/{i}" for i in range(10000)]
result = await (
stream.iterate(urls)
| pipe.map(fetch, ordered=False, task_limit=100)
| pipe.chunks(1000)
| pipe.map(process_in_executor, ordered=False, task_limit=4)
| pipe.accumulate()
| pipe.print("[progress - {}]")
)
print(f"> sum(range(10000)) = {result}")
该程序应该需要几秒钟才能运行。然后,您必须根据您的用例调整连接限制、块大小和进程数。
免责声明:我是aiostream项目的维护者。
推荐阅读
- python - 如何从 Spotipy 解决此客户端 ID 错误?
- php - 仅显示当前用户的帐单和发货国家/地区名称,而不是国家/地区代码
- c++ - 自 0000 年 1 月 1 日以来,哪些 C++ 函数支持以微秒为单位的时间?
- game-engine - Thimbleweed Park 游戏是用哪种语言制作的>
- c# - C# Convert long to bytes 出错:& 不能应用于 long 和 ulong
- python - Pandas 数据框无法在 docker 中运行
- java - proguard:无法读取 [C:\Program Files\AdoptOpenJDK\jdk-11.0.6.10-hotspot\lib\rt.jar]
- java - 如何在另一个活动中显示从一个活动传递的值?
- python - 如何检查传递给处理多线程的函数的返回值
- elasticsearch - 将 ElasticSearch 从 2 升级到 7.6