apache-kafka-streams - 墓碑消息没有从 KTable 状态存储中删除记录?
问题描述
我正在从 KStream 创建 KTable 处理数据。但是当我触发带有键和空有效负载的墓碑消息时,它不会从 KTable 中删除消息。
样本 -
public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));
GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
在触发具有空值的消息后,我可以在带有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志“test-store”上的记录。看起来它甚至没有达到 reduce 方法,不确定我在这里缺少什么。
感谢您对此的任何帮助!
谢谢。
解决方案
如 JavaDocs 中所述reduce()
带有 {@code null} 键或值的记录将被忽略。
因为,<key,null>
记录被删除,因此(genericRecord, v1) -> v1
永远不会被执行,所以不会将 tombstone 写入 store 或 changelog 主题。
对于您想到的用例,您需要使用指示“删除”的代理值,例如 Avro 记录中的布尔标志。您的 reduce 函数需要检查标志并null
在标志设置时返回;否则,它必须定期处理记录。
更新:
KStream#toTable()
Apache Kafka 2.6 添加了允许将 aKStream
转换为KTable
.
推荐阅读
- ios - 初始化静态实例时如何运行代码?
- python - .join 函数如何与 for 循环一起工作?
- mysql - 如何在 SQL 中为一列显示一个值?
- java - 如何修复“无法将''下的属性绑定到 com.zaxxer.hikari.HikariDataSource”错误?
- flutter - 如何按后退按钮退出应用程序?
- wordpress - 如何在 woocommerce 管理订单详细信息页面中保存或更新订单项目元数据?
- ios - 无法为失败结果类型推断通用参数“T”
- javascript - 如何检查计数器值是否增加或没有反应?
- python - vscode python没有输出
- sql - 将两列信息合二为一