apache-kafka - Kafka Stream 甚至在窗口关闭之前创建新的聚合
问题描述
我正在使用 Kafka 流进行窗口聚合。这是逻辑:
KStream<String, String> stream = builder.stream(topics, Consumed.with(stringSerde, stringSerde));
Stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
.aggregate(()-> "",
(key, word, aggregate) -> {
logger.info("(key, word, aggregate): ({}, {}, {})" , key, word, aggregate);
aggregate = aggregate+ "-" + word;
return aggregate;
}, Materialized.with(stringSerde, stringSerde))
.toStream((k, v) -> k.key())
.foreach((k, v) -> logger.info("Aggregated to Stream. (k,v): ({}, {})" , k, v));
尽管这在大多数情况下都有效,但我观察到了以下问题:
- 聚合过早刷新
- 甚至在窗口关闭之前就创建了新的聚合桶
这些日志(标记线)可以明显看出这些问题:
[2019-08-14 14:10:38,855] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, a, )
(1)[2019-08-14 14:11:24,503] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a)
[2019-08-14 14:11:27,735] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a)
[2019-08-14 14:11:43,298] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, f, -a-b)
[2019-08-14 14:11:59,373] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a-b-f)
(2)[2019-08-14 14:12:14,196] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, r, )
[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a-b-f-b)
[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -r)
有没有办法解决这些问题?
解决方案
推荐阅读
- c# - C# 使用 ExecuteReader 时从存储的过程中捕获返回值
- snappydata - 使用 aws 外部 IP 地址连接到 snappy 数据
- javascript - 验证 JavaScript 中任意两个特定字母“a”和“i”的输入,并将“a”替换为@,将“i”替换为?
- sas - 在 WHEN-DO 语句 SAS 中创建一个变量
- php - 字符串替换功能并不总是有效
- c# - 例外:未配置 RegionEndpoint 或 ServiceURL
- typescript - Ionic 4 图像选择器崩溃 android 应用程序
- c# - 找不到 FullSerializer 和 fsSerializer - Unity
- javascript - 如何在不转到新主页的情况下更改网站布局
- c# - 计算c#列表中元素的实例