rabbitmq - Spring AMQP RabbitListener 调整自动重连配置
问题描述
我试图更好地理解RabbitListener
经纪人倒闭和恢复时的行为。我已经通过 localhost:6672 在 docker 容器中设置了一只兔子,并创建了一个简单的 Spring Boot 应用程序来监听消息。还配置application.yml
为指向这只兔子。
@RabbitListener(containerFactory = "myListenerContainerFactory",
bindings = @QueueBinding(value = @Queue(value = "MY_QUEUE"), exchange = @Exchange(value = "MY_EXCHANGE"), key = "MY_KEY"))
public void onMessage(final Mesg message) {
//some handling
}
还定义了以下容器工厂
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory cf) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);
factory.setConsumerTagStrategy(q -> "My App");
return factory;
}
当 rabbit 容器运行时,我启动了 spring boot 应用程序,它能够自动创建交换/队列,并且能够使用我通过 rabbit 管理控制台发布的消息。我在 spring boot 应用程序运行时强行移除了 rabbit 容器(docker rm my_instance -f
),它开始打印以下消息。
2020-05-26 17:03:15.097 ERROR 26816 --- [ 127.0.0.1:6672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error
2020-05-26 17:03:15.614 INFO 26816 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@69176296: tags=[[My App]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:6672/,1), conn: Proxy@522c56e8 Shared Rabbit Connection: SimpleConnection@7578dfd0 [delegate=amqp://guest@127.0.0.1:6672/, localPort= 65342], acknowledgeMode=AUTO local queue size=0
2020-05-26 17:03:15.618 INFO 26816 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:6672
2020-05-26 17:03:22.662 WARN 26816 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2020-05-26 17:03:22.663 INFO 26816 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@31978e0c: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2020-05-26 17:03:22.663 INFO 26816 --- [cTaskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:6672
2020-05-26 17:03:29.683 WARN 26816 --- [cTaskExecutor-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
再次启动 rabbit 容器后,spring boot 应用程序似乎已经检测到它,但是它打印了以下消息并且我认为它没有建立成功的连接。查看日志,默认情况下似乎设置了 3 次重试,是否有可以用来增加限制的配置?
处理此类案件是否有任何最佳实践?我的应用程序预计将与不同的兔子集群连接,并且其中一个或另一个可能会继续维护,我需要一种在兔子集群备份后立即自动重新连接的方法。
2020-05-26 17:05:18.303 INFO 26816 --- [TaskExecutor-19] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@5053bc3c: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2020-05-26 17:05:18.306 INFO 26816 --- [TaskExecutor-20] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:6672
2020-05-26 17:05:18.348 INFO 26816 --- [TaskExecutor-20] o.s.a.r.c.CachingConnectionFactory : Created new connection: myConnectionFactory#ebfe707:19/SimpleConnection@19d03268 [delegate=amqp://guest@127.0.0.1:6672/, localPort= 49260]
2020-05-26 17:05:18.396 WARN 26816 --- [TaskExecutor-20] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: MY_QUEUE
2020-05-26 17:05:18.402 WARN 26816 --- [TaskExecutor-20] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_QUEUE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52) ~[amqp-client-5.4.3.jar:5.4.3]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at com.sun.proxy.$Proxy283.queueDeclarePassive(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_QUEUE' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.4.3.jar:5.4.3]
... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_QUEUE' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597) ~[amqp-client-5.4.3.jar:5.4.3]
... 1 common frames omitted
2020-05-26 17:05:23.420 WARN 26816 --- [TaskExecutor-20] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: MY_QUEUE
2020-05-26 17:05:23.425 WARN 26816 --- [TaskExecutor-20] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=2
解决方案
reply-text=NOT_FOUND - no queue 'MY_QUEUE' in vhost '/'
您有一个MY_QUEUE
声明为auto-delete
并且看起来在连接建立期间没有它的娱乐。考虑将它作为一个 bean 并让它RabbitAdmin
在重新连接时处理它的创建。
在文档中查看更多信息:https ://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#broker-configuration
推荐阅读
- python-3.x - 网页抓取,需要进一步解析
- flutter - 文档照片中的文本可读
- bash - ncal 命令问题查找星期一数
- c - strtoull 是否总是返回有效数字或设置 errno?
- deployment - 尝试在生产中部署 apex 类时出现 INSUFFICIENT_ACCESS_ON_CROSS_REFERENCE_ENTITY
- r - rsample::bootstraps 是否存储数据而不仅仅是行索引?
- javascript - 为什么 document.getElementById 不起作用?
- ffmpeg - 如何调用特定的 FFMPEG 测试?
- checkbox - 使用Alpine Js关闭模态时如何取消选中复选框?
- arrays - 在 SwiftUI 中修改 ForEach 内的视图