spring - @RabbitListener(已设置 id)未向 RabbitListenerEndpointRegistry 注册
问题描述
我正在使用@RabbitListener
来自 RabbitMQ 的消息。我希望能够根据某个阈值暂停/恢复消息消费过程。
根据这个和这个SO 帖子,我可以使用RabbitListenerEndpointRegistry
和获取容器列表,并根据需要暂停/恢复消费。我也明白,为了能够注册到 RabbitListenerEndpointRegistry,我需要为注释指定一个id 。@RabbitListener
但是,我在@RabbitListener
注释中添加了一个 id,但仍然RabbitListenerEndpointRegistry.getListenerContainers()
返回一个空集合。
我不确定,可能是什么问题导致我无法获得 ListenerContainers 集合。
现在我创建了SimpleRabbitListenerContainerFactory
. (我使用SimpleRabbitListenerContainerFactory
是因为我需要从不同的队列中消费)
public static SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(
final ConnectionFactory connectionFactory,
final Jackson2JsonMessageConverter jsonConverter,
final AmqpAdmin amqpAdmin,
final String queueName, final String bindExchange,
final String routingKey, final int minConsumerCount,
final int maxConsumerCount, final int msgPrefetchCount,
final AcknowledgeMode acknowledgeMode) {
LOGGER.info("Creating SimpleRabbitListenerContainerFactory to consume from Queue '{}', " +
"using {} (min), {} (max) concurrent consumers, " +
"prefetch count set to {} and Ack mode is {}",
queueName, minConsumerCount, maxConsumerCount, msgPrefetchCount, acknowledgeMode);
/**
* Before creating the connector factory, ensure that the model with appropriate
* binding exists.
*/
createQueueAndBind(amqpAdmin, bindExchange, queueName, routingKey);
// Create listener factory
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// set connection factory
factory.setConnectionFactory(connectionFactory);
// set converter
factory.setMessageConverter(jsonConverter);
// set false so that if model is missing the app will not crash
factory.setMissingQueuesFatal(false);
// set min concurrent consumer
factory.setConcurrentConsumers(minConsumerCount);
// set max concurrent consumer
factory.setMaxConcurrentConsumers(maxConsumerCount);
// how many message should be fetched in a go
factory.setPrefetchCount(msgPrefetchCount);
// set acknowledge mode to auto
factory.setAcknowledgeMode(acknowledgeMode);
// captures the error on consumer side if any error
factory.setErrorHandler(errorHandler());
return factory;
}
方法上的@RabbitListener 注释声明
@RabbitListener(queues = "${object.scan.queue-name}",
id = "object-scan",
containerFactory = "object.scan")
解决方案
/**
* Before creating the connector factory, ensure that the model with appropriate
* binding exists.
*/
createQueueAndBind(amqpAdmin, bindExchange, queueName, routingKey);
您不应该在 bean 定义阶段这样做;在应用程序上下文的生命周期中还为时过早。
您需要做的就是将 queue/exchange/binding 添加@Bean
到应用程序上下文中,Spring (RabbitAdmin) 将在容器首次启动时自动进行声明。
推荐阅读
- css - 反应路由器中的activeClassName同时突出显示两个NavLink
- if-statement - Google Script App删除Google工作表中特定列中具有特定值的重复行
- r - 以变量值为条件的lappy
- elasticsearch - 从术语列表中匹配至少 1 个术语的正确方法
- c# - 会话在更改其 ID 时丢失键和值
- javascript - VS 代码为任何项目显示 javascript 和 typescript 帮助(智能感知)两次
- c++ - 内部方法但循环仍然得到预期的 unqualified-id 错误 C++
- mysql - 如何将我制作的应用程序部署到服务器?
- javascript - 过滤列脚本谷歌表
- javascript - 使用JS函数代替重复代码