首页 > 解决方案 > RabbitTemplate 的 waitForConfirms 和阻塞相关数据回调的性能差异

问题描述

我正在实现一项服务,该服务需要以可靠和阻塞的方式将消息发布到 RabbitMQ 代理。如https://www.rabbitmq.com/tutorials/tutorial-seven-java.html中所述,发布者确认有多种选择。最基本的选项是使用waitForConfirms. 我测试了以下代码(使用设置cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE)),发布 10,000 条这些消息大约需要 100 秒:

rabbitTemplate.invoke(rabbitOperations -> {
  rabbitOperations.convertAndSend("test_exchange", "test_key", "test message");
  return rabbitOperations.waitForConfirms(1000);
});

cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED)我还使用非常原始的阻塞包装器测试了异步选项(带有设置)。以下代码花费了大约 20 秒来发布 10,000 条消息:

CountDownLatch latch = new CountDownLatch(1);
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend("test_exchange", "test_key", "test message", correlationData);
correlationData.getFuture().addCallback(success -> {
  latch.countDown();
}, failure -> {
  latch.countDown();
});
latch.await();

据我了解上面链接中的以下引用,我的第一个示例应该与我的第二个示例基本相同。

将 waitForConfirmsOrDie 视为一个同步助手,它在后台依赖异步通知。

但是性能差异太大,我的假设不成立。我看到它RabbitTemplate.invoke比我的第二个代码示例执行了更多的东西。但是,这种巨大的性能差异究竟是什么原因呢?我的第二个例子是不好的做法,还是我的第一个例子以某种不必要的方式效率低下?

编辑 从加里复制粘贴代码,我遇到了以下情况:

StopWatch '': running time = 106379462700 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
17546479900  016 %  correlated
88832982800  084 %  simple

我正在使用 Windows 10 Pro、运行通过的 RabbitMQ 3.8.9docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management和 Spring Boot 2.3.1

编辑 2 添加RabbitUtils.isPhysicalCloseRequired()到 Gary 的简单确认代码:

simpleTemplate.invoke(ops -> {
  ops.convertAndSend("so64857773", "test message");
  RabbitUtils.isPhysicalCloseRequired();
  return ops.waitForConfirms(1000);
});

产生以下结果:

StopWatch '': running time = 46090067400 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
21100935900  046 %  correlated
24989131500  054 %  simple

似乎在 Windows 上运行 dockerized 版本的 RabbitMQ 时,关闭和重新创建通道非常昂贵。

编辑 3 使用带有 Erlang 23.1.3 的本机 Windows 10 RabbitMQ 服务器(3.8.9),性能大大提高。使用 10k 条消息,相关确认测试只用了大约 3 秒。我将以下两个测试的消息数量增加到 50k。

第一次测试,使用来自 Gary 的原始代码:

StopWatch '': running time = 144403128200 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
19750222900  014 %  correlated
124652905300  086 %  simple

第二次测试,RabbitUtils.isPhysicalCloseRequired()如编辑 2 中所述添加:

StopWatch '': running time = 30030523500 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
14549132300  048 %  correlated
15481391200  052 %  simple

在 Windows 上关闭和重新打开频道似乎非常昂贵 - 使用 dockerized 版本似乎不是问题。

标签: javaspringrabbitmqspring-rabbit

解决方案


相关确认涉及更多代码,因此我预计该版本会花费更多时间(尽管差异不大)。

我不确定你为什么会看到你观察到的行为。

但是,在这两种情况下,最好在等待确认之前发送多个请求;发送和等待每一个都会显着变慢。

通过相关确认,您可以保存CorrelationData实例并稍后等待期货。

编辑

我没有看到与您相同的行为;使用这两种方法我得到了几乎相同的性能;每种情况下约 62 秒。

@SpringBootApplication
public class So64857773Application {

    public static void main(String[] args) {
        SpringApplication.run(So64857773Application.class, args);
    }

//  @Bean
//  Queue queue() {
//      return new Queue("so64857773");
//  }

    @Bean
    ApplicationRunner runner(CachingConnectionFactory cf, RabbitTemplate template) {
        return args -> {
            StopWatch watch = new StopWatch();
            cf.createConnection().close();
            watch.start("correlated");
            for (int i = 0; i < 10_000; i++) {
                CorrelationData cd = new CorrelationData();
                template.convertAndSend("", "so64857773", "test message", cd);
                cd.getFuture().get(1, TimeUnit.SECONDS);
            };
            watch.stop();

            cf.resetConnection();
            cf.setPublisherConfirmType(ConfirmType.SIMPLE);
            RabbitTemplate simpleTemplate = new RabbitTemplate(cf);
            cf.createConnection().close();
            watch.start("simple");
            for (int i = 0; i < 10_000; i++) {
                simpleTemplate.invoke(ops -> {
                    ops.convertAndSend("so64857773", "test message");
                    return ops.waitForConfirms(1000);
                });
            };
            watch.stop();

            System.out.println(watch.prettyPrint());
        };
    }

}
spring.rabbitmq.publisher-confirm-type=correlated

结果:

StopWatch '': running time = 124245233864 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
62136894432  050%  correlated
62108339432  050%  simple

推荐阅读