首页 > 解决方案 > 如何即时处理大量协程返回的值(异步问题)?

问题描述

我有一个非常庞大的 URL 列表,我想抓取它们。起初,我正在考虑做这样的事情:

async def main(username, password):
    sem = asyncio.Semaphore(100)
    async with aiohttp.ClientSession() as session:
        await login(session, username, password)
        tasks = [asyncio.create_task(r_search(sem, session, url)) for url in hugenumberofurls]
        texts = await asyncio.gather(*tasks)

但是当然这花费了太长时间,我想实时处理结果(解析+写入文件)。实现这一目标的最有效方法是什么?

我是否应该分块 hugenumberofurls,然后执行以下操作:

async def main(username, password):
    async with aiohttp.ClientSession() as session:
        await login(session, username, password)
        sem = asyncio.Semaphore(100)
        for chunk in chunk(hugenumberofurls):
            tasks = [asyncio.create_task(r_search(sem, session, url)) for url in chunk]
            texts = await asyncio.gather(*tasks)
            process(texts)

我相信这可能是浪费时间,因为 process() 可能需要几秒钟来继续下载 url...

我很乐意接受任何建议,asyncio 和 aiohttp 对于新手来说非常困难!

标签: python-3.xpython-asynciopython-3.7aiohttp

解决方案


让我们使用两个模拟操作构建一个工作示例:

import random
import asyncio

async def fetch(url):
    """Simulate a network request"""
    await asyncio.sleep(random.random() / 10)
    return int(url.split("/")[-1])


def process(chunk):
    """Simulate a CPU-intensive task"""
    all(range(1, 10**8))
    return sum(chunk)

首先,您需要一种在执行程序中运行 CPU 密集型处理的方法,以避免阻塞程序的其余部分。例如,可以使用ProcessPoolExecutor来并行化计算:

from concurrent.futures import ProcessPoolExecutor

async def process_in_executor(chunk, executor=ProcessPoolExecutor()):
    """Run `process` in parallel using a process pool executor"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, process, chunk)

然后,您需要一种方法来同时运行 fetch 请求,同时限制连接数并将结果收集到块中。这可以使用asyncio.gather和/或asyncio.Semaphore来完成,但这确实很难做到。

相反,您可以考虑将aiostream库与专为此目的设计的stream.mapstream.chunks运算符一起使用:

from aiostream import stream, pipe

async def main():
    """Perform the following operations:
    1. Fetch a bunch of urls concurrently, with a limit of 100 tasks
    2. Gather the results into chunks of size 1000
    3. Process the chunks in parallel using 4 different processes
    4. Sum the results
    """
    urls = [f"http://my.url/{i}" for i in range(10000)]
    result = await (
        stream.iterate(urls)
        | pipe.map(fetch, ordered=False, task_limit=100)
        | pipe.chunks(1000)
        | pipe.map(process_in_executor, ordered=False, task_limit=4)
        | pipe.accumulate()
        | pipe.print("[progress - {}]")
    )
    print(f"> sum(range(10000)) = {result}")

该程序应该需要几秒钟才能运行。然后,您必须根据您的用例调整连接限制、块大小和进程数。

免责声明:我是aiostream项目的维护者。


推荐阅读