java - 超时后收到回复
问题描述
我想使用最新的 Spring 版本为 Spring AMQP 注册两个队列侦听器:
@Bean
public SimpleMessageListenerContainer processingTransactionSaleContainer(ConnectionFactory cf, TransactionElavonSaleListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames(QUEUE_PROCESSING_SALE);
container.setMessageListener(new MessageListenerAdapter(listener, "transactionSaleProcess"));
container.setMessageConverter(new SerializerMessageConverter());
return container;
}
@Bean
public SimpleMessageListenerContainer processingTransactionAuthorizeContainer(ConnectionFactory cf, TransactionAuthorizeListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames(QUEUE_PROCESSING_AUTHORIZE);
container.setMessageListener(new MessageListenerAdapter(listener, "transactionAuthorizeProcess"));
container.setMessageConverter(new SerializerMessageConverter());
return container;
}
听众:
@Component
public class TransactionElavonSaleListener {
public TransactionResponseFactory transactionElavonSaleProcess(TransactionRequestFactory ro) {
..... do some heavy network request
return parseRawSuccessResponse(response);
}
}
当我删除其中一个 SimpleMessageListenerContainer 时,它是工作文件,但是当我使用这两种方法时,我得到了这个异常:
00:40:14,469 INFO [stdout] (pool-9-thread-5) 00:40:14.468 [pool-9-thread-5] WARN o.s.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 1
00:40:14,472 INFO [stdout] (pool-9-thread-5) 00:40:14.472 [pool-9-thread-5] WARN o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
00:40:14,473 INFO [stdout] (pool-9-thread-5) org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
00:40:14,473 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
00:40:14,474 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
00:40:14,474 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
00:40:14,474 INFO [stdout] (pool-9-thread-5) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
00:40:14,474 INFO [stdout] (pool-9-thread-5) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
00:40:14,478 INFO [stdout] (pool-9-thread-5) at java.base/java.lang.Thread.run(Thread.java:844)
00:40:14,481 INFO [stdout] (pool-9-thread-5) Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
00:40:14,494 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2446)
00:40:14,494 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115)
00:40:14,494 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
00:40:14,494 INFO [stdout] (pool-9-thread-5) ... 11 common frames omitted
00:40:14,495 INFO [stdout] (pool-9-thread-5) 00:40:14.495 [pool-9-thread-5] ERROR o.s.a.r.l.DirectReplyToMessageListenerContainer - Failed to invoke listener
00:40:14,495 INFO [stdout] (pool-9-thread-5) org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
00:40:14,495 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
00:40:14,495 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
00:40:14,496 INFO [stdout] (pool-9-thread-5) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
00:40:14,497 INFO [stdout] (pool-9-thread-5) at java.base/java.lang.Thread.run(Thread.java:844)
00:40:14,497 INFO [stdout] (pool-9-thread-5) Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
00:40:14,497 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2446)
00:40:14,497 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115)
00:40:14,497 INFO [stdout] (pool-9-thread-5) at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
00:40:14,497 INFO [stdout] (pool-9-thread-5) ... 11 common frames omitted
当我有这两种方法时,你知道为什么我会得到这个异常吗?
EDITL 定义的监听器:
@Bean
public SimpleMessageListenerContainer processingTransactionElavonSaleContainer(ConnectionFactory cf, TransactionElavonSaleListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames(QUEUE_PROCESSING_ELAVON_SALE);
container.setMessageListener(new MessageListenerAdapter(listener, "transactionElavonSaleProcess"));
return container;
}
@Bean
public SimpleMessageListenerContainer processingTransactionElavonAuthorizeContainer(ConnectionFactory cf, TransactionElavonAuthorizeListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames(QUEUE_PROCESSING_ELAVON_AUTHORIZE);
container.setMessageListener(new MessageListenerAdapter(listener, "transactionElavonAuthorizeProcess"));
return container;
}
...
@Component
public class TransactionElavonSaleListener {
public TransactionResponseFactory transactionElavonSaleProcess(TransactionRequestFactory ro) {
return new TransactionResponseFactory();
}
}
@Component
public class TransactionElavonAuthorizeListener {
public TransactionResponseFactory transactionElavonAuthorizeProcess(TransactionRequestFactory tf) {
TransactionResponseFactory obj = new TransactionResponseFactory();
return obj;
}
}
发送对象:
TransactionResponseFactory processingPeply = (TransactionResponseFactory) processingTransactionElavonAuthorizeTemplate.convertSendAndReceive(
ContextServer.EXCHANGE_PROCESSING, ContextServer.ROUTING_KEY_PROCESSING_TRANSACTION_ELAVON, tf);
System.out.println("!!!!! Received PROCESSING_TRANSACTION " + processingPeply.getTransaction_id());
解决方案
观察您的配置(但与您的问题无关)...
- 您的
declare...
代码AmqpAdmin
不是必需的。- 您永远不应该在 bean 定义阶段与代理进行交互——现在还为时过早。
- 它们不是必需的,因为管理员会在第一次打开连接时找到
Queue
和bean 并自动为您声明它们。Exchange
是的,该消息转换器设置器不用于这种侦听器;应该设置消息转换器MessageListenerAdapter
。
但是,SimpleMessageConverter
默认情况下它会得到一个,所以这不应该是问题;此转换器处理序列化对象以及纯文本。
现在,到你的实际问题;添加第二个容器不应对客户端产生任何影响;每个模板都有自己的回复容器,默认情况下使用直接回复,因此它们之间不会有串扰(如果您使用命名的回复队列,可能会发生这种情况,但这里不是这种情况)。
我建议你打开 DEBUG 日志来弄清楚发生了什么;如果您需要帮助分析它们;发布日志(从客户端和服务器端),我会看看。
编辑
您的绑定不正确:
@Bean
public Binding bindingQueueProcessingElavonSale() {
return BindingBuilder.bind(new Queue(QUEUE_PROCESSING_ELAVON_SALE, true))
.to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION_ELAVON);
}
@Bean
public Binding bindingQueueProcessingElavonAuthorize() {
return BindingBuilder.bind(new Queue(QUEUE_PROCESSING_ELAVON_AUTHORIZE, true))
.to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION_ELAVON);
}
您正在使用相同的路由键将两个队列绑定到同一个交换 - RabbitMQ 将使用该路由键将消息发送到两个队列,因此两个侦听器都会响应。
推荐阅读
- python - 使用 pyinstaller 将 .py 转换为 .exe 文件
- c++ - 如何将sizer限制在某个空间内?(wxWidgets)
- floating-point - ULP(最后一个单位)和量子(IEEE 754)之间的区别
- c++ - C++14。声明一个具有相同类型和固定长度参数列表的函数
- c++ - 如何从另一个类的静态成员函数调用一个类的非静态成员函数?
- flutter - 带有用户输入的颤振计时器
- sql - 用于计算滚动期间的周转率的 SQL 查询
- ubuntu - gdb 调用时不打印任何内容
- r - 逗号后只保留每个单词的第一个字母
- html - VBA 转 HTML 按回车键