apache-kafka - 卡夫卡流在窗口和重新启动期间丢弃消息
问题描述
我正在使用以下拓扑开发 Kafka Streams 应用程序:
private final Initializer<Set<String>> eventInitializer = () -> new HashSet<>();
final StreamsBuilder streamBuilder = new StreamsBuilder();
final KStream<String, AggQuantityByPrimeValue> eventStreams = streamBuilder.stream("testTopic",
Consumed.with(Serdes.String(), **valueSerde**));
final KStream<String, Value> filteredStreams = eventStreams
.filter((key,clientRecord)->recordValidator.isAllowedByRules(clientRecord));
final KGroupedStream<Integer, Value> groupedStreams = filteredStreams.groupBy(
(key, transactionEntry) -> transactionEntry.getNodeid(),
Serialized.with(Serdes.Integer(), **valueSerde**));
/* Hopping window */
final TimeWindowedKStream<Integer, Value> windowedGroupStreams = groupedStreams
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(25))
.grace(Duration.ofSeconds(0)));
/* Aggregating the events */
final KStream<Windowed<Integer>, Set<String>> suppressedStreams = windowedGroupStreams
.aggregate(eventInitializer, countAggregator, Materialized.as("counts-aggregate")
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
.withName("suppress-window")
.toStream();
suppressedStreams.foreach((windowed, value) -> eventProcessor.publish(windowed.key(), value));
return new KafkaStreams(streamBuilder.build(), config.getKafkaConfigForStreams());
我观察到在窗口期间/之后间歇性地很少有事件被丢弃。例如:
- 所有记录都可以在 isAllowedByRules() 方法中查看/打印,这些记录是有效的(过滤器允许)并由流使用。
- 但是当在countAggregator中打印事件时,我可以看到很少有事件没有通过它。
流的当前配置:
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-app-id"
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstraps-server>);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
streamsConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
streamsConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
streamsConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);
/*For window buffering across all threads*/
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, **customSerdesForSet**);
最初,我使用的是翻滚窗口,但我发现大多数情况下在窗口结束时很少有事件丢失,所以我改为跳跃窗口(复制比丢失更好)。然后丢弃的事件变为零。但今天又是在将近 4 天后,我看到很少有丢弃的事件,其中有一个模式,与一起制作的其他事件相比,它们晚了近一分钟。但随后的期望是,这些迟到的事件应该出现在未来的任何窗口中,但这并没有发生。如果我的理解不正确,请在这里纠正我。
同样正如我在主题中提到的那样,在重新启动流时(优雅地),尽管由 isAllowedByRules() 方法处理,但在聚合步骤中我可以看到很少有事件再次丢失。
我在堆栈溢出和其他网站上进行了很多搜索,但找不到这种行为的根本原因。是否与我缺少/未正确设置的某些配置有关,或者可能是由于其他原因?
解决方案
据我了解,您有一个空的宽限期:
/* Hopping window */
...
.grace(Duration.ofSeconds(0))
因此,您的窗口已关闭,不允许任何迟到。
然后关于您的子问题:
But then expectation is that these late events should come in any of the future windows but that didn't happen. Correct me here if my understanding is not right.
也许您正在混合事件时间和处理时间。如果记录的时间戳(由生产者在生产时添加,或者如果生产者未设置,则在到达集群时由代理添加)在您当前的窗口之外,您的记录将被归类为“迟到”。
这是一个带有 2 条记录“*”的示例。
他们的事件时间(et1 和 et2)适合窗口:
| window |
t1 t2
| * * |
et1 et2
但是,et2 (pt2) 的处理时间实际上如下:
| window |
t1 t2
| * | *
pt1 pt2
这里的窗口是 t1 和 t2 之间的时间片(处理时间) et1 和 et2 分别是 2 条记录“*”的事件时间。et1 和 et2 是记录本身中设置的时间戳。在这个例子中,et1 和 et2 在 t1 和 t2 之间,et2 是在窗口关闭后收到的,因为你的宽限期是 0,它会被跳过。
可能是个解释
推荐阅读
- reactjs - 反应,测试输入占位符
- elasticsearch - Elasticsearch - 复合聚合最大值
- webpack - 蚂蚁设计主题的故事书不太行不通
- sql - 如何查看退货记录
- android-studio - 如何从代码中将文本颜色设置为 Android Studio 中的默认值?
- angular - RxJS mergeMap 等待内部 Observables
- c# - 如何将记录从 gridview 控件传递到 pdf 查看器 Devexpress
- r - R TaskScheduler:在执行任务时在 cmd 控制台上打印文本
- c# - 如何统一检查最近物体的颜色?
- asp.net-mvc - Blazor WebAssembly 可以合并到 .NET Framework 4.8 MVC 站点中吗?