首页 > 解决方案 > 检查Kafka Consumer是否没有任何记录要返回并且在java中为空的好方法?

问题描述

我正在使用 Apache KafkaConsumer。我想检查消费者是否有任何消息要返回而不进行轮询。如果我轮询消费者并且没有任何消息,那么即使我有一个records.isEmpty()子句,我也会在无限循环中收到消息“尝试心跳失败,因为该组正在重新平衡”,直到超时到期。这是我的代码片段:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
      log.info("No More Records");
      consumer.close();
    }
else {
      records.iterator().forEachRemaining(record -> log.info("RECORD: " + record);
);

这工作正常,直到records为空。一旦为空,它会多次记录“Attempt to heartbeat failed since the group is rebalancing”,记录一次“No More Records”,然后继续记录心跳错误。我能做些什么来解决这个问题,我怎样才能优雅地检查(没有任何心跳消息)没有更多的记录要轮询?

编辑:我问了另一个问题,完整的代码和上下文在这个链接上:How to get messages from Kafka Consumer one by one in java?

提前致谢!

标签: javaapache-kafkakafka-consumer-api

解决方案


出评论:“由于我有一个UI,并且想通过单击“接收”按钮来一条一条地接收消息,因此可能会出现没有更多消息要轮询的情况。”

在这种情况下,您需要在KafkaConsumer每次有人单击“接收”按钮时创建一个新按钮,然后再将其关闭。

如果您想在客户端的生命周期内使用相同的 KafkaConsumer,您需要让代理知道它仍然活着(通过发送心跳,这通过调用 poll 方法隐式完成)。否则,正如您已经体验过的那样,代理认为您的 KafkaConsumer 已死,并将启动重新平衡。由于没有其他可用的活跃消费者,这种重新平衡不会停止。


推荐阅读