首页 > 解决方案 > Python:异步处理来自其他异步任务的结果的任务

问题描述

我正在尝试从 API获取多个地址的所有交易数据。每个地址可以有几页交易,我只有在我要求第一页时才发现。

我有方法api.get_address_data(address, page)api.get_transaction_data(tx).

我想做的同步代码如下所示:

def all_transaction_data(addresses):
    for address in addresses:
        data = api.get_address_data(address, page=0)
        transactions = data.transactions
        for n in range(1, data.total_pages):
            next_page = api.get_address_data(address, page=n)
            transactions += next_page.transactions
        for tx in data.transactions:
            yield api.get_transaction_data(tx)

我不关心收到的交易的顺序(当我准备好所有交易后,我将不得不重新排序)。我可以将所有数据都放在内存中,但这是很多非常小的请求,所以我想尽可能多地并行执行。

实现这一目标的最佳方法是什么?我在玩 asyncio(API 调用在我的控制之下,所以我可以将它们转换async为大批量交易。我希望在适当的输入数据准备好时立即安排每个处理步骤,并将结果收集到一个大列表中(或从单个生成器产生)。

似乎我需要某种开放式任务队列,其中任务“get-address”获取数据并将一堆“get-pages”任务排入队列,这些任务又将“get-transaction”任务排入队列,只有这些然后被收集到一个结果列表中?

这可以用 asyncio 完成吗?像 gevent 这样的东西会更合适,还是只是一个普通的 ThreadPoolExecutor?有没有比我到目前为止概述的更好的方法?

请注意,我想避免控制流的反转,或者至少将其隐藏为实现细节。即,此代码的调用者应该能够调用for tx in all_transaction_data(),或者最坏的情况async for

标签: pythonasynchronousparallel-processing

解决方案


推荐阅读