java - 从列表中创建数据帧结果抛出 NullPointerException
问题描述
我正在使用 spark-sql-2.4.1 版本。我有一个类似下面的代码。
val dataDs = ///have dataset
val part_dataDs = dataDs.repartition(col("fieldX"));
StructType schemaType = part_dataDs.schema();
part_part_dataDs.foreachPartition(itr ->{
Iterable<Row> rowIt = () -> itr;
List<Row> objs = StreamSupport.stream(rowIt.spliterator(), false)
.collect(Collectors.toList());
System.out.println("inrow.length: " + objs.size());
Dataset<Row> partitionData = sparkSession.createDataFrame(objs, schemaType);
partitionData.show;
}
错误 :
[Executor task launch worker for task 418] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 21.0 (TID 418)
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:77)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:380)
解决方案
所以,你似乎误解了一些基本的东西。这两个功能:foreachPartition
和mapPartition
对数据集中的每个分区进行操作。你的 - itr
变量
part_part_dataDs.foreachPartition(itr -> ....
指的是分区的迭代器。您可以使用此迭代器来迭代行列表,就像它是字符串列表一样。
原则上,你可以这样写:
part_part_dataDs.foreachPartition(itr ->{
itr.foreach(row -> {
System.out.println(row.getString(0));
})
})
尽管我必须强调这段代码根本没有意义。该println
语句将在某个随机工作节点上执行,因此您不会看到它,除非您在单个节点上运行。此外,这个例子应该简单地使用foreach
而不是foreachPartition
,但由于这似乎是一个人为的玩具例子,我无法判断你是否确实需要foreachPartition
。
推荐阅读
- javafx - javafx中paintComponent的替代品
- orocommerce - Admin 中是否有任何方法可以知道谁在线并在该站点登录?
- python - 努力让“获取绝对网址”工作(Python - Django)
- spring-boot - 将 cognito 与 spring-boot 微服务集成
- android - 滚动视图在片段上重叠
- sql - 通过比较不同的行和相同的列从表中获取数据
- c - 如何在 C 程序中评估增量运算符?
- javascript - 如何防止物体堆积?
- python - 删除具有多个列值的行
- scala - Scala 项目中的 java.lang.ClassNotFoundException