首页 > 解决方案 > Kafka 流聚合复制

问题描述

有一个拓扑:

.mapValues((key, messages) -> remoteService.sendMessages(messages))
.flatMapValues(results -> results)
.map((key, result) -> KeyValue.pair(getAggregationKey(result), getAggregationResult(systemClock, result)))
.groupByKey(Grouped.with(createJsonSerde(AggregationKey.class), createJsonSerde(AggregationResult.class)))
.windowedBy(timeWindows)
.reduce((aggregatedResult, v) -> {
    int count = aggregatedResult.getCount();
    return aggregatedResult.toBuilder().count(count + 1).build();
})
.suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))

时间窗口:

Duration duration = Duration.ofSeconds(60);    
TimeWindows timeWindows = TimeWindows.of(duration).grace(Duration.ZERO);

我的假设是必须每 60 秒左右将聚合结果发送到接收主题,但我注意到有时它会发送重复项(数字不准确):第一个事件在第 50 秒发送,计数器 1000,然后在第 58 秒发送事件计数器 1050 发送了相同的密钥。它不是每分钟都发生,而是经常发生。为什么会发生这种情况?

我还注意到,第二个事件的时间戳总是小于第一个,但偏移量更大。内部减少主题也是如此。

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读