首页 > 解决方案 > 使用 SparkSession 读取、过滤和计算 CSV 文件的行数时如何处理 NullPointerException?

问题描述

我正在尝试使用sparkSession并计算行数并在控制台上打印值来读取存储在 HDFS 上的 CSV 文件。但是,我NullPointerException在计算计数时不断得到。下面是代码片段,

val validEmployeeIds = Set("12345", "6789")

val count =  sparkSession
    .read
    .option("escape", "\"")
    .option("quote", "\"")
    .csv(inputPath)
    .filter(row => validEmployeeIds.contains(row.getString(0)))
    .distinct()
    .count()

println(count)

我得到一个 NPE 完全在.filter条件。如果我在代码中删除.filter,它运行良好并打印计数。我该如何处理这个 NPE?

inputPath是一个包含多个 CSV 文件的文件夹。每个 CSV 文件有两列,一列代表 Id,另一列代表员工姓名。CSV 提取示例如下:

12345,Employee1
AA888,Employee2

我正在使用 Spark 2.3.1 版。

标签: apache-sparkapache-spark-sqlapache-spark-dataset

解决方案


尝试使用isin功能。

import spark.implicits._

val validEmployeeIds = List("12345", "6789")

val df =  // Read CSV

df.filter('_c0.isin(validEmployeeIds:_*)).distinct().count()

推荐阅读