apache-kafka - Kafka Connect JDBC 连接器 - 由于不可恢复的异常而退出 WorkerSinkTask
问题描述
我正在使用 JDBC 接收器连接器,并且在该主题中有一条错误消息。我知道为什么消息不好(由于生产者的问题导致违反 FK 约束而失败)。工作任务报告的错误是:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))\ncom.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))\n
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)\n\t... 10 more\nCaused by: java.sql.SQLException: java.sql.BatchUpdateException:
Cannot add or update a child row: a foreign key constraint fails
(`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor`
(`id`))\ncom.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolation
Exception: Cannot add or update a child row: a foreign key constraint
fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY
(`sensorId`) REFERENCES `sensor` (`id`))
我想要发生的是跳过这条坏消息。所以我试过设置"errors.tolerance": "all"
。sink 连接器的完整配置如下:
{
"name": "reading-sink2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 4,
"topics": "READING_MYSQL",
"key.converter.schema.registry.url": "http://localhost:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3306/sensorium?user=app&password=tQpRMCzHlAeu6kQIBk4U",
"auto.create": true,
"table.name.format": "reading",
"errors.tolerance": "all"
}
}
但是正在记录相同的错误,没有跳过消息,也没有处理后续消息。
为什么errors.tolerance: all
没有按预期工作?
解决方案
errors.tolerance
属性是指在转换(消息转换到/从 Kafka Connect 模式)或转换消息(应用单消息转换)期间发生的错误。
您不能跳过/吞下在SinkTask::put(Collection<SinkRecord> records)
或期间抛出的异常SourceTask::poll()
在你的情况下抛出异常SinkTask::put(...)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
关于类似问题的问题:
- Kafka Connect 接收器任务忽略容差限制
- kafka connect - jdbc sink sql异常
- Apache Kafka JDBC 连接器 - SerializationException: Unknown magic byte
您可以在 confluent 页面的以下博客中阅读更多相关信息:https ://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
推荐阅读
- reporting-services - textbox4 的值表达式直接引用字段
- node.js - 猫鼬(检查并保存如果新)
- java - 如何知道 JVM 因 Segfault 而崩溃的原因?
- asp.net-mvc - '值不能为空。参数名称:ViewStart.cshtml 上的 virtualPath'
- c# - Win2D API 随机失败,并出现错误“必须从同一工厂实例创建一起使用的对象”。
- android - 在 Android 屏幕录制中 - 如何获取每一帧?
- c# - 如何停止项目模板概念中的向导操作(IWizard)
- excel - 如何在同一类别下仅识别一个最小的数字
- powershell - 在powershell中将system.array对象转换为json对象
- python - 如何从 argparse 解析器对象中查询我期望的参数类型?