java - Spring Boot Kafka 客户端是否有“断路器”?
问题描述
如果 Kafka 服务器(暂时)关闭,我的 Spring Boot 应用程序ReactiveKafkaConsumerTemplate
会一直尝试连接不成功,从而导致不必要的流量并弄乱日志文件:
2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
是否可以使用断路器之类的东西(此处或此处的灵感),因此 Spring Boot Kafka 客户端在发生故障(甚至更好的几次连续故障)时会减慢其连接尝试的速度,并返回到服务器重新启动后才正常速度?
是否已经有现成的配置参数或任何其他解决方案?
我知道参数 reconnect.backoff.ms
,这就是我创建ReactiveKafkaConsumerTemplate
bean的方式:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("com.example.myapplication");
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions
.<String, MyEvent>create(map)
.withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
.withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
.subscription(List.of("MyTopic")));
}
消费者仍然每 3 秒尝试连接一次。
解决方案
请参阅https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms
在尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试。
和https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms
重新连接到反复连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,每台主机的退避将在每次连续连接失败时呈指数增长,直至达到此最大值。在计算回退增加后,添加 20% 的随机抖动以避免连接风暴。
和
推荐阅读
- vb.net - 按十进制对 DataGridView 进行排序
- dask - Dask DataFrame .head() 索引后非常慢
- linux - readdir/File::Find::Rule 没有读取 perl 中的子目录及其内容
- json - Django REST Framework:空 ListView 上的顶级 JSON 数组是否存在安全风险?
- wpf - 在 WPF 应用程序中向 ComboBox 添加文本和值
- android - 通过 gRPC/Protobuf 进行通信
- python - 如何将 Python 列表转换为有效的 JSON 路径?
- ios - ISO 8601 日期的正确日期格式
- vue.js - [Vue 警告]:您可能在 watcher 中有一个无限更新循环,表达式为“chartData”
- c# - 演员表