python - 如何将 dask.dataframe 预缓存到所有工作人员和分区以减少通信需求
问题描述
有时它很适合dask.dataframe.map_partitions
用于合并等操作。在某些情况下,在 aleft_df
和 a right_df
using之间进行合并时map_partitions
,我想在执行合并之前进行预缓存right_df
,以减少网络开销/本地改组。有什么明确的方法可以做到这一点吗?感觉应该可以使用 , 或其他一些智能广播中的一个或client.scatter(the_df)
组合client.run(func_to_cache_the_df)
。
left_df
在对一个大得多的大的right_df
(本质上是一个查找表)进行左连接的情况下,这一点尤为突出。感觉这right_df
应该能够读入内存并持久化/分散到合并前的所有工作人员/分区,以减少对跨分区通信的需求,直到最后。我怎样才能分散right_df
成功地做到这一点?
以下是使用 cuDF 和 Dask 进行这种不平衡合并的一个较小示例(但从概念上讲,这与 pandas 和 Dask 相同):
import pandas as pd
import cudf
import dask_cudf
import numpy as np
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
# create a local CUDA cluster
cluster = LocalCUDACluster()
client = Client(cluster)
np.random.seed(12)
nrows_left = 1000000
nrows_right = 1000
left = cudf.DataFrame({'a': np.random.randint(0,nrows_right,nrows_left), 'left_value':np.arange(nrows_left)})
right = cudf.DataFrame({'a': np.arange(nrows_right), 'lookup_val': np.random.randint(0,1000,nrows_right)})
print(left.shape, right.shape) # (1000000, 2) (1000, 2)
ddf_left = dask_cudf.from_cudf(left, npartitions=500)
ddf_right = dask_cudf.from_cudf(right, npartitions=2)
def dask_merge(L, R):
return L.merge(R, how='left', on='a')
result = ddf_left.map_partitions(dask_merge, R=ddf_right).compute()
result.head()
<cudf.DataFrame ncols=3 nrows=5 >
a left_value lookup_val
0 219 1952 822
1 873 1953 844
2 908 1954 142
3 290 1955 810
4 863 1956 910
解决方案
如果您执行以下任何操作,那么事情应该没问题:
- 与单分区 dask 数据框的合并
- 与非 dask 数据框(如 Pandas 或 cuDF)的合并
- 具有非 dask 数据框的 map_partitions(如 Pandas 或 cuDF)
会发生什么:
- 单个分区被推送到单个工作人员
- 在执行期间,一些工作人员将复制该数据,然后其他工作人员将从这些工作人员那里复制,依此类推,将数据传递到树中
- 工人将按预期进行合并
这与预期的一样快。但是,如果您正在执行基准测试之类的操作,并且想要将步骤 1,2 和 3 分开,那么您可以使用client.replicate
:
left = ... # multi-partition dataframe
right = ... # single-partition dataframe
right = right.persist() # make sure it exists in one worker
client.replicate(right) # replicate it across many workers
... proceed as normal
这不会更快,但步骤 1,2 将被拉到复制步骤中。
在您的示例中,它看起来right
有两个分区。您可能希望将其更改为一个。Dask 采用不同的代码路径,map_partitions
在这种情况下本质上就是 .
推荐阅读
- javascript - 在 Django 中提交表单后显示 javascript 样式警报
- javascript - 如何将Javascript数字转换为字符串并将其传递给输入框
- python - MoviePy 动画文本
- python - 如何使用 != 运算符比较两个字符串并将结果存储在名为 my_boolean 的变量中?
- qr-code - 如何使用 Zxing 生成 GS1_QR 码
- php - PHP-GD:试图使文本居中,但水平偏离中心显示
- python - How can I scrape text using selenium by class? Error message: An invalid or illegal selector was specified
- c++ - 如何计算楔形与 3d 空间中射线之间的相交距离
- django - 在大表中使用预取和过滤器优化 Django
- regex - PCRE 查找所有没有文件扩展名的 url,包括 #、? 等