spring - 即使代理关闭,卡夫卡也会继续产生请求
问题描述
目前,当我创建生产者来发送我的记录时,例如由于某些原因,kafka 不可用,生产者会无限期地发送相同的消息。例如,在我收到此错误 3 次后如何停止生成消息:
Connection to node -1 could not be established. Broker may not be available.
我正在使用反应堆 kafka 生产者:
@Bean
public KafkaSender<String, String> createSender() {
return KafkaSender.create(senderOptions());
}
private SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducerRetries());
return SenderOptions.create(props);
}
然后用它来发送记录:
sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
.flatMap(result -> {
if (result.exception() != null) {
return Flux.just(ResponseEntity.badRequest()
.body(result.exception().getMessage()));
}
return Flux.just(ResponseEntity.ok().build());
})
.next();
解决方案
恐怕clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
重试不涉及它,它会迭代直到maxBlockTimeMs = 60000
默认情况下。ProducerConfig.MAX_BLOCK_MS_CONFIG
您可以通过属性为生产者减少此选项:
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
+ "These methods can be blocked either because the buffer is full or metadata unavailable."
+ "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
更新
我们可以这样解决问题:
@PostMapping(path = "/v1/{topicName}")
public Mono<ResponseEntity<?>> postData(
@PathVariable("topicName") String topicName, String message) {
return sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
.flatMap(result -> {
if (result.exception() != null) {
sender.close();
return Flux.just(ResponseEntity.badRequest()
.body(result.exception().getMessage()));
}
return Flux.just(ResponseEntity.ok().build());
})
.next();
}
注意sender.close();
在错误的情况下。
我认为是时候针对 Reactor Kafka 项目提出一个问题,以允许关闭生产者出错。
推荐阅读
- c# - 从 DataGridView 打印时列顺序错误
- c++ - 使用 -static-libstdc++ 时 std::thread 弱,从而导致运行时崩溃
- php - 未读 get_field 和 sub_field 始终返回 null 或 false
- xml - Google 电子表格:来自 Google Drive .txt 的 ImportXML
- reactjs - 用 sinon.fakeTimers 测试 setInterval() 不起作用?
- javascript - AE脚本更改字体样式
- javascript - 为什么 setInterval() 给出循环引用?
- indexof - JMESPath 当前数组索引
- javascript - 提交时表单未进入验证功能
- scala - SparkException:无法及时执行广播