scala - 消除 Streaming DataFrame 中的重复项(去重)
问题描述
我有一个 Spark 流处理器。数据框 dfNewExceptions 有重复项(由“ExceptionId”重复)。由于这是一个流数据集,因此以下查询失败:
val dfNewUniqueExceptions = dfNewExceptions.sort(desc("LastUpdateTime"))
.coalesce(1)
.dropDuplicates("ExceptionId")
val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")
dfNewExceptionCore.writeStream
.format("console")
// .outputMode("complete")
.option("truncate", "false")
.option("numRows",5000)
.start()
.awaitTermination(1000)
** 线程“main” org.apache.spark.sql.AnalysisException 中的异常:流数据帧/数据集不支持排序,除非它在完整输出模式下的聚合数据帧/数据集上;;**
关于如何从 dfNewExceptions 中删除重复项的任何建议?
解决方案
我建议遵循 Structured Streaming Guide on Streaming Deduplication中解释的方法。那里说:
您可以使用事件中的唯一标识符对数据流中的记录进行重复数据删除。这与使用唯一标识符列的静态重复数据删除完全相同。该查询将存储来自先前记录的必要数量的数据,以便它可以过滤重复记录。与聚合类似,您可以使用带或不带水印的重复数据删除。
使用水印- 如果重复记录的到达时间有上限,则您可以在事件时间列上定义水印并使用 guid 和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些数据预计不会再有任何重复。这限制了查询必须保持的状态量。
还给出了 Scala 中的示例:
val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ...
dfExceptions
.withWatermark("LastUpdateTime", "10 seconds")
.dropDuplicates("ExceptionId", "LastUpdateTime")
推荐阅读
- docusignapi - DocuSign 以前的签名者信息
- java - Spark2 shell 退出,并在线程“main”java.lang.IllegalArgumentException 中出现异常:MALFORMED 错误
- ios - 使用贝塞尔路径快速绘制圆圈
- javascript - Angular 组件与 Input() 的交互未同步
- c++ - Qt - 为 QML 定义全局函数
- r - 如何将文本文件中的“Inf”字符串转换为数字
- netsuite - Netsuite 脚本组合/合并多个语句
- javascript - Vue.js 中 JSON 对象的值
- node.js - 从Angular 5中的http请求获取数据
- python - 异常类型:MissingSchema / beautifulsoup