java - 使用 UDF 函数的 Apache Spark 数据帧重新分区
问题描述
如果与 UDF 调用一起重新分区,我对正确使用 Apache Spark 有误解
假设有一个简单的注册 UDF 函数:
public static DataFrame compute(SQLContext sqlContext, DataFrame df) {
sqlContext.udf().register("compute", new MyUdf(), DataTypes.StringType);
return df.withColumn("CLIENT", functions.callUDF("compute", new Column("CLIENT")));
}
class MyUdf implements UDF1<String, String> {
Set<String> cache = new HashSet<>();
@Override
public String call(String client) {
cache.add(client);
System.out.println("cache size: " + cache.size() + ", obj: " + this.toString() + ", client: " + client);
return client;
}
}
如您所见,我将传递一个client
字段,将其添加到函数的本地缓存中以跟踪其执行情况。
现在我将创建一个由客户端重新分区的数据框:
df = df.repartition(col("CLIENT"));
df = compute(sqlContext, df);
df.show(false);
我在repartition
没有第一个numPartitions
参数的情况下调用,默认情况下它是 200:
System.out.println(df.toJavaRDD().getNumPartitions()); // will show 200
该函数的日志似乎很好(每个函数都使用它自己的客户端运行,对吧?):
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@11633b65, client: cl2
cache size: 1, obj: demo.MyUdf@11633b65, client: cl2
cache size: 1, obj: demo.MyUdf@11633b65, client: cl2
但是,如果我尝试将 a 设置numPartitions
为:
df = df.repartition(2, col("CLIENT"));
我看到了“意外”的结果(两个分区的单一功能):
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 1, obj: demo.MyUdf@3a9de0bf, client: cl1
cache size: 2, obj: demo.MyUdf@3a9de0bf, client: cl2
cache size: 2, obj: demo.MyUdf@3a9de0bf, client: cl2
cache size: 2, obj: demo.MyUdf@3a9de0bf, client: cl2
为什么会这样?
解决方案
推荐阅读
- c# - 将模型属性从 Html 表单发送到 [HttpPost] 控制器时出现问题
- javascript - 使用 JS 向父级添加子级
- arraylist - 进行“列表”添加的语言
- botframework - 如何暂时禁用机器人的语音?
- c# - 将段落或文本框附加到 RichTextBox
- amazon-ec2 - 使用非标准 centos 用户名在 Centos 7 上安装 AWS CLI V2 时出现问题 - 权限被拒绝。需要 Sudo
- javascript - Discord 机器人命令错误
- sql - 为单个字段更新 postgres 创建自动更新触发器
- python - 在递归函数中附加后,Python全局列表不断被清除
- node.js - 测试 Firestore 安全规则:为什么即使在安全规则中的 debug 语句之后,firestore-debug.log 也是空的