首页 > 解决方案 > Spark Structured Streaming - 带有窗口时间事件的自定义聚合

问题描述

我正在尝试对具有事件时间窗口的结构化流进行自定义聚合。
首先,我尝试将#Aggregator 接口(typed-UDAF)与 .agg 函数一起使用,例如:

val aggregatedDataset = streamDataset
  .select($"id",$"time", $"eventType", $"eventValue"))
  .groupBy(window($"time", "1 hour"), $"id").agg(CustomAggregator("time","eventType","eventValue").toColumn.as("aggregation"))

然而,这种聚合(在 reduce 函数中)仅适用于新的输入元素,而不是整个组

所以我尝试使用 GroupState 函数(mapGroupsWithState,flapMapGroupWithState),甚至只是 mapGroups 函数(没有状态)来执行我的聚合

但是我的 groupBy 操作返回 RelationalGroupedDataset 并且我需要一个 KeyValueGroupedDataset 来使用地图函数。groupByKey 不适用于窗口化。

如何使用结构化流和定时事件进行自定义聚合?

谢谢!

标签: scalaapache-sparkapache-spark-sqlspark-structured-streaming

解决方案


GroupState 函数 - mapGroupsWithStateflapMapGroupWithStatemapGroups(不带状态)仅在我们需要在Update输出模式下操作时用于执行聚合。

但是如果我们使用Complete输出模式,那么我们不需要 GroupState 函数。

因此,如果您将aggregatedDatasetquery 的输出模式更改为Complete,那么它将按预期工作。

我希望它有帮助!


推荐阅读