apache-kafka - Kafka Streams Suppression 阻止转发
问题描述
我有一个看起来像这样的 kafka 流处理器:
return stream ->
stream
.transform(new MyTransformer())
.groupByKey(Grouped.with(Serdes.String(), CustomSerdes.MyWrapper()))
.windowedBy(TimeWindows.of(Duration.ofMillis(60000)).grace(Duration.ofSeconds(1)))
.aggregate(
MessageGroup::new,
new MyAggregator(),
Materialized.<String, MessageGroup, WindowStore<Bytes, byte[]>>as("my-group-store-v1")
.withKeySerde(Serdes.String())
.withValueSerde(CustomSerdes.MessageGroup())
.withRetention(Duration.ofMillis(120000)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
.map((wkey, group) -> new KeyValue(((Windowed) wkey).key(), group))
.to("destiniation-topic");
我可以看到消息在聚合器中被分组和累积,但我希望消息在the first message event time + window period + grace period
到达后立即转发到我的目标主题(在我的情况下是 1 分钟 + 1 秒后)。但是,消息永远不会到达目标主题。
知道为什么会发生这种情况,或者我可以做些什么来使该过程对预期的窗口表现得更准确吗?
我试过改变:
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
至:
.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(timedWindowMs), Suppressed.BufferConfig.unbounded())
但这没有效果。
解决方案
推荐阅读
- mysql - 查询JSON列以提取Mysql 5.7的Array List中的选定映射
- admob - Admob 的 RewardedAd 显示有限制吗?
- java - 这个河内塔解决方案中的递归是如何工作的?
- swift - 视图加载时单元格不应自动突出显示
- arrays - 带有 IF、AND 和 OR 的表格数组公式
- javascript - Javascript - 在水平滚动菜单中自动滚动到页面加载时的活动菜单项
- linux - Linux At Command 运行 docker-compose
- c# - 我可以在 sqlite-net-pcl 中使用 Linq 语法进行连接查询吗?
- iis - 从 IIS 中删除绑定的命令
- mongodb - 通过 docker compose 在不同的服务中使用/引用容器名称