spring-amqp - SimpleMessageListenerContainer and IllegalArgumentException
Problem description
I get an IllegalArgumentException when I try to do the following:
- Receive an AMQP message that requires me to stop the SimpleMessageListenerContainer.
- Make an AMQP request using the direct reply-to mechanism.
- Restart the SimpleMessageListenerContainer.
I have a project with a SimpleMessageListenerContainer that listens on a queue named "USER-broadcast-queue":
@Bean("broadcastMessageListenerContainer")
public SimpleMessageListenerContainer broadcastMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(simpleRoutingConnectionFactory());
    container.setQueues(marketDataBroadcastQueue());
    container.setMessageListener(messageListenerAdapter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDeclarationRetries(12);
    container.setFailedDeclarationRetryInterval(5000);
    container.setMismatchedQueuesFatal(true);
    container.setPrefetchCount(50);
    container.setAutoStartup(false);
    clientHandler.setBroadcastMessageListenerContainer(container);
    return container;
}
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(simpleRoutingConnectionFactory());
    template.setMessageConverter(jsonMessageConverter());
    template.setUseDirectReplyToContainer(true);
    template.setRoutingKey(REQUEST_EXCHANGE_NAME);
    template.setMandatory(true);
    template.setReplyTimeout(20000);
    return template;
}
My ClientHandler is a bean defined like this:
@Component
public class ClientHandler{
    public void handleMessage(…) {….}
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
    return new MessageListenerAdapter(clientHandler, jsonMessageConverter());
}
Now, when I receive a certain kind of message, I need to stop the container:
broadcastMessageListenerContainer.stop();
make a request to the server from a RabbitGatewaySupport (using direct reply-to):
_response = getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyInquiry, myObject,
    new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            …
            return message;
        }
    });
and then start the listener again:
broadcastMessageListenerContainer.initialize();
broadcastMessageListenerContainer.start();
This works fine the first time, but when I receive a second message, I get this error when I try to send the request (convertSendAndReceive):
12-09-2019 13:54:13.826|DEBUG|USER|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|1233|[]|broadcastMessageListenerContainer-1|Shutting down Rabbit listener container
12-09-2019 13:54:13.826|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.isOpen()
12-09-2019 13:54:13.826|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.basicCancel([amq.ctag-6ztkTlFxReqUvLDJOsmmdQ])
12-09-2019 13:54:13.828|INFO | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|586|[]|broadcastMessageListenerContainer-1|Waiting for workers to finish.
12-09-2019 13:54:13.828|DEBUG| USER |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|886|[]|pool-7-thread-9|Received cancelOk for tag amq.ctag-6ztkTlFxReqUvLDJOsmmdQ (USER -broadcast-queue); Consumer@5c20aab9: tags=[[amq.ctag-6ztkTlFxReqUvLDJOsmmdQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://USER@127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER @127.0.0.1:5671/, localPort= 55757], acknowledgeMode=AUTO local queue size=0
12-09-2019 13:54:18.829|INFO | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|592|[]|broadcastMessageListenerContainer-1|Workers not finished.
12-09-2019 13:54:18.829|WARN | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|596|[]|broadcastMessageListenerContainer-1|Closing channel for unresponsive consumer: Consumer@5c20aab9: tags=[[amq.ctag-6ztkTlFxReqUvLDJOsmmdQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:// USER@127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER @127.0.0.1:5671/, localPort= 55757], acknowledgeMode=AUTO local queue size=0
12-09-2019 13:54:18.829|DEBUG| USER |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|735|[]|broadcastMessageListenerContainer-1|Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp:// USER @127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER@127.0.0.1:5671/, localPort= 55757]
12-09-2019 13:54:18.829|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.close()
12-09-2019 13:54:18.830|DEBUG| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1276|[]|broadcastMessageListenerContainer-1|Closing cached Channel: AMQChannel(amqp:// USER @127.0.0.1:5671/,3)
java.lang.IllegalArgumentException: Already value [[USER-broadcast-queue]] for key [org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory@a50b09c] bound to thread [broadcastMessageListenerContainer-1]
at org.springframework.util.Assert.isNull(Assert.java:176) ~[MyLib-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.connection.SimpleResourceHolder.bind(SimpleResourceHolder.java:125) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.doConsumeFromQueue(DirectMessageListenerContainer.java:659) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.adjustConsumers(DirectMessageListenerContainer.java:313) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.setConsumersPerQueue(DirectMessageListenerContainer.java:161) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.getChannelHolder(DirectReplyToMessageListenerContainer.java:190) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1896) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1762) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1731) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1600) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1591) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at es.siom.trading.amqp.gateway.RabbitServiceGateway.sendAndReceiveInquiry(RabbitServiceGateway.java:123) ~[Salida/:?]
at es.siom.trading.amqp.ui.UIController.sendAndReceiveInquiry(UIController.java:67) ~[Salida/:?]
at es.siom.trading.utils.ModuloTradingUtils.consultaContratosMercadoPublic(ModuloTradingUtils.java:627) ~[Salida/:?]
at es.siom.trading.utils.ModuloTradingUtils.leeMercado(ModuloTradingUtils.java:133) ~[Salida/:?]
at es.siom.trading.amqp.beans.DatosGeneralesLTS.actualizaOfertas(DatosGeneralesLTS.java:602) ~[Salida/:?]
at es.siom.trading.amqp.handler.ClientHandler.handleMessage(ClientHandler.java:89) ~[Salida/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_91]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_91]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_91]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:280) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:363) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:292) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Any ideas? Any help is appreciated. Thank you.
Solution
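The stack trace shows that the request is sent from the listener thread itself (broadcastMessageListenerContainer-1, inside ClientHandler.handleMessage), and that SimpleResourceHolder.bind fails because a value ([USER-broadcast-queue]) is already bound on that thread for the SimpleRoutingConnectionFactory key. The following is only a minimal sketch of that kind of collision, using a plain ThreadLocal from the JDK. It is not Spring AMQP code: the class ThreadBoundHolderSketch, its methods, and the "[reply-queue]" value are hypothetical names chosen for illustration. It also shows that, in this simplified model, the same call made from a different thread does not collide. Whether moving the request to another thread is an acceptable workaround in the real application is an assumption that would need to be verified.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified, hypothetical model of a thread-bound resource holder.
// It is NOT Spring AMQP's SimpleResourceHolder; it only mimics the
// "bind fails if the key is already bound on this thread" behaviour
// described by the exception message.
class ThreadBoundHolderSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Binds a value for the key on the current thread; rejects a second binding.
    static void bind(Object key, Object value) {
        Object existing = RESOURCES.get().putIfAbsent(key, value);
        if (existing != null) {
            throw new IllegalArgumentException("Already value [" + existing
                    + "] for key [" + key + "] bound to thread ["
                    + Thread.currentThread().getName() + "]");
        }
    }

    static void unbind(Object key) {
        RESOURCES.get().remove(key);
    }

    // Stand-in for a request/reply call that binds its own value for the same key.
    static String sendAndReceive(Object key) {
        bind(key, "[reply-queue]"); // hypothetical value
        try {
            return "reply";
        } finally {
            unbind(key);
        }
    }

    // Runs the same call on a separate thread, which has no binding for the key.
    static String sendAndReceiveOnWorker(Object key) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> sendAndReceive(key);
            return worker.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        Object factoryKey = "routingConnectionFactory";
        // Models what the listener thread already holds while a message is being handled.
        bind(factoryKey, "[USER-broadcast-queue]");
        try {
            sendAndReceive(factoryKey);
        } catch (IllegalArgumentException e) {
            System.out.println("Same thread: " + e.getMessage());
        }
        System.out.println("Worker thread: " + sendAndReceiveOnWorker(factoryKey));
        unbind(factoryKey);
    }
}
```

In this model the first call fails because it runs on the thread that already holds a binding for the key, while the second call succeeds because the worker thread starts with an empty map. The sketch says nothing about why the error appears only on the second message in the original application.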