Kafka Streams creates a new aggregate even before the window closes

Problem description

I am using Kafka Streams for a windowed aggregation. Here is the logic:

    // Read the input topics as a stream of String keys and values.
    KStream<String, String> stream = builder.stream(topics, Consumed.with(stringSerde, stringSerde));
    stream.groupByKey()
        // 2-minute tumbling windows with no grace period for late records.
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
        .aggregate(() -> "",
            (key, word, aggregate) -> {
                logger.info("(key, word, aggregate): ({}, {}, {})", key, word, aggregate);
                // Append each word to the running aggregate for this key and window.
                return aggregate + "-" + word;
            }, Materialized.with(stringSerde, stringSerde))
        // Replace the windowed key with the plain record key.
        .toStream((k, v) -> k.key())
        .foreach((k, v) -> logger.info("Aggregated to Stream. (k,v): ({}, {})", k, v));

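Although this works in most cases, I have observed the following problems:

  1. The aggregation is flushed prematurely
  2. A new aggregation bucket is created even before the window closes

Both problems are visible in the following logs (see the lines marked (1) and (2)):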

[2019-08-14 14:10:38,855] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, a, )

(1)[2019-08-14 14:11:24,503] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a)

[2019-08-14 14:11:27,735] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a)
[2019-08-14 14:11:43,298] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, f, -a-b)
[2019-08-14 14:11:59,373] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a-b-f)

(2)[2019-08-14 14:12:14,196] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, r, )

[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a-b-f-b)
[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -r)

Is there a way to fix these problems?

Tags: apache-kafka, apache-kafka-streams, windowed

Solution
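
One possible reading of the marked lines, offered as a sketch rather than a confirmed diagnosis: tumbling windows in Kafka Streams are aligned to the epoch, not to the first record seen for a key. With a 2-minute window, boundaries therefore fall on even minutes (14:10:00, 14:12:00, ...), so the record logged at 14:12:14 would belong to a new window and start from the empty initializer. The plain-Java snippet below only illustrates that alignment arithmetic. The class and method names are hypothetical, and it assumes that each record's timestamp equals its log time and that the log times can be read as UTC; neither is stated in the question.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAlignment {

    // Start of the epoch-aligned tumbling window that contains timestampMs.
    // Illustrative arithmetic for non-negative timestamps only.
    static long windowStartMs(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofMinutes(2).toMillis();

        // Log times of the aggregator calls in the question,
        // read as UTC record timestamps (an assumption).
        String[] logTimes = {
            "2019-08-14T14:10:38.855Z",
            "2019-08-14T14:11:27.735Z",
            "2019-08-14T14:11:43.298Z",
            "2019-08-14T14:11:59.373Z",
            "2019-08-14T14:12:14.196Z"
        };

        for (String t : logTimes) {
            long ts = Instant.parse(t).toEpochMilli();
            Instant start = Instant.ofEpochMilli(windowStartMs(ts, sizeMs));
            Instant end = start.plusMillis(sizeMs);
            // Window interval is start-inclusive, end-exclusive.
            System.out.println(t + " -> [" + start + ", " + end + ")");
        }
    }
}
```

Under those assumptions, the first four records map to the window starting at 14:10:00 and the fifth to the window starting at 14:12:00, which would match line (2). As for line (1), a windowed aggregation forwards updated results downstream by default as records arrive (subject to record caching and the commit interval), so an intermediate value such as `-a` can appear before the window ends. If only one final result per window is wanted, the `suppress(Suppressed.untilWindowCloses(...))` operator on the aggregated `KTable` is the usual candidate; whether it fits this topology is untested here.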

