首页 > 解决方案 > Spring rabbitmq 任务队列并发

问题描述

我有一个使用代理发出 http 请求的任务队列。代理限制为 10 个并发线程/连接。我无权访问代理的日志。

我正在使用以下代码,这是在名为ntContainer#1-1和的两个线程上发出请求container1。这导致许多请求由于使用过多的代理连接而出错。

侦听器是否仅使用 1 个默认线程和额外的容器线程,还是使用 spring/rabbitmq 在幕后进行更多操作?另外我该如何进一步调试呢?

@Configuration
public class RabbitMQConfig {

    public final static String EXCHANGE_NAME = "my-tx";
    public final static String MY_PRODUCT_ROUTING_KEY = "my-product-routing-key";
    public final static String MY_PRODUCT_QUEUE = "my-product";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue myProductQueue() {
        return new Queue(MY_PRODUCT_QUEUE);
    }

    @Bean
    Binding myProductBinding() {
        return BindingBuilder.bind(myProductQueue()).to(topicExchange()).with(MY_PRODUCT_ROUTING_KEY);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(MY_PRODUCT_QUEUE);
        container.setMessageListener(messageListenerAdapter);
        container.setPrefetchCount(1);
        container.setConcurrentConsumers(1);
        return container;
    }

    @Bean
    MessageListenerAdapter messageListenerAdapter(MyListener myListener) {
        return new MessageListenerAdapter(myListener, "process");
    }
}

// 监听器

@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
    // something like this
    Jsoup.connect(message.getUrl()).proxy().execute()
}

标签: spring-bootrabbitmqspring-rabbit

解决方案


哎呀;我正在看手机上的问题;我跳过了容器豆;我认为容器 bean 是容器工厂而不是容器。

您有 2 个侦听器容器 -

@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
    // something like this
    Jsoup.connect(message.getUrl()).proxy().execute()
}

框架将自动为该侦听器创建一个容器(它检测注释),并且您已显式声明另一个容器@Bean

代理限制为 10 个并发线程/连接。

即使有 2 个容器,你也只会得到 2 个线程,而不是 10 个。


推荐阅读