Is it possible to create multiple queues and exchanges from configuration using RabbitMQ in Java?

Question

I have the following configuration:

rabbitQueueProps: 
    myQueue1:
      routingKey: route1
      exchangeName: myExchange
      exchangeType: DIRECT
      maxPriority: 10
    myQueue2:
      routingKey: route2
      exchangeName: myExchange
      exchangeType: DIRECT
      maxPriority: 7

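I want to create the exchanges and queues dynamically from this configuration.

I tried the following, but no exchanges or queues were created.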

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;



import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class DynamicQueueSetUp {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    RabbitAdminConfiguration rabbitTemplate;

    @Autowired
    private RabbitQueueConfiguration rabbitQueueConfig;

    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        log.info("validating/creating new queues");
        Map<String, RabbitProps> rabbitQueueprops = rabbitQueueConfig.getRabbitQueueProps();

        rabbitQueueprops.keySet().parallelStream().forEach(queueName -> {
            RabbitProps rabbitProps = rabbitQueueConfig.getRabbitQueueProps().get(queueName);
            Optional<Properties> queueProps = Optional.ofNullable(rabbitAdmin.getQueueProperties(queueName));
            if (!queueProps.isPresent()) {
                log.info("rabbitProps {} , queueName {} ", rabbitProps, queueName);
                Map<String, Object> args = new HashMap<>();
                args.put("x-max-priority", rabbitProps.getMaxPriority());
                if (DIRECT.equalsIgnoreCase(rabbitProps.getExchangeName())) {
                    DirectExchange directExchange = new DirectExchange(rabbitProps.getExchangeName(), true, false);
                    Queue queue = new Queue(queueName, true, false, false, args);
                    rabbitAdmin.declareQueue(queue);
                    rabbitAdmin.declareExchange(directExchange);
                    Binding binding = BindingBuilder.bind(queue).to(directExchange).with(rabbitProps.getRoutingKey());
                    rabbitAdmin.declareBinding(binding);
                }

            }
        });
    }

    @RabbitListener(queues = { "myQueue1", "myQueue2" })
    public void listen(String in) {
        System.out.println(in);
    }
}

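Is it possible to create queues and exchanges dynamically from configuration, or do I need to declare a separate @Bean for the exchange, queue, and binding of each entry?

Tags: java, spring-boot, rabbitmq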

Answer

It is not possible to natively declare Declarable objects such as Exchange, Queue, and Binding in configuration.

The best you can do is use Declarables:

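@Bean
public Declarables declarables() {
    return new Declarables(
        // DirectExchange(name, durable, autoDelete)
        new DirectExchange("exchangeName", false, true),
        // Queue(name, durable, exclusive, autoDelete)
        new Queue("queueName", false, false, true),
        // Bind the queue declared above to the exchange declared above
        new Binding("queueName", Binding.DestinationType.QUEUE, "exchangeName", "routingKey", null));
}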

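This declares your AMQP objects on RabbitMQ entirely in Java code. You still need to fetch "queueName" and "exchangeName" from configuration, though, so I suggest creating a helper class like the following: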

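@Configuration
public class QueueHelper {

    // Comma-separated property, e.g. my.queue.names=queue1,queue2
    @Value("${my.queue.names}")
    private List<String> queueNames;

    @Bean
    public Declarables queueDeclarable() {
        // Queue here is org.springframework.amqp.core.Queue, not java.util.Queue.
        // One non-durable, non-exclusive, auto-delete queue per configured name.
        List<Queue> queues = queueNames.stream()
                .map(queueName -> new Queue(queueName, false, false, true))
                .collect(Collectors.toList());
        return new Declarables(queues);
    }
}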

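This way, without changing any code, you can define queues dynamically just by modifying the value of my.queue.names in your configuration.

For example, with my.queue.names=queue1,queue2 you get two queues, queue1 and queue2, whereas with my.queue.names=queue1,queue2,queue3 you get three queues named queue1, queue2, and queue3.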
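
The same idea can probably be extended to the map-style rabbitQueueProps configuration from the question. Below is a minimal sketch of that mapping, not a definitive implementation. To keep it runnable without Spring on the classpath, ExchangeDecl, QueueDecl, and BindingDecl are hypothetical stand-ins for Spring AMQP's DirectExchange, Queue, and Binding, and DeclarationPlanner and plan are illustrative names as well. Note that it checks the exchange type against "DIRECT"; the snippet in the question compares getExchangeName() instead, which would never match "myExchange" and may be why nothing was declared.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: turn a rabbitQueueProps-style map into a flat list of declarations. */
class DeclarationPlanner {

    /** Mirrors one entry of the rabbitQueueProps configuration in the question. */
    record RabbitProps(String routingKey, String exchangeName, String exchangeType, int maxPriority) {}

    // Hypothetical stand-ins for Spring AMQP's DirectExchange, Queue and Binding.
    record ExchangeDecl(String name, String type) {}
    record QueueDecl(String name, Map<String, Object> arguments) {}
    record BindingDecl(String queue, String exchange, String routingKey) {}

    static List<Object> plan(Map<String, RabbitProps> config) {
        List<Object> declarations = new ArrayList<>();
        Set<String> declaredExchanges = new LinkedHashSet<>();
        for (Map.Entry<String, RabbitProps> entry : config.entrySet()) {
            String queueName = entry.getKey();
            RabbitProps props = entry.getValue();
            // Check the exchange *type* (not the exchange name) against "DIRECT".
            if (!"DIRECT".equalsIgnoreCase(props.exchangeType())) {
                throw new IllegalArgumentException("Unsupported exchange type: " + props.exchangeType());
            }
            // Several queues may share one exchange; declare it only once.
            if (declaredExchanges.add(props.exchangeName())) {
                declarations.add(new ExchangeDecl(props.exchangeName(), "direct"));
            }
            // One priority queue and one binding per configured entry.
            declarations.add(new QueueDecl(queueName, Map.of("x-max-priority", props.maxPriority())));
            declarations.add(new BindingDecl(queueName, props.exchangeName(), props.routingKey()));
        }
        return declarations;
    }

    public static void main(String[] args) {
        // Same values as the YAML in the question, in insertion order.
        Map<String, RabbitProps> config = new LinkedHashMap<>();
        config.put("myQueue1", new RabbitProps("route1", "myExchange", "DIRECT", 10));
        config.put("myQueue2", new RabbitProps("route2", "myExchange", "DIRECT", 7));
        plan(config).forEach(System.out::println);
    }
}
```

In a real application you would presumably build the Spring AMQP objects directly, for example new DirectExchange(name, true, false), new Queue(name, true, false, false, args), and new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null), collect them into a list, and return new Declarables(list) from a @Bean method as in the helper class above.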

