apache-kafka - Kafka 流聚合复制
问题描述
有一个拓扑:
.mapValues((key, messages) -> remoteService.sendMessages(messages))
.flatMapValues(results -> results)
.map((key, result) -> KeyValue.pair(getAggregationKey(result), getAggregationResult(systemClock, result)))
.groupByKey(Grouped.with(createJsonSerde(AggregationKey.class), createJsonSerde(AggregationResult.class)))
.windowedBy(timeWindows)
.reduce((aggregatedResult, v) -> {
int count = aggregatedResult.getCount();
return aggregatedResult.toBuilder().count(count + 1).build();
})
.suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
时间窗口:
Duration duration = Duration.ofSeconds(60);
TimeWindows timeWindows = TimeWindows.of(duration).grace(Duration.ZERO);
我的假设是必须每 60 秒左右将聚合结果发送到接收主题,但我注意到有时它会发送重复项(数字不准确):第一个事件在第 50 秒发送,计数器 1000,然后在第 58 秒发送事件计数器 1050 发送了相同的密钥。它不是每分钟都发生,而是经常发生。为什么会发生这种情况?
我还注意到,第二个事件的时间戳总是小于第一个,但偏移量更大。内部减少主题也是如此。
解决方案
推荐阅读
- python - RuntimeError:事件循环已关闭 - JanusGraph
- php - VS Code 上的 intelephense 错误:预期类型为“字符串”。找到'字符串 []'
- reactjs - React + GSAP:动画元素onClick
- game-development - 如何实例化一个场景并让它只面向 x 或 z 方向?
- php - 如何在Mysql上比较2个不同表的2列
- python - Python/Django 请求 JSONDecodeError:期望值:第 1 行第 1 列(字符 0)
- haskell - 获取用户输入并添加到列表中
- ffmpeg - 使用 ffprobe 确定 wtv -> mp4 转换的 crf
- flutter - 带附件的颤振表单(文件上传)
- r - 删除完全不适用的行