首页 > 解决方案 > @RabbitListener(已设置 id)未向 RabbitListenerEndpointRegistry 注册

问题描述

我正在使用@RabbitListener来自 RabbitMQ 的消息。我希望能够根据某个阈值暂停/恢复消息消费过程。

根据这个这个SO 帖子,我可以使用RabbitListenerEndpointRegistry和获取容器列表,并根据需要暂停/恢复消费。我也明白,为了能够注册到 RabbitListenerEndpointRegistry,我需要为注释指定一个id 。@RabbitListener

但是,我在@RabbitListener注释中添加了一个 id,但仍然RabbitListenerEndpointRegistry.getListenerContainers() 返回一个空集合。

我不确定,可能是什么问题导致我无法获得 ListenerContainers 集合。

现在我创建了SimpleRabbitListenerContainerFactory. (我使用SimpleRabbitListenerContainerFactory是因为我需要从不同的队列中消费)

public static SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(
        final ConnectionFactory connectionFactory,
        final Jackson2JsonMessageConverter jsonConverter,
        final AmqpAdmin amqpAdmin,
        final String queueName, final String bindExchange,
        final String routingKey, final int minConsumerCount,
        final int maxConsumerCount, final int msgPrefetchCount,
        final AcknowledgeMode acknowledgeMode) {

    LOGGER.info("Creating SimpleRabbitListenerContainerFactory to consume from Queue '{}', " +
                    "using {} (min), {} (max) concurrent consumers, " +
                    "prefetch count set to {} and Ack mode is {}",
            queueName, minConsumerCount, maxConsumerCount, msgPrefetchCount, acknowledgeMode);

    /**
     * Before creating the connector factory, ensure that the model with appropriate
     * binding exists.
     */
    createQueueAndBind(amqpAdmin, bindExchange, queueName, routingKey);

    // Create listener factory
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    // set connection factory
    factory.setConnectionFactory(connectionFactory);

    // set converter
    factory.setMessageConverter(jsonConverter);

    // set false so that if model is missing the app will not crash
    factory.setMissingQueuesFatal(false);

    // set min concurrent consumer
    factory.setConcurrentConsumers(minConsumerCount);

    // set max concurrent consumer
    factory.setMaxConcurrentConsumers(maxConsumerCount);

    // how many message should be fetched in a go
    factory.setPrefetchCount(msgPrefetchCount);

    // set acknowledge mode to auto
    factory.setAcknowledgeMode(acknowledgeMode);

    // captures the error on consumer side if any error
    factory.setErrorHandler(errorHandler());

    return factory;
}

方法上的@RabbitListener 注释声明

@RabbitListener(queues = "${object.scan.queue-name}",
        id = "object-scan",
        containerFactory = "object.scan")

标签: springrabbitmqspring-batchspring-amqpspring-rabbit

解决方案


/**
 * Before creating the connector factory, ensure that the model with appropriate
 * binding exists.
 */
createQueueAndBind(amqpAdmin, bindExchange, queueName, routingKey);

您不应该在 bean 定义阶段这样做;在应用程序上下文的生命周期中还为时过早。

您需要做的就是将 queue/exchange/binding 添加@Bean到应用程序上下文中,Spring (RabbitAdmin) 将在容器首次启动时自动进行声明。


推荐阅读