python - 使用 Dask 将中间体和结果写入文件
问题描述
我有一个数据脚本应用程序,它使用 dask 遍历数据库并产生一些中间体,然后将这些中间体组合起来产生结果。现在我想有效地写出中间体和结果,但正如你在下面看到的,我只发现了一种非常低效的方法,你计算的中间体多于中间体。
import dask.bag as db
from other_functions import *
input = db.read_text(file1)
processing_parameter = parse_mapping_parameters(file2)
intermediates = []
for p in mapping_parameter:
intermediate = input.map(lambda x: process(x, p))
intermediates.append(intermediate)
products = intermediates.pop(0)
for intermediate in intermediates:
products = product.products(i)
result = products.map(calc_result)
for i, intermediate in enumerate(intermediates):
intermediate.to_textfiles(f'./data/intermediate_{i}.*.txt')
result.to_textfiles(f'./data/result.*.txt')
我看到的另一种方法是将中间体写入文件,然后使用单独的脚本将它们再次读入内存并生成结果,但在 IO 方面也感觉效率低下。有没有更好的方法来做到这一点?
解决方案
最终以这种方式解决它:
import dask.bag as db
from other_functions import *
input = db.read_text(file1)
processing_parameter = parse_mapping_parameters(file2)
to_compute = []
intermediates = []
for i, p in enumerate(mapping_parameter):
intermediate = input.map(lambda x: process(x, p))
to_compute.append(
intermediate.to_textfiles(f'./data/intermediate_{i}.*.txt', compute=False)
intermediates.append(intermediate)
products = intermediates.pop(0)
for intermediate in intermediates:
products = product.products(i)
result = products.map(calc_result)
to_compute.append(result.to_textfiles(f'./data/result.*.txt', compute=False))
dask.compute(*to_compute)
没有意识到 to_text 方法有一个参数可以让你得到一个懒惰的作家。一旦我看到它有点明显。不过,不确定这有多有效。
推荐阅读
- c# - C# 当前时钟速度显示
- apache-kafka - 为什么Kafka消费者更新到1.1后输出INVALID_FETCH_SESSION_EPOCH?
- python-3.x - 张量流概率中非负参数的负值
- python - 查找匹配字符串的最pythonic方法
- css - Vuetify v-radio 最好的 CSS 标签边距值是多少?
- c - C Socket 程序不打印所需的输出(无错误)
- c# - 如何从 SQL 命令中的 SELECT 进行更新?
- postgresql - 存储 ID 数组以及如何在选择输出中正确解压缩它们
- reactjs - 在电子中同时运行前端和后端
- r - 在 R 中合并 1000 个 HTML 文件