首页 > 解决方案 > 需要一个监听器类来处理 Spring 流应用程序中发生的 RabbitMQ 通道关闭异常

问题描述

我们正在使用 Spring Cloud 数据流流应用程序和 RabbitMQ 作为消息代理。

在从源到接收器模块的整体流中,当我们看到给定流中的任何模块中发生“ChannelShutdown:连接错误”时,我们就会丢失数据。

Stream Example: Source | Transformer1 | transformer2 | transformer3 | sink

即任何 RabbitMQ 通道连接丢失,然后应用程序无法将数据传输到下一个模块/应用程序,从而导致数据丢失。

例外:

2019-02-18 15:29:41.364 ERROR 94489 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; 
2019-02-18 15:29:42.008  INFO 94489 --- [strationQueue-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@6adc5b9c: tags=[{amq.ctag-5dNneAd3QgwWADta7JAmQQ=employeeRegistrations.employeeRegistrationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@a6e4897 Shared Rabbit Connection: SimpleConnection@22dc59b2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50775], acknowledgeMode=NONE local queue size=0
2019-02-18 15:29:42.010  INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2019-02-18 15:29:42.019  INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#d611f1c:1/SimpleConnection@1782b48a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50864]

复制问题:我运行了两个spring cloud stream程序

  1. 生产者 - 将 100,000 条消息推送到 RabbitMQ 交换
  2. 消费者 - 接收器模块从该队列接收有效负载 [链接到交换] 并打印

为了在消费者程序日志中获得“通道关闭:连接错误”,我去了rabbitMQ UI页面并不断删除RabbitMQ UI页面中可用的连接。

在此处输入图像描述

最后,在这个过程中,消费者在 100,000 条消息中只收到了 98,484 条消息。因此,由于通道连接关闭,我们在传输过程中丢失了数据


我的问题 :

我们可以在 Spring 流应用程序中捕获或检测“通道关闭:连接错误”吗?

是否有任何 RabbitMQ 侦听器类可包含在流应用程序中以处理错误“通道关闭:连接错误”?

我遇到了 RabbitMQ 听众,比如

在 Stream 应用程序中使用 @RabbitListener 注解

Example:
@RabbitListener(queues = TEST_QUEUE)
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

但是此 RabbitMQ 侦听器仅侦听定义中指定的指定队列或绑定https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html

我想知道是否有任何常见的 RabbitMQ 侦听器类来侦听通道连接而不是侦听特定队列。

所以我的问题是,是否有任何侦听器可用于检查任何通道[链接到当前应用程序]是否关闭,如果这样我可以通过在通道连接建立后将有效负载发送回下一个应用程序来处理数据丢失。

SimpleRabbitListenerContainerFactory在这种情况下对我有帮助吗?如果是这样,请告诉我解决由于通道关闭和连接丢失问题而导致的数据丢失问题的方法。

例子:

标签: springrabbitmqspring-amqpspring-cloud-streamspring-cloud-dataflow

解决方案


您可以使用 Spring Boot 属性在生产者中启用重试。文档在这里

向下滚动到 RabbitMQ

...
spring.rabbitmq.template.retry.enabled=false # Whether publishing retries are enabled.
spring.rabbitmq.template.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message.
spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
spring.rabbitmq.template.retry.max-interval=10000ms # Maximum duration between attempts.
spring.rabbitmq.template.retry.multiplier=1 # Multiplier to apply to the previous retry interval.
...

但是,要回答您的问题,您可以ConnectionListener在连接工厂 bean 定义中添加一个。


推荐阅读