scala - Spark Structured Streaming - 带有窗口时间事件的自定义聚合
问题描述
我正在尝试对具有事件时间窗口的结构化流进行自定义聚合。
首先,我尝试将#Aggregator 接口(typed-UDAF)与 .agg 函数一起使用,例如:
val aggregatedDataset = streamDataset
.select($"id",$"time", $"eventType", $"eventValue"))
.groupBy(window($"time", "1 hour"), $"id").agg(CustomAggregator("time","eventType","eventValue").toColumn.as("aggregation"))
然而,这种聚合(在 reduce 函数中)仅适用于新的输入元素,而不是整个组
所以我尝试使用 GroupState 函数(mapGroupsWithState,flapMapGroupWithState),甚至只是 mapGroups 函数(没有状态)来执行我的聚合
但是我的 groupBy 操作返回 RelationalGroupedDataset 并且我需要一个 KeyValueGroupedDataset 来使用地图函数。groupByKey 不适用于窗口化。
如何使用结构化流和定时事件进行自定义聚合?
谢谢!
解决方案
GroupState 函数 - mapGroupsWithState
、flapMapGroupWithState
或mapGroups
(不带状态)仅在我们需要在Update
输出模式下操作时用于执行聚合。
但是如果我们使用Complete
输出模式,那么我们不需要 GroupState 函数。
因此,如果您将aggregatedDataset
query 的输出模式更改为Complete
,那么它将按预期工作。
我希望它有帮助!
推荐阅读
- angular - 错误:没有 Toaster 容器被初始化来接收 toast
- javascript - HEX 到 Threejs rgb 转换与 THREE.MeshPhongMaterial 不起作用
- codeeffects - FieldAttribute Group 属性不适用于集合
- ruby-on-rails - 如何使用依赖:销毁而不造成无限循环
- powershell - 如何给出打算在 Jenkins Powershell 插件中执行的 Powershell 脚本的相对路径
- java - 调试本地 Maven 依赖
- android - Flutter,如何像弧线一样裁剪容器
- javascript - 将python文件中的数据读入javascript文件
- python - 如何使用查找轮廓打开 cv 来增加边界框的大小?
- mysql - 在 MySql 中使用 IF 或 CASE 与 INNER JOIN