java - RabbitMQ - 为什么消费者从队列中消失
问题描述
我们的圣杯应用程序正在使用以下 RabbitMQ 客户端并使用 XML 配置
spring-integration-amqp:3.0.8.RELEASE
spring-rabbit:1.4.5.RELEASE
以下是出现问题的队列之一的 XML 配置:
<integration:poller id="default" default="true" fixed-rate="2000"/>
<rabbit:connection-factory id="rabbitConnectionFactory"
host="${rabbitmq.host}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
requested-heartbeat="30"/>
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
<rabbit:template id="amqpTemplate"
connection-factory="rabbitConnectionFactory"/>
<rabbit:queue id="batchUnspscQueue" name="${unspsc.batch.queue}"/>
<integration:channel id="fromUnspscBatch">
<integration:queue capacity="10" />
</integration:channel>
<amqp:inbound-channel-adapter channel="fromUnspscBatch" concurrent-consumers="1" queue-names="${unspsc.batch.queue}" acknowledge-mode="NONE" channel-transacted="false" connection-factory="rabbitConnectionFactory" />
<integration:service-activator
id="erpUnspscBatchSubscriber"
ref="erpMessageHandlerService"
method="performUnspscBatchUpdate"
input-channel="fromUnspscBatch"
output-channel="nullChannel"/>
在生产中,我们注意到当“unspsc.batch.queue”负载过重时(即输入消息超过通道内部队列容量,即 10),突然消费者开始从队列中消失(使用 RabbitMQ 管理 UI 注意到)
看起来连接正在重置,但奇怪的是连接到同一连接的所有其他队列的消费者收到关闭消息然后重新启动,但连接到这个繁忙的特定队列的消费者没有重新启动;消息停止处理此队列并开始在代理队列中排队。
现在,每当我们处于这种情况下,我们都需要重新启动应用程序。有时我们需要多次重启应用程序,因为消费者处理了一些消息然后又消失了。
注意到其他重新启动正常的队列消费者的以下日志消息:
02.02-07:10:21.202 [SimpleAsyncTaskExecutor-3] INFO org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer: tags=[{amq.ctag-OaL0P3G4j0VQKGQ16lGlBg=background-worker.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://wluser@10.6.10.4:5672/,44), acknowledgeMode=NONE local queue size=0
...
02.02-07:10:21.200 [SimpleAsyncTaskExecutor-2] WARN org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports itcom.rabbitmq.client.ShutdownSignalException: connection error
我们能够在开发环境中复制它,当我们将容量更改为一个巨大的数字时,它不会给我们这个问题。因为消费者能够带来所有排队的消息,然后按照自己的节奏进行处理。但这会对应用程序 cpu /ram 产生副作用,如果服务器在中间重新启动,所有消息都将丢失,因为 ACK 模式设置为 NONE。
<integration:channel id="fromUnspscBatch">
<integration:queue capacity="100000" />
</integration:channel>
对此的任何帮助或见解将不胜感激。
1:我们是否缺少任何配置来处理场景,即当容量保持较少(例如10)并且代理队列中有大量消息(比如10000)时?
在https://docs.spring.io/spring-integration/docs/1.0.x/reference/html/ch03s02.html注意到followign 行。当通道内部队列达到其容量时是否有任何副作用?
If the queue has reached capacity, then the sender will block until room is available
2:还有什么想法为什么附加到同一连接的所有其他队列的消费者收到关闭消息然后重新启动但附加到这个繁忙的特定队列的消费者没有重新启动?
解决方案
推荐阅读
- mysql - hackerEarth 中的 mysql 中的预期输出不正确
- php - 如何在 PHP 7 中的另一个类中使用一个类的实例
- r - 自动添加斜线可防止使用 apa_table() 自定义乳胶代码
- java - 什么时候应该关闭 GRPC ManagedChannel?
- python - 如果它存在于python的txt文件中,则从字符串中删除一个单词
- node.js - mongodb中的deleteMany()不删除文档
- c# - 如何在 C# 中单击按钮时更改按钮颜色?
- ruby - Cocoapods RUBY 未定义的局部变量或方法
- python - Keras LSTM - “检查模型目标时出错:预期没有数据,但得到:”数据
- powerbi - 比率而不是现有值