java - 在 Spring Kafka 中找不到处理恢复的正确方法
问题描述
我正在开发一个相当简单的 Spring Boot 应用程序,它将处理来自单个主题的消息,然后为每条消息调用一个外部 Web 服务。我希望这个服务对错误有点聪明,例如,如果外部网站在短时间内不可用,则应该使用指数退避重试记录,直到我们放弃并只记录错误并提交记录。
我正在使用 Spring Boot 和 Spring Kafka 2.3.3。
我将向您展示我为此设置所拥有的一些 Spring 配置。为简洁起见省略了一些内容。询问是否有一些价值或其他配置可能会有所帮助。
@Configuration
@EnableKafka
public class SpringConfiguration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
// .. Misc other properties related to serialisation etc ..
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public SeekToCurrentErrorHandler eh() {
long initialMillis = 500;
long factor = 2;
long maxElapsedTimeSecs = 60;
ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) -> {
// TODO In the final app do something more useful here
logger.error("* Maximum retry policy has been reached {} - acknowledging and proceeding *", rec);
};
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
eh.setCommitRecovered(true);
return eh;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMissingTopicsFatal(missingTopicsFatal); // True in prod, false otherwise
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
factory.setStatefulRetry(true);
factory.setErrorHandler(eh());
return factory;
}
}
我的听众很简单:
@KafkaListener(topics = "${kafka.input_topic}")
public void handle(ConsumerRecord<String, SendToEBoksMessage> record, Acknowledgment acknowledgment) throws Exception {
logger.info("Listener invoked");
// TODO Right so simulate some sort of problem. External web service not available, for example.
throw new Exception("Exception of some kind");
}
但似乎该ExponentialBackoff
参数引入的延迟增加SeekToCurrentErrorHandler
导致 Kafka 发生重新平衡。重试几次后,日志显示发生了这种重新平衡:
...
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void MyListenerClass.handle(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, MyMessageClass>,org.springframework.kafka.support.Acknowledgment) throws java.lang.Exception' threw exception; nested exception is java.lang.Exception: Exception of some kind; nested exception is java.lang.Exception: Exception of some kind
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1686)
... 10 more
Caused by: java.lang.Exception: Exception of some kind
at MyListenerClass.handle(SendToEboksMessageKafkaListener.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2019-12-16 12:49:04.364 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] Revoking previously assigned partitions [MyTopic-0]
2019-12-16 12:49:04.365 INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : MyTopic: partitions revoked: [MyTopic-0]
2019-12-16 12:49:04.365 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] (Re-)joining group
2019-12-16 12:49:04.373 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] Successfully joined group with generation 18
2019-12-16 12:49:04.373 INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=mygroupid] Setting newly assigned partitions:
2019-12-16 12:49:04.373 INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : ...
我不明白为什么会这样。根据我的阅读,以这种方式使用错误处理程序进行恢复将导致重试由容器处理,并避免由于调用consumer.poll()
频率不足以满足max.poll.ms
属性的潜在问题。
谁能告诉我我在这里做错了什么?
*** 更新:
我在 Kafka 代理日志中看到以下内容:
[2019-12-17 14:13:22,714] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 0 (__consumer_offsets-37) (reason: Adding new member consumer-1-2d76a488-3677-4294-9aed-c153f0dca66c with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,722] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 1 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,735] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:18,096] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 1 (__consumer_offsets-37) (reason: Adding new member consumer-1-addbdcfd-21ed-44fa-9d17-b10c7c67f07f with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,161] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 2 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,163] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 2 (kafka.coordinator.group.GroupCoordinator)
解决方案
这是我前段时间写的一个应用程序;我将其更新为 Boot 2.2.2 并且工作正常:
@SpringBootApplication
public class Kgh1234Application {
public static void main(String[] args) {
SpringApplication.run(Kgh1234Application.class, args);
}
@KafkaListener(id = "kgh1234", topics = "kgh1234")
public void listen(String in) {
System.out.println(in);
if (in.endsWith("5")) {
throw new RuntimeException("fail");
}
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 2L)));
return factory;
}
@Bean
public NewTopic topic() {
return new NewTopic("kgh1234", 32, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("kgh1234", "foo" + i));
};
}
}
和
spring.kafka.consumer.auto-offset-reset=earliest
它只使用默认的恢复器,它只是在重试用尽时记录。
foo5
2019-12-17 10:50:32.018 错误 32052 --- [kgh1234-0-C-1] osklSeekToCurrentErrorHandler:Backoff FixedBackOff{interval=0, currentAttempts=3, maxAttempts=2} 已耗尽 ConsumerRecord(主题 = kgh1234,分区= 1, leaderEpoch = 0, offset = 0, CreateTime = 1576597830940, 序列化键大小 = -1, 序列化值大小 = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo5)
推荐阅读
- java - 检查 Thymeleaf 页面中的对象上是否存在特定字段
- c++ - 带有指针和递归的随机崩溃
- r - Plotly R:具有0 y轴值的值的悬停文本
- networking - 用于流量委托的 Kubernetes 概念或类似的东西?
- javascript - 打开一个带有 5 个不同选项卡的新窗口
- javascript - 使用 Ionic 的社交分享插件的正确方法?
- python - 关键错误:向数据框添加新列时出现“图像”
- plotly-dash - 如何在破折号图中有效地调整图形边距或填充
- swift - CollectionView Reloaddata - 致命错误:索引超出范围
- ibm-mq - RFH2 - 标头的格式是什么?