node.js - 关于如何在一轮请求/响应中发布大量消息有什么建议吗?
问题描述
Promise.all
如果我使用如下方式发布 50K 消息:
const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
const topic = pubsub.topic(topicName, {
batching: {
maxMessages: 1000,
maxMilliseconds: 100,
},
});
const n = 50 * 1000;
const dataBufs: Buffer[] = [];
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
dataBufs.push(dataBuffer);
}
const tasks = dataBufs.map((d, idx) =>
topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
})
);
// publish messages concurrencly
await Promise.all(tasks);
// send response to front-end
res.json(data);
如果我使用 for 循环和async/await
. 问题消失了。
const n = 50 * 1000;
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
const messageId = await topic.publish(dataBuffer)
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${i}`)
}
// some logic ...
// send response to front-end
res.json(data);
但它会阻塞后续逻辑的执行,因为async/await
直到所有消息都被发布。发布 50k 条消息需要很长时间。
关于如何在不阻塞后续逻辑执行的情况下发布大量消息(约 50k)的任何建议?我是否需要使用child_process
或像公牛这样的队列在后台发布大量消息而不阻塞 API 的请求/响应工作流?这意味着我需要尽快响应前端,这 50k 条消息应该是后台任务。
库内似乎有一个内存队列@google/pubsub
。我不确定我是否应该再次使用像公牛这样的另一个队列。
解决方案
发布大量数据所需的时间取决于很多因素:
- 消息大小。消息越大,发送它们所需的时间就越长。
- 网络容量(发布者运行的任何地方与 Google Cloud 之间的连接,以及虚拟机本身(如果相关)之间的连接)。这对可以传输的数据量设置了上限。看到较小的虚拟机限制在 40MB/s 范围内的情况并不少见。请注意,如果您通过 Wifi 进行测试,则限制可能会低于此值。
- 线程数和 CPU 内核数。当必须运行大量异步回调时,调度它们运行的能力可能会受到机器或运行时环境的并行容量的限制。
通常,尝试从发布者的一个实例同时发送 50,000 个发布是不好的。上述因素很可能会导致客户端超载并导致超出期限的错误。防止这种情况发生的最好方法是限制一次可以发布的未完成消息的数量。像 Java 这样的一些库本身就支持这一点。Node.js 库尚不支持此功能,但将来可能会支持。
同时,您希望保留未处理消息数量的计数器,并将其限制为客户端似乎能够处理的任何内容。从 1000 开始,然后根据结果从那里向上或向下工作。信号量将是实现此行为的一种非常标准的方式。在您的情况下,代码将如下所示:
var sem = require('semaphore')(1000);
var publishes = []
const tasks = dataBufs.map((d, idx) =>
sem.take(function() => {
publishes.push(topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
sem.leave();
}));
})
);
// Await the start of publishing all messages
await Promise.all(tasks);
// Await the actual publishes
await Promise.all(publishes);
推荐阅读
- javascript - 如何使用分析报告获取 Google Analytics 的页面特定数据?
- python - 以有效的方式展开 python 字典
- java - 从 DeckHand 中删除卡片
- python - 超市数据类
- javascript - 你能在二叉搜索树的构造中发现一个错误吗?
- c++ - UE4-根据移动方向改变移动速度
- python - 如何将代码从文本文件复制到 .py 文件?
- javascript - 我无法在 vue.js 中为我的数组实现过滤器
- assembly - 需要了解指令 ldrvsb 和 ldrbvs 指令做
- c# - 多个按钮的 MouseEnter 事件上的 Visual Studio 更改面板背景颜色