首页 > 解决方案 > 如何忽略 Kafka Connect Elasticsearch 中的错误结果

问题描述

我正在尝试运行 kafka connect 以进行弹性搜索。但由于一些错误,我在 kafka 主题中输入了错误的记录。

现在我修复了这个问题并插入了正确的值,但弹性搜索仍然在主题中的先前记录上抛出错误

这是错误

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lambdaDemo0': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"lambdaDemo0-9749-0e710000fd04"; line: 1, column: 13]

有什么办法可以忽略主题中的旧记录并告诉 kafka connect 选择最新记录?我正在尝试删除主题,我将主题标记为删除,但主题中仍然存在记录。

我尝试了以下两个属性,但似乎确实有效

drop.invalid.message=true
behavior.on.malformed.documents=ignore

请建议我如何清理主题中的错误记录

标签: apache-kafkaapache-kafka-connect

解决方案


您可以告诉 Kafka Connect 跳过不良记录

errors.tolerance = all

或者,您可以通过添加将这些消息路由到另一个主题(称为死信队列)进行检查

errors.tolerance = all
errors.deadletterqueue.topic.name = my-dlq-topic

这些设置适用于 Kafka Connect 以及在处理的序列化/反序列化阶段失败的任何连接器。有关详细信息,请参阅本文


推荐阅读