首页 > 解决方案 > 使用 Spring Integration 和 ActiveMQ Artemis 队列发送消息

问题描述

我正在尝试使用 ActiveMQ Artemis 和 Spring Integration 从 Spring Boot 应用程序发送消息。在管理控制台中,我可以看到具有 7 个路由消息计数的地址和队列,但消息计数为 0,并且消费者没有收到消息:

在此处输入图像描述

在我的生产者中,我使用 JMS 出站适配器发送消息:

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .transform(messageConverterQueue())
            .log()
            .handle(Jms.outboundAdapter(connectionFactory).destination(new ActiveMQQueue("requests")))
            .get();
}

在日志中,我可以看到GenericMessage带有有效负载和标头的代理没有错误。

对于消费者:

@Bean
public IntegrationFlow inboundFlowW(ActiveMQConnectionFactory connectionFactory) throws Exception  {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .transform(jsonToChunkRequestTransformer)
            .handle(chunkProcessorChunkHandler())
            .channel(replies())
            .get();
}

以前我忘记启动消费者,这就是消费者计数为 0 的原因,但当它运行时它变成了 1。

这是跟踪:

 DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.p.core.impl.ChannelImpl        : RemotingConnectionID=48a825d5 Sending packet nonblocking PACKET(SessionForceConsumerDelivery)[type=78, channelID=11, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionForceConsumerDelivery, consumerID=0, sequence=14] on channelID=11
 DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.p.core.impl.ChannelImpl        : RemotingConnectionID=48a825d5 Writing buffer for channelID=11
 INFO 19268 --- [global-threads)] o.s.integration.handler.LoggingHandler   : GenericMessage [payload={"jobId":348,...}, headers={id=c69469bd-8c5a-131f-ef61-cf6d86b308ca, json_resolvableType=org.springframework.batch.integration.chunk.ChunkRequest<?>, json__TypeId__=class org.springframework.batch.integration.chunk.ChunkRequest, contentType=application/json, timestamp=1635790793335}]
 DEBUG 19268 --- [global-threads)] o.a.a.a.c.p.core.impl.ChannelImpl        : RemotingConnectionID=1362aceb Sending blocking PACKET(SessionBindingQueryMessage)[type=49, channelID=11, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionBindingQueryMessage, address=requests]
 DEBUG 19268 --- [-netty-threads)] o.a.a.a.c.p.c.i.RemotingConnectionImpl   : RemotingConnectionID=48a825d5 handling packet PACKET(SessionReceiveMessage)[type=75, channelID=11, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionReceiveMessage, message=ClientMessageImpl[messageID=253545, durable=false, address=replies,userID=null,properties=TypedProperties[_hornetq.forced.delivery.seq=14]], consumerID=0, deliveryCount=0]
 DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.c.impl.ClientConsumerImpl      : org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5d2e71f5{consumerContext=ActiveMQConsumerContext{id=0}, queueName=replies}::There was nothing on the queue, leaving it now:: returning null
 DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.c.impl.ClientConsumerImpl      : org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5d2e71f5{consumerContext=ActiveMQConsumerContext{id=0}, queueName=replies}:: returning null
 TRACE 19268 --- [erContainer#0-1] o.s.j.l.DefaultMessageListenerContainer  : Consumer [ActiveMQMessageConsumer[org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5d2e71f5{consumerContext=ActiveMQConsumerContext{id=0}, queueName=replies}]] of session [ActiveMQSession->ClientSessionImpl [name=41d6e210-3b40-11ec-84ef-00ff1cbc4657, username=artemis, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@2d41bb5a, metaData=(jms-session=,)]@97a3bf2] did not receive a message
 DEBUG 19268 --- [erContainer#0-1] o.a.a.a.c.client.impl.ClientSessionImpl  : Sending commit

谁能告诉我这些消息发生了什么?为什么他们在队列中失踪了?

标签: spring-integrationactivemq-artemis

解决方案


推荐阅读