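spring-integration - Sending messages with Spring Integration and an ActiveMQ Artemis queue
Problem description
I am trying to send messages from a Spring Boot application using ActiveMQ Artemis and Spring Integration. In the management console I can see the address and the queue with a routed message count of 7, but the message count is 0 and the consumer receives no messages.
In my producer, I send the messages with a JMS outbound adapter: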
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .transform(messageConverterQueue())
            .log()
            .handle(Jms.outboundAdapter(connectionFactory).destination(new ActiveMQQueue("requests")))
            .get();
}
In the log I can see the GenericMessage with its payload and headers, and the broker reports no errors.
For the consumer:
@Bean
public IntegrationFlow inboundFlowW(ActiveMQConnectionFactory connectionFactory) throws Exception {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .transform(jsonToChunkRequestTransformer)
            .handle(chunkProcessorChunkHandler())
            .channel(replies())
            .get();
}
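Earlier I had forgotten to start the consumer, which is why the consumer count was 0; once the consumer is running, the count becomes 1.
Here is the trace: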
DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.p.core.impl.ChannelImpl : RemotingConnectionID=48a825d5 Sending packet nonblocking PACKET(SessionForceConsumerDelivery)[type=78, channelID=11, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionForceConsumerDelivery, consumerID=0, sequence=14] on channelID=11
DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.p.core.impl.ChannelImpl : RemotingConnectionID=48a825d5 Writing buffer for channelID=11
INFO 19268 --- [global-threads)] o.s.integration.handler.LoggingHandler : GenericMessage [payload={"jobId":348,...}, headers={id=c69469bd-8c5a-131f-ef61-cf6d86b308ca, json_resolvableType=org.springframework.batch.integration.chunk.ChunkRequest<?>, json__TypeId__=class org.springframework.batch.integration.chunk.ChunkRequest, contentType=application/json, timestamp=1635790793335}]
DEBUG 19268 --- [global-threads)] o.a.a.a.c.p.core.impl.ChannelImpl : RemotingConnectionID=1362aceb Sending blocking PACKET(SessionBindingQueryMessage)[type=49, channelID=11, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionBindingQueryMessage, address=requests]
DEBUG 19268 --- [-netty-threads)] o.a.a.a.c.p.c.i.RemotingConnectionImpl : RemotingConnectionID=48a825d5 handling packet PACKET(SessionReceiveMessage)[type=75, channelID=11, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionReceiveMessage, message=ClientMessageImpl[messageID=253545, durable=false, address=replies,userID=null,properties=TypedProperties[_hornetq.forced.delivery.seq=14]], consumerID=0, deliveryCount=0]
DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.c.impl.ClientConsumerImpl : org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5d2e71f5{consumerContext=ActiveMQConsumerContext{id=0}, queueName=replies}::There was nothing on the queue, leaving it now:: returning null
DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.c.impl.ClientConsumerImpl : org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5d2e71f5{consumerContext=ActiveMQConsumerContext{id=0}, queueName=replies}:: returning null
TRACE 19268 --- [erContainer#0-1] o.s.j.l.DefaultMessageListenerContainer : Consumer [ActiveMQMessageConsumer[org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5d2e71f5{consumerContext=ActiveMQConsumerContext{id=0}, queueName=replies}]] of session [ActiveMQSession->ClientSessionImpl [name=41d6e210-3b40-11ec-84ef-00ff1cbc4657, username=artemis, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@2d41bb5a, metaData=(jms-session=,)]@97a3bf2] did not receive a message
DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.client.impl.ClientSessionImpl : Sending commit
Can anyone tell me what happened to these messages? Why are they missing from the queue?
Solution
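No answer is recorded here, so the following is only one possible way to read the console numbers, not a confirmed diagnosis. It assumes that the routed message count is a cumulative counter, while the message count reflects only what the queue currently holds. The sketch is a plain-Java toy model with no Artemis or Spring classes; `QueueCounters` and its methods are hypothetical names invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: this is NOT the Artemis API. It mimics two console counters.
class QueueCounters {
    private final Deque<String> pending = new ArrayDeque<>();
    private long routedCount = 0;

    // A message routed to this queue is stored and bumps the cumulative counter.
    void route(String body) {
        pending.addLast(body);
        routedCount++;
    }

    // Consuming and acknowledging removes the message from the queue.
    // The cumulative routed counter is left unchanged.
    String consumeAndAck() {
        return pending.pollFirst();
    }

    long routedCount() {
        return routedCount;
    }

    int messageCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        QueueCounters requests = new QueueCounters();
        for (int i = 0; i < 7; i++) {
            requests.route("chunk-" + i);
        }
        while (requests.consumeAndAck() != null) {
            // Drain the queue, as any consumer attached to it would.
        }
        System.out.println("routed=" + requests.routedCount()
                + " messageCount=" + requests.messageCount());
    }
}
```

Under that assumption, a routed count of 7 next to a message count of 0 would suggest the messages did reach the queue and were later removed from it. It may therefore be worth checking which consumer is attached to `requests`; note that the listener container in the trace above is polling `queueName=replies`.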