java - 在 Java 中具有最低延迟的 Rabbit MQ 扇出设置
问题描述
我正在使用 Java 中的 RabbitMQ 来尝试查看在将消息发布到扇出交换器并拥有几个消费者时可以预期的延迟,所有这些消费者都在同一台机器上运行。
我测量延迟的方法是在调用之前简单地在消息中编码一个时间戳channel.basicPublish()
然后当我在另一端收到消息时,我采用另一个时间戳并计算两者之间的差异。现在我看到的延迟是在 2 到 4 毫秒的范围内。
我想配置我的交换和队列以提供最少的可靠性保证,并希望最大限度地减少过程中的延迟(无持久性、无 ack、无保证交付等)。我很难找到所有这些设置。尤其是在没有 ack 和持久性方面。
任何人都可以帮助进行设置以尽可能减少延迟吗?总体而言,有人会知道在同一台机器上与消费者和生产者打交道时我能达到的最佳延迟是多少?低于 100us 可以实现吗?
到目前为止,这是我的代码:
import com.rabbitmq.client.*;
import java.time.Instant;
class SPMCPublisher2 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
long count = 0;
while (true) {
final Instant now = Instant.now();
final long now_ts = now.toEpochMilli() * 1000 + now.getNano()/1000;
final String message = "msg #" + (++count) + "," + now_ts;
channel.basicPublish(EXCHANGE_NAME, "",
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.build(),
message.getBytes("UTF-8"));
System.out.println(" --> '" + message + "'");
Thread.sleep(2000);
}
}
}
}
class SPMCConsumer11 {
private static final String EXCHANGE_NAME = "logs";
private static final String QUEUE_NAME = "my_queue";
private static final boolean AUTO_ACK = true;
private static final boolean NOT_DURABLE = false;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//on-durable, exclusive, autodelete queue with a generated name
channel.queueDeclareNoWait(QUEUE_NAME,NOT_DURABLE, true, true ,null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
final String message = new String(delivery.getBody(), "UTF-8");
final Instant now = Instant.now();
final long now_ts = now.toEpochMilli() * 1000 + now.getNano()/1000;
final String[] parts = message.split(",");
final long latency_us = now_ts - Long.parseLong(parts[1]);
System.out.println(" <-- '" + message + "', latency: " + latency_us + " us");
};
channel.basicConsume(QUEUE_NAME, AUTO_ACK, deliverCallback, consumerTag -> { });
}
}
解决方案
推荐阅读
- java - 空 可选,带有自定义描述性异常
- javascript - 将对象值从字符串转换为普通值(它是原始数据类型)
- python - 保存 Altair 图表,周围没有空白
- python - Python TypeError:“字节”类型的对象不是 JSON 可序列化的
- mysql - 数据库模式嵌套任务列表
- python-3.x - SQLite,Python:我的更新函数交换了一些列
- homebrew - 输入“brew医生”后收到错误
- django - 我无法在 pythonanywhere 的 bash 控制台中为 django webapp 运行蝗虫
- node.js - 带重定向的 Spawn 命令
- c++ - 在 poco 1.10.1 中发送 HTTPRequest 超时