java - Java Spark:使用未知连接列名称连接的数据集的 Spark 错误解决方法
问题描述
我正在使用带有 Java 的 Spark 2.3.1。
我遇到了什么(我认为),是Spark 的这个已知错误。
这是我的代码:
public Dataset<Row> compute(Dataset<Row> df1, Dataset<Row> df2, List<String> columns){
Seq<String> columns_seq = JavaConverters.asScalaIteratorConverter(columns.iterator()).asScala().toSeq();
final Dataset<Row> join = df1.join(df2, columns_seq);
join.show()
join.withColumn("newColumn", abs(col("value1").minus(col("value2")))).show();
return join;
}
我这样称呼我的代码:
Dataset<Row> myNewDF = compute(MyDataset1, MyDataset2, Arrays.asList("field1","field2","field3","field4"));
注意:MyDataset1 和 MyDataset2 是来自同一个 Dataset MyDataset0 的两个数据集,具有多个不同的转换。
在线上join.show()
,我收到以下错误:
2018-08-03 18:48:43 - ERROR main Logging$class - - - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 235, Column 21: Expression "project_isNull_2" is not an rvalue
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 235, Column 21: Expression "project_isNull_2" is not an rvalue
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:7170)
at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:5332)
at org.codehaus.janino.UnitCompiler.access$9400(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$13$1.visitAmbiguousName(UnitCompiler.java:5287)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4053)
...
2018-08-03 18:48:47 - WARN main Logging$class - - - Whole-stage codegen disabled for plan (id=7):
但它不会停止执行,仍然会显示数据集的内容。
然后,上线join.withColumn("newColumn", abs(col("value1").minus(col("value2")))).show();
我得到错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) 'value2,'value1 missing from field6#16,field7#3,field8#108,field5#0,field9#4,field10#28,field11#323,value1#298,field12#131,day#52,field3#119,value2#22,field2#35,field1#43,field4#144 in operator 'Project [field1#43, field2#35, field3#119, field4#144, field5#0, field6#16, value2#22, field7#3, field9#4, field10#28, day#52, field8#108, field12#131, value1#298, field11#323, abs(('value1 - 'value2)) AS newColumn#2579]. Attribute(s) with the same name appear in the operation: value2,value1. Please check if the right attribute(s) are used.;;
'Project [field1#43, field2#35, field3#119, field4#144, field5#0, field6#16, value2#22, field7#3, field9#4, field10#28, day#52, field8#108, field12#131, value1#298, field11#323, abs(('value1 - 'value2)) AS newColumn#2579]
+- AnalysisBarrier
...
此错误结束程序。
Mijung Kim 在 Jira 问题上提出的解决方法是通过 toDF(Columns) 创建一个数据集克隆。但在我的情况下,用于连接的列名事先不知道(我只有一个列表),我不能使用这种解决方法。
还有其他方法可以解决这个非常烦人的错误吗?
解决方案
尝试调用此方法:
private static Dataset<Row> cloneDataset(Dataset<Row> ds) {
List<Column> filterColumns = new ArrayList<>();
List<String> filterColumnsNames = new ArrayList<>();
scala.collection.Iterator<StructField> it = ds.exprEnc().schema().toIterator();
while (it.hasNext()) {
String columnName = it.next().name();
filterColumns.add(ds.col(columnName));
filterColumnsNames.add(columnName);
}
ds = ds.select(JavaConversions.asScalaBuffer(filterColumns).seq()).toDF(scala.collection.JavaConverters.asScalaIteratorConverter(filterColumnsNames.iterator()).asScala().toSeq());
return ds;
}
在加入之前的两个数据集上,如下所示:
df1 = cloneDataset(df1);
df2 = cloneDataset(df2);
final Dataset<Row> join = df1.join(df2, columns_seq);
// or ( based on Nakeuh comment )
final Dataset<Row> join = cloneDataset(df1.join(df2, columns_seq));
推荐阅读
- c++ - 使用 C++ 的 openssl 3des
- java - 我的变量 loginSuccesful 自动更改
- php - PHP:对象实例上的 new 运算符创建一个对象实例。为什么?
- javascript - 防止函数在 java 脚本中函数的最后一条语句中调用自身
- angular - 更新到 v8 后,我的代码中的 ng 构建失败并出现“未找到导出”错误
- jquery - 单击按钮图标行为奇怪
- excel - 如何在 VBA 中有效地使用具有多个条件的 If 语句,将用户输入与范围进行比较?
- python - 如何从特殊点切割字符串
- python - 使用 Keras 层 Lambda 时如何修复“NoneType”对象没有属性“_inbound_nodes”
- html - Angular如何使用(更改)检测器通过按钮发布数据?