首页 > 解决方案 > 每隔一段时间从rabbitMQ队列中读取消息不起作用

问题描述

我想要实现的是每 15 分钟从 RabbitMQ 队列中读取消息。从文档中,我可以看到我可以使用“receiveTimeout”方法来设置间隔。

轮询消费者

AmqpTemplate 本身可用于轮询消息接收。默认情况下,如果没有消息可用,则立即返回 null。没有阻塞。从 1.5 版开始,您可以设置一个 receiveTimeout,以毫秒为单位,并且接收方法会阻塞很长时间,等待消息。

但是我尝试通过 sprint 集成来实现它,receiveTimeout 没有按我预期的那样工作。

我的测试代码如下。

@Bean
    Queue createMessageQueue() {
        return new Queue(RetryQueue, false);
    }

    @Bean
    public SimpleMessageListenerContainer QueueMessageListenerContainer(ConnectionFactory connectionFactory) {
        final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(
                connectionFactory);
        messageListenerContainer.setQueueNames(RetryQueue);
        messageListenerContainer.setReceiveTimeout(900000);
        return messageListenerContainer;
    }

    @Bean
    public AmqpInboundChannelAdapter inboundQueueChannelAdapter(
            @Qualifier("QueueMessageListenerContainer") AbstractMessageListenerContainer messageListenerContainer) {
        final AmqpInboundChannelAdapter amqpInboundChannelAdapter = new AmqpInboundChannelAdapter(
                messageListenerContainer);
        amqpInboundChannelAdapter.setOutputChannelName("channelRequestFromQueue");
        return amqpInboundChannelAdapter;
    }

    @ServiceActivator(inputChannel = "channelRequestFromQueue")
    public void activatorRequestFromQueue(Message<String> message) {
        System.out.println("Message: " + message.getPayload() + ", recieved at: " + LocalDateTime.now());
    }

我正在近乎实时地将有效负载记录在控制台中。任何人都可以帮忙吗?消费者一旦开始活跃多久?

更新

IntegrationFlow 我曾经每隔一段时间从队列中检索消息,

@Bean
    public IntegrationFlow inboundIntegrationFlowPaymentRetry() {
        return IntegrationFlows
                .from(Amqp.inboundPolledAdapter(connectionFactory, RetryQueue),
                        e -> e.poller(Pollers.fixedDelay(20_000).maxMessagesPerPoll(-1)).autoStartup(true))
                .handle(message -> {
                    channelRequestFromQueue()
                            .send(MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders())
                                    .setHeader(IntegrationConstants.QUEUED_MESSAGE, message).build());
                }).get();
    }

标签: rabbitmqspring-integrationspring-amqp

解决方案


Polling Consumer文档来自关于`RabbitTemplate的Spring AMQP文档,与侦听器容器或Spring集成无关。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#polling-consumer

Spring 集成的适配器是消息驱动的,只要消息可用,您就会收到消息。

要按需获取消息,您需要在所需的RabbitTemplate任何时间间隔调用。


推荐阅读