java - Spring Boot Kafka 应用程序未收到消息
问题描述
我们正在使用 Spring boot 1.5.4.RELEASE,我可以在我们的 gradle 项目中看到以下 jar:spring-kafka-1.1.6、kafka-clients-0.10.1.1
我们有一个简单的接收器类,如下所示:
`
@KafkaListener(topics = "${mytopic.consume}")
public void receive(byte[] event) {
<our code here>...
}
//$mytopic.consume is being picked up from properties file
`
该代码在大多数日子里工作正常,我能够正确接收事件并处理它们,然后突然停止接收消息。没有错误,没有警告。我可以看到有关该主题的消息,但我们的听众似乎没有使用它们。我尝试重新启动应用程序,重新启动 eclipse,在运行模式和调试模式之间切换,更改 kafka 属性中的组名称 - 但没有任何效果。我们在几个小时后查看应用程序,然后您就可以了 - 它再次开始工作,我们收到了所有被卡住的消息。有时问题也可能持续一天以上。我的怀疑不是日食问题,而是企业 Kafka 的设置方式或 Kafka 中的一些错误(可能性较小,因为我在谷歌上找不到这样的问题)。
任何指针将不胜感激。
以下是我们在应用程序中配置的一些属性(可能感兴趣):`
acks=all
retries=0
batch.size=<a number greater than 10k>
linger.ms=1
buffer.memory=<a number in 10s of MBs - our incoming message is just few KBs>
ENABLE_AUTO_COMMIT_CONFIG=true
AUTO_COMMIT_INTERVAL_MS_CONFIG=1ms
SESSION_TIMEOUT_MS_CONFIG=30ms
zookeeper.sync.time=200ms
zookeeper.session.timeout=400ms
`
解决方案
配置没有明显错误。我建议你退后一步,重新开始。创建一个新的消费者组。潜在地,也可以从头开始编写代码。
做一个消费者组中有不同数量的消费者的测试,注意消费者应该等于或小于分区数。使用不同数量的消息进行测试,看看效果如何。同时,还要注意 Kafka 集群,是否有任何代理离开集群,这会改变分区的领导者。最终,你可以停止一些消费者,看看再平衡如何影响你的消费者。
希望这可以帮助。
推荐阅读
- visual-studio - 在条件“$(MSBuildVersion) >= 16.1.0”) 下,对“$(MSBuildVersion)”进行了数值比较,计算结果为“”而不是数字
- deadlock - ExternalTaskOperators 死锁:为什么,多久一次,以及如何解决这个问题?
- c - emxFree_real_T 导致可执行文件中止
- java - 在小程序中加载时,清单中的 Class-Path 属性会导致烦人的 FileNotFoundException
- python - 在不将其转换为 Pandas 的情况下绘制 Spark Dataframe 的方法
- visual-studio - 无法在 macOS 上构建 VS 项目
- javacard - 如何获取 ACOSJ Java Card 的传输密钥?
- java - .jsp 动态表创建
- plugins - 更新到 v5 后,TinyMCE 中的列表插件无法初始化
- excel - 如何在不同工作簿上的 Excel 中的两个数据范围内进行 sumif 部分文本匹配