首页 > 解决方案 > 如何在 Spring Kafka Listener 上捕获警告“Broker 可能不可用”

问题描述

我正在开发一个 POC,用于在我的项目中实现一个 kafka 集群。我已经在我的本地机器上设置了一个带有 3 个代理的 kafka 集群。现在我正在使用 Spring MVC REST 服务向 Kafka 服务器发送消息,该服务在内部使用 Spring Kafka 来生成和使用来自 Kafka 集群的消息。现在,当代理关闭时,当消费者无法从主题接收消息时,我正在尝试发送警报。我关闭了消费者连接的唯一代理。我的日志中没有任何异常,但我收到了以下警告消息。

0:20:35.500 [TEST_GROUP-0-C-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=TEST_GROUP] 无法建立到节点 2147483645 的连接。经纪人可能不可用。

是否可以捕获此警告消息,以便在我的消费者断开连接时发送警报?以下是我的消费者代码。

private static final Logger LOGGER = LoggerFactory.getLogger(ListenerServiceImpl.class);
    @Autowired
    Dao<RnMessage> messageDao;
    @Autowired
    MessageService messageService;

    @KafkaListener(id = "TEST_GROUP", topics = "TESTQUEUE", errorHandler="eventQueueMessageListenerExceptionHandler")
    public void listenMessageInQueue(String msg) {

        try {
            //String str = new String(msg, "UTF-8");
            LOGGER.info("receiving payload='{}'", msg);
            messageDao.saveMessage(msg);
            messageService.sendMessageToOutQueue(msg);
        }catch(Exception e) {
            e.printStackTrace();
        }       
    }

标签: javaspringapache-kafkaspring-kafka

解决方案


当无法轮询代理时,消费者将抛出 NonResponsiveConsumerEvent。可以使用@EventListener 捕获此事件。

这是一个示例代码:

@EventListener()
public void eventHandler(NonResponsiveConsumerEvent event) {
    //When Kafka server is down, NonResponsiveConsumerEvent error is caught here.
    System.out.println("CAUGHT the event "+ event);
}

您可以在文档中查看更多详细信息。https://docs.spring.io/spring-kafka/reference/htmlsingle/#idle-containers

我在文档中特别提到了这一点。

如果代理无法访问(在撰写本文时),则消费者 poll() 方法不会退出,因此不会收到任何消息,并且无法生成空闲事件。为了解决这个问题,容器会发布一个 NonResponsiveConsumerEvent

一个有效的 JUnit 测试代码在这里 - https://github.com/shankarps/KafkaDemo/blob/master/src/test/java/com/kafkademo/stack/TestConsumerWithNonAvailableBroker.java


推荐阅读