首页 > 解决方案 > 使用 DSL KStream 到 KTable 转换的 Kafka Tombstoning

问题描述

我有一个KStream<String, X>我基本上想转换为KTable<String, Y>

我能找到使用 DSL 实现此目的的唯一方法是使用映射、分组然后减少。

val stream: KStream<String, X> = ...
val table: KTable<String, Y> = stream
  .mapValues({ value -> toYOrNull(value)})
  .groupByKey(Grouped.with(Serdes.String(), ySerde))
  .reduce(
    {old: Y?, updated: Y? -> updated},
    Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-store")
      .withKeySerde(Serdes.String()
      .withValueSerde(ySerde)
  )

我希望这可以处理当updatedin 的值reducenull但是当我使用它检查商店时TopologyTestDriver它似乎仍然具有旧版本的情况。我究竟做错了什么?

这是我的测试:

@Test
fun shouldDeleteFromTableWhenNull() {
  val store = testDriver.getKeyValueStore<String, Y?>("y-store")
  store.put("key", Y())

  inputTopic.pipeInput("key", anXThatMapsToANullY)

  assertThat(store.get("key")).isNull() // Fails as the old entry is still there
}

标签: javakotlinapache-kafka-streams

解决方案


值为 null 的记录将被忽略。

根据文档,这是预期的行为:KGroupedStream::reduce(...) Java Doc

通过分组键组合此流中记录的值。具有空键或值的记录将被忽略


推荐阅读