java - Kafka Stream windowedBy 聚合 Materialized withRetention Out Of Memory
问题描述
我有一个 KStream<String,Event> 应该是 windowedBy 并且聚合结果导致内存不足:
java.lang.OutOfMemoryError: Java heap space
KStream DSL如下:
TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
aggregate.key = value.uuid;
aggregate.addHistoryEventWindow(value);
return aggregate;
};
KTable<String, History> historyWindowed = eventStreamRaw
.filter((key, value) -> value != null)
.groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
// segment our messages into 1-day windows
.windowedBy(timeWindows)
.aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.groupBy(
(key, value) -> new KeyValue<String, History>(
value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
Grouped.with(Serdes.String(), this.historySerde))
.aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
Materialized.with(Serdes.String(), this.historySerde));
阅读一些文章(例如Kafka Streams Window By & RocksDB Tuning),我注意到我可能必须将商店“Materialized”配置为保留“1 天 + 1 Milli”。
但是尝试添加对我不起作用:
final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
.withKeySerde(Serdes.String())
.withValueSerde(this.historySerde)
.withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));
KTable<String, History> historyWindowed = eventStreamRaw
...
.aggregate(historyInitializer, historyAggregator, Named.as("name"), store)
Java 编译抛出以下错误:
The method
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>)
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)
老实说,我不明白。参数正确;VR 类型是“历史”。
那么,你知道我错过了什么吗?
这个 windowedBy KTable 的想法是有一个状态,它可以为一个“事物”保存一天的所有事件。假设产生了一个新警报,我想将某一天的“事物”的所有事件附加到警报中。然后我会从 KStream Alert 到 KTable History 进行 leftJoin。这是将历史数据添加到 Kafka 事件的最佳方式吗?有没有办法“查找”KStream 事件的最后 x 天?我已经检查了 KStream Alert-KStream 事件 leftJoin 但这会为每个新的 KStream 事件产生一个输出。所以,从我的观点来看,这是不切实际的。
非常感谢您的帮助。我希望这只是一个简单的修复。高度赞赏!
解决方案
查看以下帖子Kafka Streams App - count and sum aggregate我导入了错误的“字节”类。所以,一定要导入下面的类“org.apache.kafka.common.utils.Bytes”。
但是,也许您有一个更好的主意,可以使用来自另一个流的历史数据来丰富来自一个流的 Kafka 消息,该流与(外)键相关。
多谢你们。
推荐阅读
- regex - 用正则表达式分隔起始数字
- akka-http - 如果要对所有 CRUD 操作使用 post HTTP 方法,是否会出现任何问题
- c# - 单击按钮发送电子邮件
- azure - TeamCity - FTP 上传到 Azure 失败
- rest - HBase REST API 批处理属性不起作用
- javascript - 如何在antd中获取FormItem更改的字段值
- vue.js - 如何在 VUE 中构建嵌套的响应式数据
- java - 从 Android 到 Web 服务器的间歇性超时问题
- ios - 在设备上运行应用程序时查看本地数据库
- solr - 如何使用 ext:solr 索引整个 TYPO3 存储?