java - java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1
Problem description
When I try to join two datasets, one from a database and one from a CSV file, the job fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, targetString), StringType), true, false) AS targetString#205
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, deviceName), StringType), true, false) AS deviceName#206
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, alarmDetectionCode), StringType), true, false) AS alarmDetectionCode#207
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write
It looks as if some mismatch occurs when the Spark application joins two datasets with different schemas, but I can't tell how it happens. My Java code looks like this:
Dataset<Row> result = deviceInfoDataset.join(searchInfo,
        deviceInfoDataset.col("deviceName").equalTo(searchInfo.col("deviceName")));
result.show();
Dataset schemas:
device
+--------+----------+----------+
|ctgry_cd|deviceInfo|deviceName|
+--------+----------+----------+
searchinfo
+------------+----------+------------------+
|targetString|deviceName|alarmDetectionCode|
+------------+----------+------------------+
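The `ArrayIndexOutOfBoundsException: 1` makes more sense once you notice that the encoder expressions in the stack trace read each field by ordinal (`getexternalrowfield(..., 1, deviceName)`). A plain-Java sketch of that failure mode, without Spark (the class and method names here are mine, purely for illustration):

```java
// Illustration only: a Row backed by a CSV line with fewer cells than the
// schema has fields fails as soon as the encoder reads field ordinal 1.
public class ShortRowDemo {
    // Stand-in for the encoder's access of field ordinal 1 ("deviceName").
    public static String secondField(String[] row) {
        return row[1]; // out of bounds when the row has only one cell
    }

    public static void main(String[] args) {
        String[] shortRow = "onlyOneValue".split(","); // one cell; schema expects three
        try {
            secondField(shortRow);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("encoder would fail here: " + e);
        }
    }
}
```

The "1" in the exception is simply the first field ordinal that falls outside the short row's backing array.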
Solution
This problem turned out to be more complicated than I expected. In my case there were two causes. 1. My dataset contained an empty row that came from the CSV. Even so, I could still create and show the dataset with the following code:
SparkSession ss = sparkContextManager.createThreadLocalSparkSession(functionId);
JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());
List<StructField> fields = new ArrayList<>();
for (String fieldName : columns) {
    StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
    fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
List<String[]> tmpContent = LocalFileUtilsCustomize.readCsv(tempPath);
List<Row> content = jsc.parallelize(tmpContent).map(l -> RowFactory.create((Object[]) l)).collect();
Dataset<Row> searchInfo = ss.createDataFrame(content, schema);
searchInfo.show();
But as soon as I tried to join the two datasets and show the result, I got the error above. I then tried removing the empty row, but the job still failed. Eventually I realized that even with nullable=true set on every field, I had to make sure every CSV row has the same number of columns as the schema. So the fix for this problem looks like this:
SparkSession ss = sparkContextManager.createThreadLocalSparkSession(functionId);
JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());
List<StructField> fields = new ArrayList<>();
for (String fieldName : columns) {
    StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
    fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
List<String[]> tmpContent = LocalFileUtilsCustomize.readCsv(tempPath);
List<Row> content = new ArrayList<>();
for (String[] s : tmpContent) {
    if (s[0].isEmpty()) {
        continue; // skip empty CSV rows
    }
    Row r;
    if (s.length < columns.size()) {
        // pad short rows with nulls so every row matches the schema width
        String[] tmpS = new String[columns.size()];
        System.arraycopy(s, 0, tmpS, 0, s.length);
        r = RowFactory.create((Object[]) tmpS);
    } else {
        r = RowFactory.create((Object[]) s);
    }
    content.add(r);
}
Dataset<Row> searchInfo = ss.createDataFrame(content, schema);
searchInfo.show();
Dataset<Row> result = deviceInfoDataset.join(searchInfo,
        deviceInfoDataset.col("deviceName").equalTo(searchInfo.col("deviceName")));
result.show();
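The row-cleanup part of the fix can also be isolated into a small plain-Java helper that is easy to unit-test without a Spark session. This is only a sketch of the same idea; the class and method names (`RowNormalizer`, `normalize`) are mine, not part of the original code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowNormalizer {
    /**
     * Drop rows whose first cell is empty and right-pad short rows with
     * nulls so that every row has exactly `width` cells (the schema width).
     */
    public static List<String[]> normalize(List<String[]> rows, int width) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            if (row.length == 0 || row[0].isEmpty()) {
                continue; // blank CSV line
            }
            // Arrays.copyOf pads the tail with null when the row is short
            out.add(row.length < width ? Arrays.copyOf(row, width) : row);
        }
        return out;
    }
}
```

With the rows normalized up front, `RowFactory.create` always receives an array whose length matches the schema, which is exactly the invariant the encoder relies on.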