java - 使用 DSL KStream 到 KTable 转换的 Kafka Tombstoning
问题描述
我有一个KStream<String, X>
我基本上想转换为KTable<String, Y>
我能找到使用 DSL 实现此目的的唯一方法是使用映射、分组然后减少。
val stream: KStream<String, X> = ...
val table: KTable<String, Y> = stream
.mapValues({ value -> toYOrNull(value)})
.groupByKey(Grouped.with(Serdes.String(), ySerde))
.reduce(
{old: Y?, updated: Y? -> updated},
Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-store")
.withKeySerde(Serdes.String()
.withValueSerde(ySerde)
)
我希望这可以处理当updated
in 的值reduce
是null
但是当我使用它检查商店时TopologyTestDriver
它似乎仍然具有旧版本的情况。我究竟做错了什么?
这是我的测试:
@Test
fun shouldDeleteFromTableWhenNull() {
val store = testDriver.getKeyValueStore<String, Y?>("y-store")
store.put("key", Y())
inputTopic.pipeInput("key", anXThatMapsToANullY)
assertThat(store.get("key")).isNull() // Fails as the old entry is still there
}
解决方案
推荐阅读
- python - Python - Pandas(在中间插入新行但不覆盖现有行)
- ios - 我可以在 Swift (4) 的视图控制器中获得隐式解包选项的编译时检查吗?
- sql-server - 仅在数据实际更新时才存储信息的更新触发器?
- javascript - 找不到更新选择的属性
- salesforce - #kentico cms 到 salesforce 社区
- javascript - 在谷歌地球引擎上将一层剪裁到另一层的边界
- python-3.x - 从多个特定日期选择 pandas 中的数据
- c# - C#:LINQ 中的嵌套 If else 条件
- c# - 尝试在 C# 中执行 HTTPRequest 时出现 SocketException
- go - go中的平方负数