首页 > 解决方案 > Kafka 删除(墓碑)不更新 Spark 结构化流中的最大聚合

问题描述

我正在对 Spark Structured Streaming (Spark 3.0) 作业中的计算聚合进行原型设计,并将更新发布到 Kafka。我需要计算每个组的最大日期和最大百分比(无窗口)。除了源流中的 Kafka 墓碑记录(删除)之外,该代码似乎很好。流接收具有有效键和空值的 Kafka 记录,但最大聚合继续在计算中包含该记录。当从 Kafka 使用删除时,在不删除记录的情况下重新计算的最佳选择是什么?

产生的示例
消息:

<"user1|1", {"user": "user1", "pct":30, "timestamp":"2021-01-01 01:00:00"}>  
<"user1|2", {"user": "user1", "pct":40, "timestamp":"2021-01-01 02:00:00"}>  
<"user1|2", null>

火花代码片段:

val usageStreamRaw = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", usageTopic).load()

val usageStream = usageStreamRaw
    .select(col("key").cast(StringType).as("key"),
            from_json(col("value").cast(StringType), valueSchema).as("json"))
    .selectExpr("key", "json.*")

val usageAgg = usageStream.groupBy("user")
      .agg(
        max("timestamp").as("maxTime"),
        max("pct").as("maxPct")
      )

val sq = usageAgg.writeStream.outputMode("update").option("truncate","false").format("console").start()

sq.awaitTermination()

对于 user1,列中的结果pct是 40,但删除后应该是 30。有没有使用 Spark Structured Streaming 的好方法?

标签: apache-sparkapache-kafkaspark-structured-streaming

解决方案


您可以通过在每条消息中使用 Kafka 时间戳

val usageStream = usageStreamRaw
    .select(col("key").cast(StringType).as("key"),
            from_json(col("value").cast(StringType), valueSchema).as("json"),
            col("timestamp"))
    .selectExpr("key", "json.*", "timestamp")

然后

  • 仅选择每个键的最新值,并且
  • 过滤掉null

在对最大时间和 pct 应用您的聚合之前。


推荐阅读