apache-flink - Flink 滑动计数窗口行为
问题描述
假设我们有这样的数据结构:
Tuple2<ArryaList<Long>, Integer>
第一个字段是一个ArrayList
长度为 one 的包含时间戳的字段,而 Integer 字段是一个介于 1 和 40 之间的数字,名为channel
。目标是使用相同的键 ( channel
) 聚合每 400 条消息并ReduceFunction
对其应用(它只是将 400 条消息的时间戳合并到元组的第一个字段中)。我将该channel
字段设置为消息的键并创建一个 400 的计数窗口。例如,如果我们有160000条消息作为输入,它应该输出160000/400 = 400
行并且计数窗口按需要工作。问题是当我使用滑动计数窗口时,我的预期行为是:
Flink 为每个channel
数字创建逻辑窗口并ReduceFunction
第一次应用,如果逻辑窗口的长度达到 400,之后每 100 个输入数据,使用与逻辑窗口的 key 相同的 key,将调用ReduceFunction
for last 400 消息窗口也是如此,所以我们应该有:
160000 - 400 = 159600
//前 400 个输入将第一次调用 reduce 函数159600 / 100 = 1596
//在前 400 个输入之后,对于每 100 个输入,Flink 调用最后 400 个输入的 reduce 函数1 + 1596 = 1597
// 输出行数
但是运行滑动计数窗口,它会输出 1600 行长度可变的行。(我预计输出长度仅为 400)
要点:说长度我是指ArrayList的大小(Tuple2的第一个字段)
- 前 40 个通道 --> 长度为 100
- 第二个40通道-->长度为299
- 第三个40通道-->长度598
- 第四个40通道-->长度997
- 剩下 40 个通道 --> 长度为 400
我怎样才能证明这种行为并实现我想要的滑动计数窗口?
这是源代码:
DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
.reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
@Override
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
t0.f0.add(t1.f0.get(0));
return t0;
}
}).writeAsText("results400").setParallelism(1);
更新:根据@DavidAnderson 的建议,我也尝试在ReduceFunstion
而不是修改中创建一个新的元组t0
,但它产生了相同的输出。
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = t0.f0;
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}
解决方案
这是countWindow的实现
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
它的行为与您期望的不太一样。窗口每 100 个元素(幻灯片)触发一次,无论它是否包含 400 个元素(大小)。大小控制最多保留多少个元素。
推荐阅读
- windows - Windows CMD 解析
- java - 求解涉及大整数的方程
- equality - 我们可以在没有模式匹配的情况下推导出 Agda 中相等/身份证明的唯一性(仅使用 J & K)吗?
- ringcentral - 如何使用 RingCentral 一次从多个呼叫队列中添加和删除用户?
- asp.net-core - 登录后无法使用 Controller.User
- javascript - 如果将命名函数声明放在 return 语句中,为什么不提升它?
- javascript - html5视频结束时淡入元素
- c - C中的字符常量和初始化
- python - Python:如果列表同时包含数字和字符串,如何排序?
- c# - 挂起 HtmlAgilityPack 任务