apache-kafka - All connectors go into a FAILED state after deleting and re-creating any connector
Problem description
I have a Kafka setup with SSL enabled and register connectors on kafka-connect using the POST request below. With a fresh Connect setup and no existing connectors, the connector registers fine. But if I delete any connector, after a while all connectors go into a FAILED state with a TimeoutException (trace below). As a workaround, if I stop kafka-connect, delete all the kafka-connect-related metadata topics from Kafka, and restart it, the problem goes away, but then I have to register all the connectors again. I suspect the kafka-connect metadata topics are not being updated correctly, but I cannot pinpoint the problem or find a solution. Here is the POST request:
curl -k -v -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://kafka-connect.domain.com:9093/connectors/ -d '{
  "name": "TEST-CONNECTOR-TEST1131",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "test.domain.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "test",
    "database.serverTimezone": "America/Los_Angeles",
    "database.server.id": "201908281131",
    "database.server.name": "TEST-CONNECTOR",
    "database.history.kafka.bootstrap.servers": "kafka1.domain.com:9094",
    "database.history.kafka.topic": "dbhistory.test_201908281131",
    "include.schema.changes": "true",
    "table.whitelist": "qwerdb.test1",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.ssl.key.password": "test",
    "database.history.producer.ssl.keystore.location": "/opt/keystore.jks",
    "database.history.producer.ssl.keystore.password": "test",
    "database.history.producer.ssl.truststore.location": "/opt/truststore.jks",
    "database.history.producer.ssl.truststore.password": "test"
  }
}'
Here is the exception trace:
"trace": "org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
	at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata"
Solution
The problem went away after adding the producer settings to the Kafka Connect connect-distributed.properties file. These settings were missing from connect-distributed.properties, so the worker-level producers could not authenticate against the SSL-enabled brokers, which is consistent with the TimeoutException above.
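The answer does not show the exact entries that were added. A plausible sketch, mirroring the database.history.producer.* values from the connector config above (the keystore/truststore paths and passwords are taken from that config and are assumptions about this cluster):

```properties
# Assumed worker-level producer security settings for connect-distributed.properties,
# mirroring the database.history.producer.* values in the connector config.
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.ssl.key.password=test
producer.ssl.keystore.location=/opt/keystore.jks
producer.ssl.keystore.password=test
producer.ssl.truststore.location=/opt/truststore.jks
producer.ssl.truststore.password=test
```

For SASL_SSL a producer.sasl.jaas.config entry with the broker credentials is typically also required; it is omitted here because the original post does not show it. Each Connect worker must be restarted after editing the file for the settings to take effect.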