首页 > 解决方案 > 在 Java 中具有最低延迟的 Rabbit MQ 扇出设置

问题描述

我正在使用 Java 中的 RabbitMQ 来尝试查看在将消息发布到扇出交换器并拥有几个消费者时可以预期的延迟,所有这些消费者都在同一台机器上运行。

我测量延迟的方法是在调用之前简单地在消息中编码一个时间戳channel.basicPublish()然后当我在另一端收到消息时,我采用另一个时间戳并计算两者之间的差异。现在我看到的延迟是在 2 到 4 毫秒的范围内。

我想配置我的交换和队列以提供最少的可靠性保证,并希望最大限度地减少过程中的延迟(无持久性、无 ack、无保证交付等)。我很难找到所有这些设置。尤其是在没有 ack 和持久性方面。

任何人都可以帮助进行设置以尽可能减少延迟吗?总体而言,有人会知道在同一台机器上与消费者和生产者打交道时我能达到的最佳延迟是多少?低于 100us 可以实现吗?

到目前为止,这是我的代码:

import com.rabbitmq.client.*;
import java.time.Instant;

class SPMCPublisher2 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            long count = 0;
            while (true) {
                final Instant now = Instant.now();
                final long now_ts = now.toEpochMilli() * 1000 + now.getNano()/1000;
                final String message = "msg #" + (++count) + "," + now_ts;
                channel.basicPublish(EXCHANGE_NAME, "",
                        new AMQP.BasicProperties.Builder()
                                .contentType("text/plain")
                                .build(),
                        message.getBytes("UTF-8"));

                System.out.println(" --> '" + message + "'");
                Thread.sleep(2000);
            }
        }
    }

}


class SPMCConsumer11 {
    private static final String EXCHANGE_NAME = "logs";
    private static final String QUEUE_NAME = "my_queue";
    private static final boolean AUTO_ACK = true;
    private static final boolean NOT_DURABLE = false;

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //on-durable, exclusive, autodelete queue with a generated name
        channel.queueDeclareNoWait(QUEUE_NAME,NOT_DURABLE, true, true ,null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            final String message = new String(delivery.getBody(), "UTF-8");
            final Instant now = Instant.now();
            final long now_ts = now.toEpochMilli() * 1000 + now.getNano()/1000;
            final String[] parts = message.split(",");
            final long latency_us = now_ts - Long.parseLong(parts[1]);
            System.out.println(" <-- '" + message + "', latency: " + latency_us + " us");

        };
        channel.basicConsume(QUEUE_NAME, AUTO_ACK, deliverCallback, consumerTag -> { });
    }
}

标签: javaperformancerabbitmqlow-latency

解决方案


推荐阅读