首页 > 解决方案 > 从列表中创建数据帧结果抛出 NullPointerException

问题描述

我正在使用 spark-sql-2.4.1 版本。我有一个类似下面的代码。

    val dataDs = ///have dataset

    val part_dataDs = dataDs.repartition(col("fieldX"));

    StructType schemaType = part_dataDs.schema();

    part_part_dataDs.foreachPartition(itr ->{

    Iterable<Row> rowIt = () -> itr;
    List<Row> objs = StreamSupport.stream(rowIt.spliterator(), false)
                      .collect(Collectors.toList());

    System.out.println("inrow.length: " + objs.size());

    Dataset<Row> partitionData = sparkSession.createDataFrame(objs, schemaType);

    partitionData.show;

}

错误 :

[Executor task launch worker for task 418] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 21.0 (TID 418)
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:77)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:380)

标签: javascaladataframeapache-sparkapache-spark-sql

解决方案


所以,你似乎误解了一些基本的东西。这两个功能:foreachPartitionmapPartition对数据集中的每个分区进行操作。你的 - itr变量

part_part_dataDs.foreachPartition(itr -> .... 

指的是分区的迭代器。您可以使用此迭代器来迭代行列表,就像它是字符串列表一样。

原则上,你可以这样写:

part_part_dataDs.foreachPartition(itr ->{
  itr.foreach(row -> {
    System.out.println(row.getString(0));
  })
})

尽管我必须强调这段代码根本没有意义。该println语句将在某个随机工作节点上执行,因此您不会看到它,除非您在单个节点上运行。此外,这个例子应该简单地使用foreach而不是foreachPartition,但由于这似乎是一个人为的玩具例子,我无法判断你是否确实需要foreachPartition


推荐阅读