javascript - Node.js:可控的并发while循环
问题描述
我有一个 170 万条记录的 mongodb 集合。每条记录都是一个 ID 号。我需要读取每个 ID 号,向另一个服务发出一些请求,转换数据,将其写入不同的集合,如果一切成功,则删除原始 ID 记录。
我想要一个无限期地做这些事情的脚本,直到集合为空,并具有可指定的并发性(即任何时候最多 3 个请求)。
通常我会使用 Bluebird's map
,它可以指定并发承诺的数量,但没有输入数组(除非我要将所有输入记录读入内存,我不会这样做)。
我想要的本质上是一个并发的while循环,即:(伪javascript)
promiseWhile(queueNotEmpty, 3){
readFromQueue
.then(doc => {
return process(doc);
})
.then(result => {
if(result == "empty") // (or whatever)
queueNotEmpty = false;
});
}
解决方案
您可以使用 mongodb 的游标对所有记录进行异步迭代。为了让三个工作人员处理它,将任务包装到一个异步函数中并多次调用它:
const cursor = db.collection("records").find({});
async function process() {
while(await cursor.hasNext()) {
const record = await cursor.next();
//...
}
}
await Promise.all([ process(), process(), process() ]);
(我不确定 mongodb 驱动程序是否支持并发调用.next()
,你应该测试一下)
否则,此 Semaphore 实现可能会有所帮助:
function Semaphore(count = 1) {
const resolvers = [];
let startCount = count;
return {
aquire() {
return new Promise(resolve => {
if(startCount) { resolve(); startCount -= 1; }
else resolvers.push(resolve);
});
},
free() {
if(resolvers.length) resolvers.pop()();
else startCount += 1;
},
async use(cb) {
await this.aquire();
await cb();
this.free()
},
async done() {
await Promise.all(Array.from({ length: count }, () => this.aquire()));
startCount = count;
},
};
}
运行演示 在您的情况下,它可用作:
const connectionSemaphore = Semaphore(3);
(async fuction() {
while(await cursor.hasNext()) {
const record = await cursor.next();
/*await*/ connectionSemaphore.use(async () => {
// Do connection stuff concurrently
});
}
await connectionSemaphore.done();
})();
推荐阅读
- math - 在lua中反转指数
- sql - 子查询没有按我想要的方式工作
- tensorflow - 如何为 yolo-detector 的每个网格单元输出类?
- angular - 滚动到div的末尾然后会再次出现
- javascript - 运行 setInterval 时无法更新对象属性
- c# - Winforms - listbox 从列表中的选中项中获取对象
- python - 如何运行 repl.it 中不同目录中的文件?
- php - 全屏显示图像
- neural-network - 如何知道 gensim 预训练 word2vec 中的标记 id 将匹配标记器词汇表的 id
- c++ - 调用 AI MoveToLocation 时 Unreal 崩溃