apache-kafka-streams - kafka 流中的聚合和状态存储保留
问题描述
我有一个如下用例。对于每个传入事件,我想查看某个字段以查看其状态是否从 A 更改为 B,如果是,则将其发送到输出主题。流程是这样的:带有键“xyz”的事件以状态 A 进入,一段时间后另一个事件以键“xyz”进入状态 B。我使用高级 DSL 编写了这段代码。
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
有没有更好的方法来使用 DSL 编写这个逻辑?
关于上面代码中聚合创建的状态存储的几个问题。
- 它是否默认创建内存状态存储?
- 如果我有无限数量的唯一传入键会发生什么?如果它默认使用内存存储,我不需要切换到持久存储吗?我们如何处理 DSL 中的这种情况?
- 如果状态存储非常大(内存中或持久),它如何影响启动时间?如何使流处理等待以使存储完全初始化?或者 Kafka Streams 是否会确保在处理任何传入事件之前完全初始化状态存储?
提前致谢!
解决方案
默认情况下,将使用持久性 RocksDB 存储。如果你想使用内存存储,你会传入
Materialized.as(Stores.inMemoryKeyValueStore(...))
如果您有无限数量的唯一键,您最终将耗尽主内存或磁盘,并且您的应用程序将死亡。根据您的语义,您可以通过使用带有大“gap”参数的会话窗口聚合来获得“TTL”,而不是使旧密钥过期。
在处理新数据之前,状态总是会被恢复。如果您使用内存存储,这将通过使用底层更改日志主题来实现。根据您所在州的大小,这可能需要一段时间。如果您使用持久性 RocksDB 存储,状态将从磁盘加载,因此不需要恢复并且应该立即进行处理。只有当您丢失本地磁盘上的状态时,才会在这种情况下从更改日志主题恢复。
推荐阅读
- c++ - 如何通过 scanf() 读取没有标点符号的字符串?
- python - 在 Python 中匹配字符串序列中的 2 个单词
- javascript - 获取多个复选框的值及其父输入字段
- javascript - 当用户从另一个页面返回页面时的 JavaScript 事件
- powershell - 如何在 powershell 脚本中执行多个 cmdlet?
- java - 在 Android 9 Internal Memory 上访问数据文件
- visual-studio-code - 是否可以在 VSC 中检测 Zen 模式是否启用
- java - 如何在 Java 中过滤 OpenCV 错误消息
- php - Symfony 更新失败并出现 YAML 错误
- python - 查找列表中的出现