python - Python:异步处理来自其他异步任务的结果的任务
问题描述
我正在尝试从 API获取多个地址的所有交易数据。每个地址可以有几页交易,我只有在我要求第一页时才发现。
我有方法api.get_address_data(address, page)
和api.get_transaction_data(tx)
.
我想做的同步代码如下所示:
def all_transaction_data(addresses):
for address in addresses:
data = api.get_address_data(address, page=0)
transactions = data.transactions
for n in range(1, data.total_pages):
next_page = api.get_address_data(address, page=n)
transactions += next_page.transactions
for tx in data.transactions:
yield api.get_transaction_data(tx)
我不关心收到的交易的顺序(当我准备好所有交易后,我将不得不重新排序)。我可以将所有数据都放在内存中,但这是很多非常小的请求,所以我想尽可能多地并行执行。
实现这一目标的最佳方法是什么?我在玩 asyncio(API 调用在我的控制之下,所以我可以将它们转换async
为大批量交易。我希望在适当的输入数据准备好时立即安排每个处理步骤,并将结果收集到一个大列表中(或从单个生成器产生)。
似乎我需要某种开放式任务队列,其中任务“get-address”获取数据并将一堆“get-pages”任务排入队列,这些任务又将“get-transaction”任务排入队列,只有这些然后被收集到一个结果列表中?
这可以用 asyncio 完成吗?像 gevent 这样的东西会更合适,还是只是一个普通的 ThreadPoolExecutor?有没有比我到目前为止概述的更好的方法?
请注意,我想避免控制流的反转,或者至少将其隐藏为实现细节。即,此代码的调用者应该能够调用for tx in all_transaction_data()
,或者最坏的情况async for
。
解决方案
推荐阅读
- javascript - 带有条件的 Vue.js 禁用按钮不起作用
- amazon-web-services - 我们可以通过标记策略在 CloudFormation 堆栈(级别)上强制执行某些 aws 标签吗?
- sql - 查找订阅日期重叠的用户
- c++ - 了解基于惰性范围的函数的组成
- c++ - 多线程读取导致 Cassandra 会话数据损坏
- expression - 在可视表达式生成器中按组移动 YTD MAX(ADF 中的数据流)
- xamarin.android - vs 2017 和 xamarin.android 中的目标框架版本 9
- python - 用不同的数据类型python覆盖变量是不好的做法吗
- solidity - Solidity 变量定义:(bool sent, )
- django - 如何在 django 模板中拆分字符串