首页 > 解决方案 > 中间 dask 计算的批处理结果

问题描述

我有一个大(10 GB)的 CSV 文件,我想加载到 CSV 文件中dask,并为每一行执行一些计算。我还想将经过处理的 CSV 的结果写入 BigQuery,但最好将网络请求分批成组发送到 BigQuery,例如每组 10,000 行,这样我就不会每行产生网络开销。

我一直在查看dask delayed并看到您可以创建任意计算图,但我不确定这是否是正确的方法:如何根据某些组大小(或者可能经过的时间)收集和触发中间计算. 有人可以提供一个简单的例子吗?为简单起见,我们有以下功能:

def change_row(r):
    # Takes 10ms
    r = some_computation(r)
    return r

def send_to_bigquery(rows): 
    # Ideally, in large-ish groups, say 10,000 rows at a time
    make_network_request(rows)

# And here's how I'd use it
import dask.dataframe as dd
df = dd.read_csv('my_large_dataset.csv') # 20 GB
# run change_row(r) for each r in df
# run send_to_big_query(rows) for each appropriate size group based on change_row(r)

谢谢!

标签: dask

解决方案


您可以做的最简单的事情是向 提供块大小参数read_csv,这将使您获得大约正确的每个块的行数。您可能需要测量一些数据或进行实验才能做到这一点。

您的任务的其余部分将与任何其他“对数据框块执行此通用操作”的方式相同:`map_partitions' 方法(docs)。

def alter_and_send(df):
    rows = [change_row(r) for r in df.iterrows()]
    send_to_big_query(rows)
    return df

df.map_partitions(alter_and_send)

基本上,您在逻辑 dask 数据帧的每一块上运行该函数,这些数据帧是真正的 pandas 数据帧。您可能实际上需要在函数中使用 map、apply 或其他数据框方法。

这是一种方法——你并不真正需要地图的“输出”,你可以使用它to_delayed()


推荐阅读