java - Spring Boot Kafka: check the connection every two hours and send an event if it is lost
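Problem description
How can I check every two hours whether the Kafka server is running and, if the connection is lost, fire an event through a method I created called "throwEvent()", checking Kafka at the interval set by "listenKafkaEveryThisMs: 200000"?
My Kafka YAML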
kafka:
  url: localhost:9092
  topic: topicName
  groupid: conver
  offsetResetConfig: earliest
  concurrency: 1
  maxPollInternalMsConfig: 300000
  maxPollRecordsConfig: 30
  errorHandlerRetryCount: 5
  listenKafkaEveryThisMs: 200000
My KafkaConsumerConfig class
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.concurrency}")
    private String concurrency;

    @Value("${kafka.url}")
    private String kafkaUrl;

    @Value("${kafka.groupid}")
    private String groupid;

    @Value("${kafka.offsetResetConfig}")
    private String offsetResetConfig;

    @Value("${kafka.maxPollInternalMsConfig}")
    private String maxPollInternalMsConfig;

    @Value("${kafka.maxPollRecordsConfig}")
    private String maxPollRecordsConfig;

    @Value("${kafka.errorHandlerRetryCount}")
    private String retryCount;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.parseInt(maxPollInternalMsConfig));
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.parseInt(maxPollRecordsConfig));
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(concurrency));
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setAckOnError(false);
        factory.setStatefulRetry(true);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(Integer.parseInt(retryCount)));
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
        retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
        factory.setRetryTemplate(retryTemplate);
        return factory;
    }
}
My KafkaConsumer class
@Service
public class KafkaConsumer implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "#{'${kafka.topic}'}", groupId = "#{'${kafka.groupid}'}")
    public void consume(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
            @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack, KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter) {
        if (!kafkaMessageDrivenChannelAdapter.isRunning()) {
            throwEvent();
        }
        try {
            mobileSubscriptionService.processMessage(message, ack, null);
        } catch (ParseException e) {
            logger.error(e.getMessage());
        }
    }

    @Scheduled(fixedDelayString = "${kafka.listenKafkaEveryThisMs}")
    private void throwEvent() {
        Map<String, String> eventDetails = new HashMap<>();
        eventDetails.put("eventDetailsKey", "eventDetailsValue");
        AppDynamicsEventUtil.publishEvent("eventSummary", EventSeverityStatus.INFO, EventTypeStatus.CUSTOM, eventDetails);
    }
}
I really don't know what I should use to check whether the Kafka server is running or not.
Thanks
Solution
To check the cluster connection status, the simplest approach is AdminClient.describeCluster().
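As a minimal sketch of how such a check could be run periodically, the class below calls a probe and runs a callback when the probe fails or reports no nodes. The class name ClusterConnectionMonitor and its methods are hypothetical, not part of Kafka or Spring. The Kafka call itself is injected as a Callable so the sketch compiles without kafka-clients; the comment shows one way it might be wired to AdminClient.describeCluster(), assuming a 5-second timeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: probes a cluster and runs a callback when the probe fails.
//
// Assumed wiring with kafka-clients on the classpath (not compiled here):
//   AdminClient admin = AdminClient.create(Map.of("bootstrap.servers", "localhost:9092"));
//   Callable<Integer> probe =
//       () -> admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).size();
final class ClusterConnectionMonitor {

    private final Callable<Integer> nodeCountProbe; // returns the number of broker nodes seen
    private final Runnable onConnectionLost;        // e.g. a method that publishes the event

    ClusterConnectionMonitor(Callable<Integer> nodeCountProbe, Runnable onConnectionLost) {
        this.nodeCountProbe = nodeCountProbe;
        this.onConnectionLost = onConnectionLost;
    }

    // Runs one probe; returns true if at least one node answered.
    boolean checkOnce() {
        boolean up;
        try {
            up = nodeCountProbe.call() > 0;
        } catch (Exception e) {
            // Timeouts and connection errors both count as "down".
            up = false;
        }
        if (!up) {
            onConnectionLost.run();
        }
        return up;
    }

    // Schedules checkOnce() with a fixed delay between runs (plain Java, no Spring).
    ScheduledExecutorService start(long delayMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // Swallow callback failures so the schedule keeps running.
            }
        }, delayMs, delayMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

In a Spring application, the start() method could be replaced by a public method annotated with @Scheduled(fixedDelayString = "${kafka.listenKafkaEveryThisMs}") that calls checkOnce(), with @EnableScheduling on a configuration class. Note that 200000 ms is about 3 minutes 20 seconds; a two-hour interval would be 7200000 ms.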
Alternatively, you can hook some checks into the Actuator.