首页 > 解决方案 > Spring Boot Kafka 应用程序未收到消息

问题描述

我们正在使用 Spring boot 1.5.4.RELEASE,我可以在我们的 gradle 项目中看到以下 jar:spring-kafka-1.1.6、kafka-clients-0.10.1.1

我们有一个简单的接收器类,如下所示:

`

   @KafkaListener(topics = "${mytopic.consume}")
    public void receive(byte[] event) {
        <our code here>...
    }

//$mytopic.consume is being picked up from properties file

`

该代码在大多数日子里工作正常,我能够正确接收事件并处理它们,然后突然停止接收消息。没有错误,没有警告。我可以看到有关该主题的消息,但我们的听众似乎没有使用它们。我尝试重新启动应用程序,重新启动 eclipse,在运行模式和调试模式之间切换,更改 kafka 属性中的组名称 - 但没有任何效果。我们在几个小时后查看应用程序,然后您就可以了 - 它再次开始工作,我们收到了所有被卡住的消息。有时问题也可能持续一天以上。我的怀疑不是日食问题,而是企业 Kafka 的设置方式或 Kafka 中的一些错误(可能性较小,因为我在谷歌上找不到这样的问题)。

任何指针将不胜感激。

以下是我们在应用程序中配置的一些属性(可能感兴趣):`

acks=all
     retries=0
     batch.size=<a number greater than 10k>
     linger.ms=1
     buffer.memory=<a number in 10s of MBs - our incoming message is just few KBs>
     ENABLE_AUTO_COMMIT_CONFIG=true
     AUTO_COMMIT_INTERVAL_MS_CONFIG=1ms
     SESSION_TIMEOUT_MS_CONFIG=30ms
     zookeeper.sync.time=200ms
     zookeeper.session.timeout=400ms

`

标签: javaspring-bootapache-kafka

解决方案


配置没有明显错误。我建议你退后一步,重新开始。创建一个新的消费者组。潜在地,也可以从头开始编写代码。

做一个消费者组中有不同数量的消费者的测试,注意消费者应该等于或小于分区数。使用不同数量的消息进行测试,看看效果如何。同时,还要注意 Kafka 集群,是否有任何代理离开集群,这会改变分区的领导者。最终,你可以停止一些消费者,看看再平衡如何影响你的消费者。

希望这可以帮助。


推荐阅读