首页 > 解决方案 > RabbitMQ - 为什么消费者从队列中消失

问题描述

我们的圣杯应用程序正在使用以下 RabbitMQ 客户端并使用 XML 配置

spring-integration-amqp:3.0.8.RELEASE
spring-rabbit:1.4.5.RELEASE

以下是出现问题的队列之一的 XML 配置:

    <integration:poller id="default" default="true" fixed-rate="2000"/>

    <rabbit:connection-factory id="rabbitConnectionFactory"
                               host="${rabbitmq.host}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               requested-heartbeat="30"/>

    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:template id="amqpTemplate"
                     connection-factory="rabbitConnectionFactory"/>
    <rabbit:queue id="batchUnspscQueue" name="${unspsc.batch.queue}"/>

    <integration:channel id="fromUnspscBatch">
        <integration:queue capacity="10" />
    </integration:channel>

    <amqp:inbound-channel-adapter channel="fromUnspscBatch" concurrent-consumers="1" queue-names="${unspsc.batch.queue}" acknowledge-mode="NONE" channel-transacted="false" connection-factory="rabbitConnectionFactory" />
    <integration:service-activator
            id="erpUnspscBatchSubscriber"
            ref="erpMessageHandlerService"
            method="performUnspscBatchUpdate"
            input-channel="fromUnspscBatch"
            output-channel="nullChannel"/>

在生产中,我们注意到当“unspsc.batch.queue”负载过重时(即输入消息超过通道内部队列容量,即 10),突然消费者开始从队列中消失(使用 RabbitMQ 管理 UI 注意到)

看起来连接正在重置,但奇怪的是连接到同一连接的所有其他队列的消费者收到关闭消息然后重新启动,但连接到这个繁忙的特定队列的消费者没有重新启动;消息停止处理此队列并开始在代理队列中排队。

现在,每当我们处于这种情况下,我们都需要重新启动应用程序。有时我们需要多次重启应用程序,因为消费者处理了一些消息然后又消失了。

注意到其他重新启动正常的队列消费者的以下日志消息:

02.02-07:10:21.202 [SimpleAsyncTaskExecutor-3] INFO  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer -  Restarting Consumer: tags=[{amq.ctag-OaL0P3G4j0VQKGQ16lGlBg=background-worker.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://wluser@10.6.10.4:5672/,44), acknowledgeMode=NONE local queue size=0
...
02.02-07:10:21.200 [SimpleAsyncTaskExecutor-2] WARN  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer -  Consumer raised exception, processing can restart if the connection factory supports itcom.rabbitmq.client.ShutdownSignalException: connection error

我们能够在开发环境中复制它,当我们将容量更改为一个巨大的数字时,它不会给我们这个问题。因为消费者能够带来所有排队的消息,然后按照自己的节奏进行处理。但这会对应用程序 cpu /ram 产生副作用,如果服务器在中间重新启动,所有消息都将丢失,因为 ACK 模式设置为 NONE。

    <integration:channel id="fromUnspscBatch">
        <integration:queue capacity="100000" />
    </integration:channel>

对此的任何帮助或见解将不胜感激。

1:我们是否缺少任何配置来处理场景,即当容量保持较少(例如10)并且代理队列中有大量消息(比如10000)时?

https://docs.spring.io/spring-integration/docs/1.0.x/reference/html/ch03s02.html注意到followign 行。当通道内部队列达到其容量时是否有任何副作用?

If the queue has reached capacity, then the sender will block until room is available

2:还有什么想法为什么附加到同一连接的所有其他队列的消费者收到关闭消息然后重新启动但附加到这个繁忙的特定队列的消费者没有重新启动?

标签: javaspringrabbitmq

解决方案


推荐阅读