apache-kafka - 如何忽略 Kafka Connect Elasticsearch 中的错误结果
问题描述
我正在尝试运行 kafka connect 以进行弹性搜索。但由于一些错误,我在 kafka 主题中输入了错误的记录。
现在我修复了这个问题并插入了正确的值,但弹性搜索仍然在主题中的先前记录上抛出错误
这是错误
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lambdaDemo0': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"lambdaDemo0-9749-0e710000fd04"; line: 1, column: 13]
有什么办法可以忽略主题中的旧记录并告诉 kafka connect 选择最新记录?我正在尝试删除主题,我将主题标记为删除,但主题中仍然存在记录。
我尝试了以下两个属性,但似乎确实有效
drop.invalid.message=true
behavior.on.malformed.documents=ignore
请建议我如何清理主题中的错误记录
解决方案
您可以告诉 Kafka Connect 跳过不良记录
errors.tolerance = all
或者,您可以通过添加将这些消息路由到另一个主题(称为死信队列)进行检查
errors.tolerance = all
errors.deadletterqueue.topic.name = my-dlq-topic
这些设置适用于 Kafka Connect 以及在处理的序列化/反序列化阶段失败的任何连接器。有关详细信息,请参阅本文。
推荐阅读
- regex - 如何在perl中对数组应用负正则表达式?
- javascript - 如何从 node.js 脚本文件执行 package.json 脚本?
- javascript - 没有 Node.js 的 HTML 表单数据到本地 JSON 文件
- sql - 将附加数据与查询一起发送到 SQL?
- python - 使用 ORB 未检测到关键点
- arrays - xcode swift没有重新填充数组
- firebase - Flutter Firebase 动画列表流
- kubernetes - 在帮助器模板中声明 cassandra.name 时 .Values.nameOverride 的意义
- javascript - 减去状态值 - 反应
- javascript - 续集:SQLITE_ERROR:靠近“[]”:语法错误