Is there a way to limit the channels created by a producer?

Question

We have an application that produces messages very quickly and requires ACKs (publisher confirms).

Here is a dumb simulation:

    @Bean
    public CachingConnectionFactory ccf() {
        var ccf = new CachingConnectionFactory("localhost");
        ccf.setPublisherConfirmType(CORRELATED);
        ccf.setPublisherReturns(true);
        return ccf;
    }


    @Bean
    public ApplicationRunner run(RabbitTemplate template) {
        return args -> {
            for (int i = 1; i < 10001; i++) {
                template.convertAndSend("poc", "hey");
                if (i % 1000 == 0) {
                    LOG.info("{}", i);
                }
            }
        };
    }

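This code ends up creating and destroying several channels. Is there a way to limit the number of channels that are created concurrently (by blocking or queueing)?

This is one of the channels that stays cached, as shown in the RabbitMQ console: 127.0.0.1:54190 (509).

Tags: spring-amqp, spring-rabbit

Answer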


See CachingConnectionFactory.setChannelCheckoutTimeout:

    /**
     * Sets the channel checkout timeout. When greater than 0, enables channel limiting
     * in that the {@link #channelCacheSize} becomes the total number of available channels per
     * connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
     * does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
     * new connection to be created with the new limit.
     * <p>
     * Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
     * @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
     * @since 1.4.2
     * @see #setConnectionLimit(int)
     */
    public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
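
Applied to the connection factory bean from the question, the Javadoc above suggests a configuration along the following lines. This is only a sketch: the class name RabbitConfig is hypothetical, and the values 2 and 1000 ms are illustrative choices, not recommendations. It also assumes that a manually declared CachingConnectionFactory bean has to be configured in code, because Spring Boot's spring.rabbitmq.* properties would presumably not be applied to it.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class RabbitConfig {

    @Bean
    public CachingConnectionFactory ccf() {
        CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
        ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        ccf.setPublisherReturns(true);
        // With a checkout timeout > 0, the cache size becomes a hard limit
        // on the number of channels per connection (illustrative value).
        ccf.setChannelCacheSize(2);
        // Wait up to 1000 ms for a free channel before failing (illustrative value).
        ccf.setChannelCheckoutTimeout(1000);
        return ccf;
    }
}
```

With this in place, a caller that needs a channel while the limit is reached waits up to the timeout for one to be returned, which gives the blocking behaviour asked about in the question.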

EDIT

This works as expected...

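application.properties:

    spring.rabbitmq.cache.channel.size=2
    spring.rabbitmq.cache.channel.checkout-timeout=1s
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true

Application:

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.Connection;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class So69699961Application {

        public static void main(String[] args) {
            SpringApplication.run(So69699961Application.class, args).close();
        }

        @Bean
        ApplicationRunner runner(CachingConnectionFactory cf) {
            return args -> {
                Connection conn = cf.createConnection();
                // The first two channels use up the limit of 2.
                Channel chann1 = conn.createChannel(false);
                Channel chann2 = conn.createChannel(false);
                try {
                    // No channel is free: waits for the 1s checkout timeout, then throws.
                    Channel chann3 = conn.createChannel(false);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                chann2.close();
                chann1.close();
                conn.close();
            };
        }

    }

Output:

    org.springframework.amqp.AmqpTimeoutException: No available channels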
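
The limit-and-timeout behaviour shown above can be modelled without a broker. The following is a minimal sketch using java.util.concurrent.Semaphore; it is an illustration of the semantics only, not Spring AMQP's code. The class ChannelLimitSketch and its methods are hypothetical names, and an IllegalStateException stands in for AmqpTimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a channel-limited connection; not Spring AMQP code. */
public class ChannelLimitSketch {

    private final Semaphore permits;
    private final long checkoutTimeoutMillis;

    public ChannelLimitSketch(int maxChannels, long checkoutTimeoutMillis) {
        this.permits = new Semaphore(maxChannels);
        this.checkoutTimeoutMillis = checkoutTimeoutMillis;
    }

    /** Waits up to the checkout timeout for a free slot, then fails. */
    public void checkout() {
        try {
            if (!permits.tryAcquire(checkoutTimeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("No available channels");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for a channel", e);
        }
    }

    /** Returns a slot, as closing a channel would. */
    public void release() {
        permits.release();
    }

    /** Number of slots currently free. */
    public int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ChannelLimitSketch conn = new ChannelLimitSketch(2, 100);
        conn.checkout();
        conn.checkout();
        try {
            conn.checkout(); // limit reached: waits 100 ms, then throws
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "No available channels"
        }
        conn.release();
        conn.checkout(); // succeeds now that a slot was returned
    }
}
```

In this model a producer that cannot get a slot blocks for at most the timeout, so the timeout value decides whether callers mostly wait for a channel or mostly fail fast.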