首页 > 解决方案 > 使用 asyncio 异步运行两个并发任务组

问题描述

我正在尝试使用asyncio并面向此博客文章编写程序。我想要做的是同时获取一些 JSON 数据。对于一个输入数据帧。但是,我想在请求的数据可用后立即对其进行进一步处理。

所以基本上有两组任务:

它们或多或少相互独立,但我也想同时运行这组任务。两个任务组完成后,我想进一步处理它们。

我的问题是,如果我的实现在asyncio模式方面设计得当,我只使用了两个收集语句?或者这是否是错误的概念?这里有一个小插曲:

import asyncio
import aiohttp
from aiohttp import ClientSession

async def fetch_json(url: str, session: ClientSession, data: json.dumps) -> Dict:
    resp = await session.get(url=url, headers={"content-type": "application/json"}, data=data)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    json = await resp.json()
    return json

async def some_calc(url: str, session: ClientSession, data: json.dumps):
    res = await fetch_json(url=url, session=session, data=data)
    return [float(x) for x in res]

async def process_data(df: Dict, url: str, session: ClientSession):
    async with session:
        tasks = []
        for data in df:
            try:
                if df1:
                    task = some_calc(url=url, session=session, data=data)
                else:
                    task = fetch_json(url=url, session=session, data=data)

            except Exception as e:
                # ...
            tasks.append(
                task
            )
        res = await asyncio.gather(*tasks)
    return res

async def bulk_execute(df1, df2):
    url = "http://some.url/"

    async with ClientSession() as session:
        res = await asyncio.gather(process_data(df1, url, session), process_data(df2, url, session))

    return res

if __name__ == "__main__":
    res = asyncio.run(bulk_execute(df1, df2))

标签: pythonpython-asynciopython-3.7aiohttp

解决方案


推荐阅读