首页 > 解决方案 > 消除 Streaming DataFrame 中的重复项(去重)

问题描述

我有一个 Spark 流处理器。数据框 dfNewExceptions 有重复项(由“ExceptionId”重复)。由于这是一个流数据集,因此以下查询失败:

val dfNewUniqueExceptions = dfNewExceptions.sort(desc("LastUpdateTime"))
                                    .coalesce(1)
                                    .dropDuplicates("ExceptionId")
                                    
val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")
    dfNewExceptionCore.writeStream
      .format("console")
//      .outputMode("complete")
      .option("truncate", "false")
      .option("numRows",5000)
      .start()
      .awaitTermination(1000)
  

** 线程“main” org.apache.spark.sql.AnalysisException 中的异常:流数据帧/数据集不支持排序,除非它在完整输出模式下的聚合数据帧/数据集上;;**

这也记录在这里:https ://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-programming-guide.html

关于如何从 dfNewExceptions 中删除重复项的任何建议?

标签: scalaapache-sparkapache-spark-sqlspark-structured-streamingdelta-lake

解决方案


我建议遵循 Structured Streaming Guide on Streaming Deduplication中解释的方法。那里说:

您可以使用事件中的唯一标识符对数据流中的记录进行重复数据删除。这与使用唯一标识符列的静态重复数据删除完全相同。该查询将存储来自先前记录的必要数量的数据,以便它可以过滤重复记录。与聚合类似,您可以使用带或不带水印的重复数据删除。

使用水印- 如果重复记录的到达时间有上限,则您可以在事件时间列上定义水印并使用 guid 和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些数据预计不会再有任何重复。这限制了查询必须保持的状态量。

还给出了 Scala 中的示例:

val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ... 

dfExceptions 
  .withWatermark("LastUpdateTime", "10 seconds") 
  .dropDuplicates("ExceptionId", "LastUpdateTime")

推荐阅读