首页 > 解决方案 > Kafka Streams Suppression 阻止转发

问题描述

我有一个看起来像这样的 kafka 流处理器:

return stream ->
    stream
        .transform(new MyTransformer())
        .groupByKey(Grouped.with(Serdes.String(), CustomSerdes.MyWrapper()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(60000)).grace(Duration.ofSeconds(1)))
        .aggregate(
            MessageGroup::new,
            new MyAggregator(),
            Materialized.<String, MessageGroup, WindowStore<Bytes, byte[]>>as("my-group-store-v1")
                .withKeySerde(Serdes.String())
                .withValueSerde(CustomSerdes.MessageGroup())
                .withRetention(Duration.ofMillis(120000)))
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .toStream()
        .map((wkey, group) -> new KeyValue(((Windowed) wkey).key(), group))
        .to("destiniation-topic");

我可以看到消息在聚合器中被分组和累积,但我希望消息在the first message event time + window period + grace period到达后立即转发到我的目标主题(在我的情况下是 1 分钟 + 1 秒后)。但是,消息永远不会到达目标主题。

知道为什么会发生这种情况,或者我可以做些什么来使该过程对预期的窗口表现得更准确吗?

我试过改变:

.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))

至:

.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(timedWindowMs), Suppressed.BufferConfig.unbounded())

但这没有效果。

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读