java - 如何在 Spring Kafka Listener 上捕获警告“Broker 可能不可用”
问题描述
我正在开发一个 POC,用于在我的项目中实现一个 kafka 集群。我已经在我的本地机器上设置了一个带有 3 个代理的 kafka 集群。现在我正在使用 Spring MVC REST 服务向 Kafka 服务器发送消息,该服务在内部使用 Spring Kafka 来生成和使用来自 Kafka 集群的消息。现在,当代理关闭时,当消费者无法从主题接收消息时,我正在尝试发送警报。我关闭了消费者连接的唯一代理。我的日志中没有任何异常,但我收到了以下警告消息。
0:20:35.500 [TEST_GROUP-0-C-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=TEST_GROUP] 无法建立到节点 2147483645 的连接。经纪人可能不可用。
是否可以捕获此警告消息,以便在我的消费者断开连接时发送警报?以下是我的消费者代码。
private static final Logger LOGGER = LoggerFactory.getLogger(ListenerServiceImpl.class);
@Autowired
Dao<RnMessage> messageDao;
@Autowired
MessageService messageService;
@KafkaListener(id = "TEST_GROUP", topics = "TESTQUEUE", errorHandler="eventQueueMessageListenerExceptionHandler")
public void listenMessageInQueue(String msg) {
try {
//String str = new String(msg, "UTF-8");
LOGGER.info("receiving payload='{}'", msg);
messageDao.saveMessage(msg);
messageService.sendMessageToOutQueue(msg);
}catch(Exception e) {
e.printStackTrace();
}
}
解决方案
当无法轮询代理时,消费者将抛出 NonResponsiveConsumerEvent。可以使用@EventListener 捕获此事件。
这是一个示例代码:
@EventListener()
public void eventHandler(NonResponsiveConsumerEvent event) {
//When Kafka server is down, NonResponsiveConsumerEvent error is caught here.
System.out.println("CAUGHT the event "+ event);
}
您可以在文档中查看更多详细信息。https://docs.spring.io/spring-kafka/reference/htmlsingle/#idle-containers
我在文档中特别提到了这一点。
如果代理无法访问(在撰写本文时),则消费者 poll() 方法不会退出,因此不会收到任何消息,并且无法生成空闲事件。为了解决这个问题,容器会发布一个 NonResponsiveConsumerEvent
一个有效的 JUnit 测试代码在这里 - https://github.com/shankarps/KafkaDemo/blob/master/src/test/java/com/kafkademo/stack/TestConsumerWithNonAvailableBroker.java
推荐阅读
- python-3.x - 如何使用 Python 3 文件操作和循环显示 1 到 10?
- javascript - 如何使用文本输入字段创建多选下拉菜单?
- google-bigquery - 如何使用数组更新 bigquery 表?
- java - UML 中两个类之间的虚线是什么?
- python - Python分析内部函数
- c++ - 算术操作数类型转换
- c# - 为什么当两个对撞机都击中时立方体会到达另一个立方体而不是停止?
- visual-studio-code - 如何告诉 VSCode 显示通知
- javascript - 如何使用 Express JS 响应 JSON
- python - 来自 Json 值的嵌套值