首页 > 解决方案 > 使用 UDF 函数的 Apache Spark 数据帧重新分区

问题描述

如果与 UDF 调用一起重新分区,我对正确使用 Apache Spark 有误解

假设有一个简单的注册 UDF 函数:

public static DataFrame compute(SQLContext sqlContext, DataFrame df) {
    sqlContext.udf().register("compute", new MyUdf(), DataTypes.StringType);
    return df.withColumn("CLIENT", functions.callUDF("compute", new Column("CLIENT")));
}

class MyUdf implements UDF1<String, String> {
Set<String> cache = new HashSet<>();
@Override
public String call(String client) {
    cache.add(client);
    System.out.println("cache size: " + cache.size() + ", obj: " + this.toString() + ", client: " + client);
    return client;
}

}

如您所见,我将传递一个client字段,将其添加到函数的本地缓存中以跟踪其执行情况。

现在我将创建一个由客户端重新分区的数据框:

df = df.repartition(col("CLIENT"));
df = compute(sqlContext, df);
df.show(false);

我在repartition没有第一个numPartitions参数的情况下调用,默认情况下它是 200:

System.out.println(df.toJavaRDD().getNumPartitions()); // will show 200

该函数的日志似乎很好(每个函数都使用它自己的客户端运行,对吧?):

cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@11633b65, client: cl2
cache size: 1, obj: demo.MyUdf@11633b65, client: cl2
cache size: 1, obj: demo.MyUdf@11633b65, client: cl2

但是,如果我尝试将 a 设置numPartitions为:

df = df.repartition(2, col("CLIENT"));

我看到了“意外”的结果(两个分区的单一功能):

cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 2, obj: demo.MyUdf@3a9de0bf, client: cl2
cache size: 2, obj: demo.MyUdf@3a9de0bf, client: cl2
cache size: 2, obj: demo.MyUdf@3a9de0bf, client: cl2

为什么会这样?

标签: javaapache-spark

解决方案


推荐阅读