Spring Boot Kafka: check the connection every two hours and send an event if it is lost

Problem description

How can I check every two hours whether the Kafka server is running and, if the connection is lost, throw an event to the method I created called `throwEvent()`? I want to poll Kafka on the interval configured as `listenKafkaEveryThisMs: 200000`.

My Kafka yaml

kafka:

  url: localhost:9092
  topic: topicName
  groupid: conver
  offsetResetConfig: earliest
  concurrency: 1
  maxPollInternalMsConfig: 300000
  maxPollRecordsConfig: 30
  errorHandlerRetryCount: 5
  listenKafkaEveryThisMs: 200000

My KafkaConsumerConfig class

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    
    @Value("${kafka.concurrency}")
    private String concurrency;
    
    @Value("${kafka.url}")
    private String kafkaUrl;
    
    @Value("${kafka.groupid}")
    private String groupid;
    
    @Value("${kafka.offsetResetConfig}")
    private String offsetResetConfig;
    
    @Value("${kafka.maxPollInternalMsConfig}")
    private String maxPollInternalMsConfig;
    
    @Value("${kafka.maxPollRecordsConfig}")
    private String maxPollRecordsConfig;
    
    @Value("${kafka.errorHandlerRetryCount}")
    private String retryCount;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.parseInt(maxPollInternalMsConfig));
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.parseInt(maxPollRecordsConfig));
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
    
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(concurrency));
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

        factory.getContainerProperties().setAckOnError(false);
        factory.setStatefulRetry(true);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(Integer.parseInt(retryCount)));
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
        retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
        
        factory.setRetryTemplate(retryTemplate);
        
        return factory;
    }
}

My KafkaConsumer class

@Service
public class KafkaConsumer implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "#{'${kafka.topic}'}", groupId = "#{'${kafka.groupid}'}")
    public void consume(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                        @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack,
                        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter) {

        if (!kafkaMessageDrivenChannelAdapter.isRunning()) {
            throwEvent();
        }

        try {
            mobileSubscriptionService.processMessage(message, ack, null);
        } catch (ParseException e) {
            logger.error(e.getMessage());
        }
    }

    @Scheduled(fixedDelayString = "${kafka.listenKafkaEveryThisMs}")
    private void throwEvent() {

        Map<String, String> eventDetails = new HashMap<>();
        eventDetails.put("eventDetailsKey", "eventDetailsValue");

        AppDynamicsEventUtil.publishEvent("eventSummary", EventSeverityStatus.INFO, EventTypeStatus.CUSTOM, eventDetails);
    }
}

I really don't know what I should use to check whether the Kafka server is running or not.

Thanks.

Tags: java, spring-boot, apache-kafka

Solution


For checking the cluster connection status, the simplest approach is `AdminClient.describeCluster()`.
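For example, a minimal sketch of such a check (assuming the `kafka-clients` library is on the classpath; the bootstrap address and timeout values are placeholders, not part of the original question):

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaConnectivityCheck {

    /**
     * Returns true if cluster metadata can be fetched within the timeout,
     * false otherwise (broker down, wrong address, network problem, ...).
     */
    public static boolean isClusterReachable(String bootstrapServers, int timeoutMs) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeoutMs);
        try (AdminClient client = AdminClient.create(props)) {
            // Force the future to resolve; throws if the broker is unreachable.
            client.describeCluster().clusterId().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

In the setup from the question, this could be called from the `@Scheduled(fixedDelayString = "${kafka.listenKafkaEveryThisMs}")` method, invoking `throwEvent()` whenever it returns false.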

Alternatively, you can hook a check into Spring Boot Actuator's health endpoint.
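A sketch of a custom health indicator (assuming `spring-boot-starter-actuator` is on the classpath and an `AdminClient` bean is configured elsewhere with `kafka.url` as its bootstrap servers; the class name and detail keys are illustrative):

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final AdminClient adminClient;

    public KafkaHealthIndicator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    @Override
    public Health health() {
        try {
            // describeCluster() only needs broker metadata, so it is a cheap probe.
            String clusterId = adminClient.describeCluster()
                    .clusterId()
                    .get(5, TimeUnit.SECONDS);
            return Health.up().withDetail("clusterId", clusterId).build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
```

The indicator is then picked up automatically and reported under the `/actuator/health` endpoint, so any existing monitoring can poll it instead of (or in addition to) the scheduled in-app check.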

