apache-kafka - 数据库关闭时 Kafka JDBC Sink Connector 的最大重试次数和重试间隔
问题描述
我正在尝试在数据库关闭时测试和评估 Kafka JDBC Sink 连接器的行为。
当数据库宕机时,当Kafka收到新消息时,会报如下错误:
INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:91)
com.microsoft.sqlserver.jdbc.SQLServerException: Unable to access availability database 'Giorgos' because the database replica is not in the PRIMARY or SECONDARY role. Connections to an availability database is permitted only when the database replica is in the PRIMARY or SECONDARY role. Try the operation again later.
并且经过一些重试后,会报如下错误并杀死任务:
ERROR WorkerSinkTask{id=sink-giorgos_test-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
我在哪里可以修改根据第一个错误设置为10000
毫秒的退休次数和重试间隔?
假设我想让工作人员继续尝试连接到数据库 5 分钟。我应该配置哪些参数来做到这一点?
编辑以包括所需的文件:
接收器文件.properties
name=sink-test
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=GIORGOS.TOPIC
connection.url=jdbc:sqlserver://ip:port;DatabaseName=Streaming;user=myuser;password=mypass
auto.create=true
# DB failover
max.retries=10
retry.backoff.ms=10000
pk.mode=record_value
pk.fields=ID
insert.mode=upsert
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=data
worker.properties(我在分布式模式下运行时有多个文件)
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8040
rest.advertised.port=8040
plugin.path=/usr/share/java
解决方案
在这里解释:https ://docs.confluent.io/current/connect/connect-jdbc/docs/sink_config_options.html#retries
您可以在连接器配置中配置 2 个属性:
max.retries=30
retry.backoff.ms=10000
在这里,它将重试 30 次,每次尝试之间等待 10 秒(= 300 秒 = 5 分钟)
推荐阅读
- git - 如何在拉取期间更改全局 git 设置以进行 git 合并
- python - 如何通过python将整个文本拆分为句子片段(部分)
- c++ - 如何自动屏蔽数据
- opc-ua - 如果一些旧的 PLC 没有运行 OPC 服务器应该怎么办,我们可以在不使用任何第三方工具的情况下连接它们吗?
- angular - Angular 的 Spring Cloud Gateway Preflight Cors 错误
- elasticsearch - 为什么使用term查询查询elasticsearch时搜索结果会有差异?
- mysql - 5.1 版的 MySQL 查询在 5.7 中不起作用
- mysql - 当我想创建触发器时,如何在 MySql 工作台中修复错误代码 1415?
- arrays - Flutter:如何解码多个json数据数组?
- mongodb - 错误:在服务“mongo”中使用了命名卷“mongodb_config:/data/configdb:rw”,但在卷部分未找到声明