azure-servicebus-queues - Azure Java ServiceBus 队列 Java 批量接收或完成消息以获得更好的性能
问题描述
我正在尝试从队列接收消息,并尝试了不同的方法并且面临性能问题。以下是每种运行类型的指标:
- 接收模式 = 窥视和锁定;1000 条消息需要 2.5 分钟,因为我必须一条一条地完成每条消息
- 接收模式=接收和删除;1000 条消息平均需要 1.5 分钟。
- 接收模式=接收和删除(预取计数为100);1000 条消息需要 3 秒,但我最终丢失了执行结束时缓冲区中的 100 条消息
- 接收模式 = peek and lock(预取计数为 100);1000 条消息需要 2 分钟,因为我必须再次完成每条消息。只有当有办法批量完成它们时,它才会成为一个问题解决者。
以下是我的代码供参考:
ServiceBusSessionReceiverClient sessionReceiverClient = new ServiceBusClientBuilder()
.connectionString(System.getenv("QueueConnectionString"))
.sessionReceiver()
.maxAutoLockRenewDuration(Duration.ofMinutes(2))
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.queueName(queueName)
.buildClient();
ServiceBusReceiverClient receiverClient = sessionReceiverClient.acceptSession(System.getenv("QueueSessionName"));
ObjectMapper objectMapper = new ObjectMapper();
do {
receiverClient.receiveMessages((int) prefetchCount).stream().forEach(message -> {
try {
String str = message.getBody().toString();
final T dataDto = objectMapper.readValue(message.getBody().toString(), returnType);
dataDtoList.add(dataDto);
receiverClient.complete(message);
} catch (Exception e) {
AzFaUtil.getLogger().severe("Message processing failed. Error: " + e.getMessage() + e + "\n Payload: "
+ message);
}
});
} while (dataDtoList.size() < numberOfMessages);
receiverClient.close();
sessionReceiverClient.close();
我能想到的可能解决方案:
- 如果有一种方法可以批量完成消息,而不是一个接一个地完成。
- 如果有办法将消息重新排入队列,这些消息位于预取缓冲区中。
注意:此 API 需要同步。我刚刚尝试了 1000 个条目,但我正在处理 30000 个条目,所以性能很重要。队列也启用了会话并且还启用了分区
解决方案
根据这个问题,微软还没有测试他们的 ServiceBus 的性能。由于 FIFO(先进先出)是我的消息队列的要求,因此我使用了 JMS,它的平均执行速度快了近 10 倍,但有一个缺点。目前,JMS 不支持基于会话的队列,因此我必须禁用会话,然后为了确保 FIFO,我还必须禁用队列上的分区。在 Microsoft 提高其 ServiceBusRecieverClient 的性能或在 JMS 上启用会话之前,这是一个部分和临时的解决方案,以获得更好的性能。
推荐阅读
- vector - 矢量重新调整大小的矢量
- java - 如何使用 Jackson 从 JSON 对象内的数组中检索值
- macos - 如何与 docker-machine 中的镜像共享 Docker for Mac 镜像?
- python - 如何修剪掉数组元素的 \x 部分?
- javascript - 如何使用 phpMyAdmin 存储音频和视频文件
- r - 如何使 group_by 和 lm 快速?
- r - 绘制时间序列数据,其中第一列是
- java - 如何通过通知保持活动活动并且每秒可运行
- design-patterns - 什么样的设计模式应该用于这样的工作流程?
- python - 如何在不循环的情况下打印项目的名称?