apache-spark - Kafka 删除(墓碑)不更新 Spark 结构化流中的最大聚合
问题描述
我正在对 Spark Structured Streaming (Spark 3.0) 作业中的计算聚合进行原型设计,并将更新发布到 Kafka。我需要计算每个组的最大日期和最大百分比(无窗口)。除了源流中的 Kafka 墓碑记录(删除)之外,该代码似乎很好。流接收具有有效键和空值的 Kafka 记录,但最大聚合继续在计算中包含该记录。当从 Kafka 使用删除时,在不删除记录的情况下重新计算的最佳选择是什么?
产生的示例
消息:
<"user1|1", {"user": "user1", "pct":30, "timestamp":"2021-01-01 01:00:00"}>
<"user1|2", {"user": "user1", "pct":40, "timestamp":"2021-01-01 02:00:00"}>
<"user1|2", null>
火花代码片段:
val usageStreamRaw = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", usageTopic).load()
val usageStream = usageStreamRaw
.select(col("key").cast(StringType).as("key"),
from_json(col("value").cast(StringType), valueSchema).as("json"))
.selectExpr("key", "json.*")
val usageAgg = usageStream.groupBy("user")
.agg(
max("timestamp").as("maxTime"),
max("pct").as("maxPct")
)
val sq = usageAgg.writeStream.outputMode("update").option("truncate","false").format("console").start()
sq.awaitTermination()
对于 user1,列中的结果pct
是 40,但删除后应该是 30。有没有使用 Spark Structured Streaming 的好方法?
解决方案
您可以通过在每条消息中使用 Kafka 时间戳
val usageStream = usageStreamRaw
.select(col("key").cast(StringType).as("key"),
from_json(col("value").cast(StringType), valueSchema).as("json"),
col("timestamp"))
.selectExpr("key", "json.*", "timestamp")
然后
- 仅选择每个键的最新值,并且
- 过滤掉
null
值
在对最大时间和 pct 应用您的聚合之前。
推荐阅读
- python - WM_Class 被忽略并且正在运行的应用程序的图标不与停靠/最喜欢的应用程序图标结合
- python - 破折号显示卡片边框和卡片标题之间的空白
- python - Python 3.x evdev 确定键盘和鼠标
- c# - 如何重构这些 switch case 以处理自然语言中的用户选择?
- outlook - 如何创建具有自定义值的 Outlook 日历事件?
- mysql - 如何选择仅按一个特定列分组的其他列的最大日期
- python - Lark 匹配自定义分隔符多行字符串
- javascript - 在 Mat-Select 中滚动到 Mat-0 选项的末尾时触发事件:Angular 6
- amazon-web-services - 创建统一的实例组集群,为实例组中的每个实例类型指定唯一的自定义 AMI
- c++ - 为什么这个模板不编译?