首页 > 解决方案 > Spring - Rabbitmq 侦听器需要在数据库更改期间暂停和恢复

问题描述

嘿,要求是在更改后端表期间暂停 rabbitmq 侦听器处理消息。此更改仅限于我的应用程序,因此不想关闭整个 rabbitmq 实例。一旦这个过程完成,我想再次启动听众。

我面临的问题 我有 2 个侦听器连接到 2 个共享“consumerconnectionFactory”的单独队列。当我终止连接时,只有没有任何开放通道的连接被终止,当我恢复连接时,我得到了一个之前不存在的额外连接。你能帮忙吗?

我在下面分享我的 java 配置。

@Bean
    public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(messagingAuditQueue);
        container.setMessageListener(auditMessageListener);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        container.setForceCloseChannel(true);
        container.setExclusive(false);
        return container;
    }
    @Bean
    public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(accessAuditQueue);
        container.setMessageListener(accessLogListener);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        container.setForceCloseChannel(true);
        container.setExclusive(false);
        return container;
    }

这就是我为侦听器进行 Java 配置的方式。

下面是启动和停止监听器的 RestController

@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
    @Autowired
    private List<MessageListenerContainer> listenerContainers;

    @Autowired
    private List<ConnectionFactory> connectionFactories;

    @GetMapping("/stop")
    public String stopMessageListenerContainer() {
        connectionFactories.forEach(conFactory -> {
            CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
            cConFactory.resetConnection();
        });
        listenerContainers.forEach(container -> {
            SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
            smlc.shutdown();
        });
        listenerContainers.forEach(container -> System.out
                .println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
        return "done - stop";
    }

    @GetMapping("/start")
    public String startMessageListenerContainer() {
        connectionFactories.forEach(conFactory -> {
            CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
            cConFactory.createConnection();
        });
        listenerContainers.forEach(container -> {
            SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
            smlc.start();
        });
        listenerContainers.forEach(container -> System.out
                .println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
        return "done - start";
    }

}

下面是我在本地看到的行为的图像。1.初始连接列表 初始连接列表

  1. 当连接停止休息时 在此处输入图像描述

2.1 队列连接仍处于活动状态 在此处输入图像描述 3. 连接开始时 Rest Call 在此处输入图像描述

标签: javarabbitmqspring-rabbit

解决方案


使用默认缓存模式 (CHANNEL),任何时候都应该只有一个连接,除非您将 a 配置为RabbitTemplate设置usePublisherConnection为 true,在这种情况下,连接名称将为api-audit.publisher.

由于您与 name 有两个联系api-audit,因此发生了一些非常奇怪的事情。我怀疑您以某种方式加载了两个连接工厂,也许一个在子应用程序上下文中?您不能在单个应用程序上下文中拥有两个具有相同名称的 bean。

即,您正在调用resetConnection其中一个,而不是另一个。

我建议您设置一个断点createConnection以查看谁在使用第二个 CF。

顺便说一句,你真的应该在容器停止后重置连接;否则容器将进入恢复模式并可能重新打开连接,具体取决于时间。


推荐阅读