首页 > 解决方案 > Spark Dataframe 中的过滤操作

问题描述

我有一个 Spark 数据框,我想根据特定列的匹配值从中选择几行/记录。我想我可以在地图转换中使用过滤器操作或选择操作来做到这一点。

但是,我想针对那些在应用过滤器时未选择的行/记录更新状态列。

在应用过滤器操作时,我将返回一个包含匹配记录的新数据框。

那么,如何知道和更新未被选中的行的列值呢?

标签: apache-sparkapache-spark-sql

解决方案


在应用过滤器操作时,您将获得匹配记录的新数据框。

然后,您可以在Scala中使用except函数从输入数据框中获取不匹配的记录。

scala> val inputDF = Seq(("a", 1),("b", 2), ("c", 3), ("d", 4), ("e", 5)).toDF("id", "count")
inputDF: org.apache.spark.sql.DataFrame = [id: string, count: int]
scala> val filterDF = inputDF.filter($"count" > 3)
filterDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, count: int]

scala> filterDF.show()
+---+-----+
| id|count|
+---+-----+
|  d|    4|
|  e|    5|
+---+-----+

scala> val unmatchDF = inputDF.except(filterDF)
unmatchDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, count: int]

scala> unmatchDF.show()
+---+-----+
| id|count|
+---+-----+
|  b|    2|
|  a|    1|
|  c|    3|
+---+-----+

在 PySpark 中,您可以使用减法函数实现相同的目的。


推荐阅读