首页 > 解决方案 > ActiveMQ 消费者未能在请求队列上消费消息

问题描述

过程:

应用程序 A 在启动时创建与远程 Active MQ 代理中的“请求队列”的消费者连接。应用 B 将请求推送到“REQUEST QUEUE”,应用 A 也使用该请求。应用 A 向另一个队列“RESPONSE QUEUE”产生响应。

应用 A 使用 ActiveMQ 连接工厂和 Spring DMLC 来使用消息。

我从 Spring DMLC 中了解到的是,它不断地在队列上轮询消息。

问题:

应用 A 最初在应用重启时使用请求。但是当第二天发出另一个请求时,无法第二次使用该请求。甚至消费者在第二个请求到达时也会失败。

到目前为止尝试过:

该问题仅存在于 LIVE 中。根据观察,消费者在一段时间(30 分钟)后无法处理/接收传入消息。我尝试在其他环境中使用相同的代理设置,但不幸的是无法重新生成。我已经尝试了所有其他链接,但都是徒劳的。

寻找指针:

我使用 ActiveMQ 连接进行消费和生产的方式是否存在根本性错误?

有关调试问题时要注意的配置的可能指针。

代码:

消息轮询器配置代码:

@Configuration
@EnableJms
@ComponentScan
@Slf4j
public class MessagePoller
{
    @Value("${outbound.queue}")
    private String outboundQueue;

    private final BrokerProperties brokerProperties;

    @Autowired
    public MessagePoller(BrokerProperties brokerProperties) {this.brokerProperties = brokerProperties;}

    @Bean(name = CONNECTION_FACTORY)
    @Primary
    public ActiveMQConnectionFactory connectionFactory()
    {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerProperties.getUrl());
        connectionFactory.setUserName(brokerProperties.getUser());
        connectionFactory.setPassword(brokerProperties.getPassword());
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactory()
    {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setErrorHandler(getErrorHandler());
        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        factory.setMessageConverter(new SimpleMessageConverter());
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean(name = OUTBOUND_JMS_TEMPLATE)
    public JmsTemplate outboudJmsTemplate(
            @Qualifier(CONNECTION_FACTORY)
                    ConnectionFactory connectionFactory)
    {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        template.setPubSubDomain(false);
        template.setDefaultDestinationName(outboundQueue);
        return template;
    }

    private ErrorHandler getErrorHandler()
    {
        return exception -> log.error("Exception thrown from consumer", exception);
    }
}

消息侦听器代码:

@JmsListener(
        destination = "requestQueue",
        containerFactory = "jmsListenerContainerFactory"
)
public void onMessage(Message<String> jmsMessage)
{
    log.info("TriggerJobOnRequestService.onMessage() starts");
    log.info("Incoming request message: {}", jmsMessage);
    Integer deliveryCount = jmsMessage.getHeaders().get("JMSXDeliveryCount", Integer.class);

    log.info("Payload : {}", jmsMessage.getPayload());
    log.info("Headers : {}", jmsMessage.getHeaders());
    log.info("Delivery Count : {}", deliveryCount);
    //Processing Code logic
    log.info("TriggerJobOnRequestService.onMessage() ends");
}

ActiveMQ 连接地址:

spring.activemq.url=failover://(tcp://mqueue05.net:61616,tcp://mqueue06.net:61616,tcp://mqueue07.net:61616,tcp://mqueue08.net:61616)?randomize=false&timeout=100&initialReconnectDelay=1000&maxReconnectDelay=1000&maxReconnectAttempts=10

标签: javaspring-bootactivemqspring-jms

解决方案


推荐阅读