java - 检查Kafka Consumer是否没有任何记录要返回并且在java中为空的好方法?
问题描述
我正在使用 Apache KafkaConsumer。我想检查消费者是否有任何消息要返回而不进行轮询。如果我轮询消费者并且没有任何消息,那么即使我有一个records.isEmpty()
子句,我也会在无限循环中收到消息“尝试心跳失败,因为该组正在重新平衡”,直到超时到期。这是我的代码片段:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
log.info("No More Records");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> log.info("RECORD: " + record);
);
这工作正常,直到records
为空。一旦为空,它会多次记录“Attempt to heartbeat failed since the group is rebalancing”,记录一次“No More Records”,然后继续记录心跳错误。我能做些什么来解决这个问题,我怎样才能优雅地检查(没有任何心跳消息)没有更多的记录要轮询?
编辑:我问了另一个问题,完整的代码和上下文在这个链接上:How to get messages from Kafka Consumer one by one in java?
提前致谢!
解决方案
出评论:“由于我有一个UI,并且想通过单击“接收”按钮来一条一条地接收消息,因此可能会出现没有更多消息要轮询的情况。”
在这种情况下,您需要在KafkaConsumer
每次有人单击“接收”按钮时创建一个新按钮,然后再将其关闭。
如果您想在客户端的生命周期内使用相同的 KafkaConsumer,您需要让代理知道它仍然活着(通过发送心跳,这通过调用 poll 方法隐式完成)。否则,正如您已经体验过的那样,代理认为您的 KafkaConsumer 已死,并将启动重新平衡。由于没有其他可用的活跃消费者,这种重新平衡不会停止。
推荐阅读
- flutter - 如何显示来自 AttachedPicture 的图像 | 扑
- c++ - 字符编码不一致(Win32 API)
- python - 叶列表中的杨枚举返回数字而不是字符串
- symfony - Doctrine.yaml" 不包含有效的 YAML:冒号不能用于未引用的映射值
- java - 有没有更快的方法在 Java 中使用 csv 阅读器?
- git - 撤消(即还原)旧的还原提交是否也会导致从历史记录中撤消另一个单独的还原提交?
- javascript - 如何使用三元运算符不向函数传递任何内容 - javascript
- python - 当他们共享同一个类并在其中找到带有漂亮汤的某个元素时,如何选择特定的“div”?
- python - Python Selenium ng-click 动作
- angular - 为什么代码覆盖率报告中不包含 rxjs pipe() 运算符?