首页 > 解决方案 > Kafka Stream windowedBy 聚合 Materialized withRetention Out Of Memory

问题描述

我有一个 KStream<String,Event> 应该是 windowedBy 并且聚合结果导致内存不足:

java.lang.OutOfMemoryError: Java heap space

KStream DSL如下:

TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
        Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
            aggregate.key = value.uuid;
            aggregate.addHistoryEventWindow(value);
            return aggregate;
        };

KTable<String, History> historyWindowed = eventStreamRaw
.filter((key, value) -> value != null)
    .groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
    // segment our messages into 1-day windows
    .windowedBy(timeWindows)
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .groupBy(
            (key, value) -> new KeyValue<String, History>(
                    value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
            Grouped.with(Serdes.String(), this.historySerde))
    .aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
            Materialized.with(Serdes.String(), this.historySerde));

阅读一些文章(例如Kafka Streams Window By & RocksDB Tuning),我注意到我可能必须将商店“Materialized”配置为保留“1 天 + 1 Milli”。

但是尝试添加对我不起作用:

final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
        .withKeySerde(Serdes.String())
        .withValueSerde(this.historySerde)
        .withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));

KTable<String, History> historyWindowed = eventStreamRaw
    ...
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), store)

Java 编译抛出以下错误:

The method 
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>) 
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments 
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)

老实说,我不明白。参数正确;VR 类型是“历史”。

那么,你知道我错过了什么吗?

这个 windowedBy KTable 的想法是有一个状态,它可以为一个“事物”保存一天的所有事件。假设产生了一个新警报,我想将某一天的“事物”的所有事件附加到警报中。然后我会从 KStream Alert 到 KTable History 进行 leftJoin。这是将历史数据添加到 Kafka 事件的最佳方式吗?有没有办法“查找”KStream 事件的最后 x 天?我已经检查了 KStream Alert-KStream 事件 leftJoin 但这会为每个新的 KStream 事件产生一个输出。所以,从我的观点来看,这是不切实际的。

非常感谢您的帮助。我希望这只是一个简单的修复。高度赞赏!

标签: javaapache-kafkaapache-kafka-streams

解决方案


查看以下帖子Kafka Streams App - count and sum aggregate我导入了错误的“字节”类。所以,一定要导入下面的类“org.apache.kafka.common.utils.Bytes”。

但是,也许您有一个更好的主意,可以使用来自另一个流的历史数据来丰富来自一个流的 Kafka 消息,该流与(外)键相关。

多谢你们。


推荐阅读