java - Spring - Rabbitmq 侦听器需要在数据库更改期间暂停和恢复
问题描述
嘿,要求是在更改后端表期间暂停 rabbitmq 侦听器处理消息。此更改仅限于我的应用程序,因此不想关闭整个 rabbitmq 实例。一旦这个过程完成,我想再次启动听众。
我面临的问题 我有 2 个侦听器连接到 2 个共享“consumerconnectionFactory”的单独队列。当我终止连接时,只有没有任何开放通道的连接被终止,当我恢复连接时,我得到了一个之前不存在的额外连接。你能帮忙吗?
我在下面分享我的 java 配置。
@Bean
public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(messagingAuditQueue);
container.setMessageListener(auditMessageListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
@Bean
public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(accessAuditQueue);
container.setMessageListener(accessLogListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
这就是我为侦听器进行 Java 配置的方式。
下面是启动和停止监听器的 RestController
@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
@Autowired
private List<MessageListenerContainer> listenerContainers;
@Autowired
private List<ConnectionFactory> connectionFactories;
@GetMapping("/stop")
public String stopMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.resetConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.shutdown();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - stop";
}
@GetMapping("/start")
public String startMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.createConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.start();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - start";
}
}
解决方案
使用默认缓存模式 (CHANNEL),任何时候都应该只有一个连接,除非您将 a 配置为RabbitTemplate
设置usePublisherConnection
为 true,在这种情况下,连接名称将为api-audit.publisher
.
由于您与 name 有两个联系api-audit
,因此发生了一些非常奇怪的事情。我怀疑您以某种方式加载了两个连接工厂,也许一个在子应用程序上下文中?您不能在单个应用程序上下文中拥有两个具有相同名称的 bean。
即,您正在调用resetConnection
其中一个,而不是另一个。
我建议您设置一个断点createConnection
以查看谁在使用第二个 CF。
顺便说一句,你真的应该在容器停止后重置连接;否则容器将进入恢复模式并可能重新打开连接,具体取决于时间。
推荐阅读
- java - 获取多个用户条目而不使用数组 java
- java - 我正在尝试更改实际 JTable 下区域的颜色
- javascript - 使用reactjs获取文件的路径而不是要上传的文件
- python - 在 Python 中使用 post 请求时数据丢失
- sql - postgresql给定“1,2,3,6,7,8,11,12,15,18,19,20”,查询返回每组连续数的最大值
- json - 使用 Stack Exchange API 获取用户级别的问答链接
- c++ - 类构造函数中的 C++ 指针语法导致 malloc 错误
- c - 为什么这段代码会导致无限循环?另外,我怎样才能完全重新开始我的号码棒球比赛?(我怎样才能重新生成随机数?)
- javascript - JS重新加载/再次返回对象值
- python - 从数据框的列中提取字符串并使用该字符串添加新列