首页 > 解决方案 > 在 Dask 数据帧子集上强制局部性

问题描述

我正在尝试在多台机器上分发一个大型 Dask 数据帧,以便(稍后)在数据帧上进行分布式计算。我正在为此使用 dask-distributed。

我看到的所有 dask 分布式示例/文档都是从网络资源(hdfs、s3 等)填充初始数据负载,并且似乎没有将 DAG 优化扩展到负载部分(似乎假设网络负载是必要的邪恶并且只吃初始成本。)这在另一个问题的答案中得到了强调:Dask 是否与 HDFS 通信以优化数据局部性?

但是,我可以看到我们想要这个的情况。例如,如果我们有一个分片数据库 + dask 工作人员位于该数据库的节点上,我们希望仅将本地分片中的记录强制填充到本地 dask 工作人员中。从文档/示例来看,网络纵横交错似乎是一个必然的假设成本。是否可以强制从特定工作人员处获取单个数据帧的部分内容?

我尝试过的替代方法是尝试强制每个工作人员运行一个函数(迭代地提交给每个工作人员),其中该函数仅加载该机器/分片的本地数据。这行得通,而且我有一堆具有相同列架构的最佳本地数据框——但是——现在我没有一个数据框,而是n 个数据框。是否可以跨多台机器合并/融合数据帧,以便有一个数据帧引用,但部分与特定机器具有亲和力(在合理范围内,由任务 DAG 决定)?

标签: daskdask-distributed

解决方案


您可以生成 dask “集合”,例如来自期货和延迟对象的数据帧,它们可以很好地相互操作。

对于每个分区,你知道哪台机器应该加载它,你可以产生如下的未来:

f = c.submit(make_part_function, args, workers={'my.worker.ip'})

dask 客户端在哪里c,地址是您希望看到它发生的机器。你也可以给出allow_other_workers=True这是一个偏好而不是一个要求。

要从此类期货列表中制作数据框,您可以这样做

df = dd.from_delayed([dask.delayed(f) for f in futures])

理想情况下,提供一个meta=,给出预期数据帧的描述。现在,给定分区上的进一步操作将更喜欢安排在已经保存数据的同一个工作人员上。


推荐阅读