dask - 中间 dask 计算的批处理结果
问题描述
我有一个大(10 GB)的 CSV 文件,我想加载到 CSV 文件中dask
,并为每一行执行一些计算。我还想将经过处理的 CSV 的结果写入 BigQuery,但最好将网络请求分批成组发送到 BigQuery,例如每组 10,000 行,这样我就不会每行产生网络开销。
我一直在查看dask delayed
并看到您可以创建任意计算图,但我不确定这是否是正确的方法:如何根据某些组大小(或者可能经过的时间)收集和触发中间计算. 有人可以提供一个简单的例子吗?为简单起见,我们有以下功能:
def change_row(r):
# Takes 10ms
r = some_computation(r)
return r
def send_to_bigquery(rows):
# Ideally, in large-ish groups, say 10,000 rows at a time
make_network_request(rows)
# And here's how I'd use it
import dask.dataframe as dd
df = dd.read_csv('my_large_dataset.csv') # 20 GB
# run change_row(r) for each r in df
# run send_to_big_query(rows) for each appropriate size group based on change_row(r)
谢谢!
解决方案
您可以做的最简单的事情是向 提供块大小参数read_csv
,这将使您获得大约正确的每个块的行数。您可能需要测量一些数据或进行实验才能做到这一点。
您的任务的其余部分将与任何其他“对数据框块执行此通用操作”的方式相同:`map_partitions' 方法(docs)。
def alter_and_send(df):
rows = [change_row(r) for r in df.iterrows()]
send_to_big_query(rows)
return df
df.map_partitions(alter_and_send)
基本上,您在逻辑 dask 数据帧的每一块上运行该函数,这些数据帧是真正的 pandas 数据帧。您可能实际上需要在函数中使用 map、apply 或其他数据框方法。
这是一种方法——你并不真正需要地图的“输出”,你可以使用它to_delayed()
。
推荐阅读
- python - python 中的 json.dump() 是否重写或附加 JSON 文件
- java - 如何将 Java jsp 数组设置为我的 Index.html 表
- c++ - Qt:.pro 文件丢失?
- java - 无法使用 System.currentTimeMillis() 在 Android 中计算时间差
- python - ValueError:无法将大小为 1 的数组重塑为形状 (1,4)
- python-3.x - Pytesseract 始终错误地读取特定数字
- python - '无法连接到任何服务器' 。拒绝连接
- c - 将 c 中的数字递减作为参数
- apache-spark - 使用 Spark submit Operator 将气流连接到 Spark 作业
- python - Python3 控制流语句 elif 不允许在它上面工作