首页 > 解决方案 > kafka 流中的聚合和状态存储保留

问题描述

我有一个如下用例。对于每个传入事件,我想查看某个字段以查看其状态是否从 A 更改为 B,如果是,则将其发送到输出主题。流程是这样的:带有键“xyz”的事件以状态 A 进入,一段时间后另一个事件以键“xyz”进入状态 B。我使用高级 DSL 编写了这段代码。

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
          .map((k, v) -> new KeyValue<>(v.getId(), v))
                    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                    .aggregate(DomainStatusMonitor::new,
                            (k, v, aggregate) -> {
                                aggregate.updateStatusMonitor(v);
                                return aggregate;
                            }, Materialized.with(Serdes.String(), jsonSerde))
                    .toStream()
                    .filter((k, v) -> v.isStatusChangedFromAtoB())
                    .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));

有没有更好的方法来使用 DSL 编写这个逻辑?

关于上面代码中聚合创建的状态存储的几个问题。

  1. 它是否默认创建内存状态存储?
  2. 如果我有无限数量的唯一传入键会发生什么?如果它默认使用内存存储,我不需要切换到持久存储吗?我们如何处理 DSL 中的这种情况?
  3. 如果状态存储非常大(内存中或持久),它如何影响启动时间?如何使流处理等待以使存储完全初始化?或者 Kafka Streams 是否会确保在处理任何传入事件之前完全初始化状态存储?

提前致谢!

标签: apache-kafka-streams

解决方案


  1. 默认情况下,将使用持久性 RocksDB 存储。如果你想使用内存存储,你会传入Materialized.as(Stores.inMemoryKeyValueStore(...))

  2. 如果您有无限数量的唯一键,您最终将耗尽主内存或磁盘,并且您的应用程序将死亡。根据您的语义,您可以通过使用带有大“gap”参数的会话窗口聚合来获得“TTL”,而不是使旧密钥过期。

  3. 在处理新数据之前,状态总是会被恢复。如果您使用内存存储,这将通过使用底层更改日志主题来实现。根据您所在州的大小,这可能需要一段时间。如果您使用持久性 RocksDB 存储,状态将从磁盘加载,因此不需要恢复并且应该立即进行处理。只有当您丢失本地磁盘上的状态时,才会在这种情况下从更改日志主题恢复。


推荐阅读