首页 > 解决方案 > 卡夫卡流在窗口和重新启动期间丢弃消息

问题描述

我正在使用以下拓扑开发 Kafka Streams 应用程序:

private final Initializer<Set<String>> eventInitializer = () -> new HashSet<>();

final StreamsBuilder streamBuilder = new StreamsBuilder();

    final KStream<String, AggQuantityByPrimeValue> eventStreams = streamBuilder.stream("testTopic",
            Consumed.with(Serdes.String(), **valueSerde**));

    final  KStream<String, Value> filteredStreams = eventStreams
                .filter((key,clientRecord)->recordValidator.isAllowedByRules(clientRecord));

    final KGroupedStream<Integer, Value> groupedStreams = filteredStreams.groupBy(
        (key, transactionEntry) -> transactionEntry.getNodeid(),
        Serialized.with(Serdes.Integer(), **valueSerde**));

    /* Hopping window */
    final TimeWindowedKStream<Integer, Value> windowedGroupStreams = groupedStreams
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(25))
            .grace(Duration.ofSeconds(0)));

    /* Aggregating the events */
    final KStream<Windowed<Integer>, Set<String>> suppressedStreams = windowedGroupStreams
        .aggregate(eventInitializer, countAggregator, Materialized.as("counts-aggregate")
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
                .withName("suppress-window")
            .toStream();

    suppressedStreams.foreach((windowed, value) -> eventProcessor.publish(windowed.key(), value)); 

    return new KafkaStreams(streamBuilder.build(), config.getKafkaConfigForStreams());

我观察到在窗口期间/之后间歇性地很少有事件被丢弃。例如:

流的当前配置:

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-app-id"
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstraps-server>); 
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
streamsConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
streamsConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
streamsConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);
/*For window buffering across all threads*/
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800);

streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, **customSerdesForSet**);

最初,我使用的是翻滚窗口,但我发现大多数情况下在窗口结束时很少有事件丢失,所以我改为跳跃窗口(复制比丢失更好)。然后丢弃的事件变为零。但今天又是在将近 4 天后,我看到很少有丢弃的事件,其中有一个模式,与一起制作的其他事件相比,它们晚了近一分钟。但随后的期望是,这些迟到的事件应该出现在未来的任何窗口中,但这并没有发生。如果我的理解不正确,请在这里纠正我。

同样正如我在主题中提到的那样,在重新启动流时(优雅地),尽管由 isAllowedByRules() 方法处理,但在聚合步骤中我可以看到很少有事件再次丢失。

我在堆栈溢出和其他网站上进行了很多搜索,但找不到这种行为的根本原因。是否与我缺少/未正确设置的某些配置有关,或者可能是由于其他原因?

标签: apache-kafkaapache-kafka-streamswindowing

解决方案


据我了解,您有一个空的宽限期:

 /* Hopping window */
...
            .grace(Duration.ofSeconds(0))

因此,您的窗口已关闭,不允许任何迟到。

然后关于您的子问题: But then expectation is that these late events should come in any of the future windows but that didn't happen. Correct me here if my understanding is not right.

也许您正在混合事件时间和处理时间。如果记录的时间戳(由生产者在生产时添加,或者如果生产者未设置,则在到达集群时由代理添加)在您当前的窗口之外,您的记录将被归类为“迟到”。

这是一个带有 2 条记录“*”的示例。

他们的事件时间(et1 和 et2)适合窗口:

 |    window       |
 t1                t2
 |      *    *     |
       et1  et2          

但是,et2 (pt2) 的处理时间实际上如下:

 |    window       |
 t1                t2
 |      *          |   *
       pt1            pt2

这里的窗口是 t1 和 t2 之间的时间片(处理时间) et1 和 et2 分别是 2 条记录“*”的事件时间。et1 和 et2 是记录本身中设置的时间戳。在这个例子中,et1 和 et2 在 t1 和 t2 之间,et2 是在窗口关闭后收到的,因为你的宽限期是 0,它会被跳过。

可能是个解释


推荐阅读