首页 > 解决方案 > Node.js:可控的并发while循环

问题描述

我有一个 170 万条记录的 mongodb 集合。每条记录都是一个 ID 号。我需要读取每个 ID 号,向另一个服务发出一些请求,转换数据,将其写入不同的集合,如果一切成功,则删除原始 ID 记录。

我想要一个无限期地做这些事情的脚本,直到集合为空,并具有可指定的并发性(即任何时候最多 3 个请求)。

通常我会使用 Bluebird's map,它可以指定并发承诺的数量,但没有输入数组(除非我要将所有输入记录读入内存,我不会这样做)。

我想要的本质上是一个并发的while循环,即:(伪javascript)

promiseWhile(queueNotEmpty, 3){
  readFromQueue
    .then(doc => {
      return process(doc);
    })
    .then(result => {
      if(result == "empty") // (or whatever)
        queueNotEmpty = false;
    });
} 

标签: javascriptnode.jsconcurrency

解决方案


您可以使用 mongodb 的游标对所有记录进行异步迭代。为了让三个工作人员处理它,将任务包装到一个异步函数中并多次调用它:

 const cursor = db.collection("records").find({});

 async function process() {
   while(await cursor.hasNext()) {
     const record = await cursor.next();
     //...
   }
 }

 await Promise.all([ process(), process(), process() ]);

(我不确定 mongodb 驱动程序是否支持并发调用.next(),你应该测试一下)


否则,此 Semaphore 实现可能会有所帮助:

 function Semaphore(count = 1) {
  const resolvers = [];
  let startCount = count;

   return {
     aquire() {
       return new Promise(resolve => {
         if(startCount) { resolve(); startCount -= 1; }
         else resolvers.push(resolve);
       });
     },
     free() { 
       if(resolvers.length) resolvers.pop()(); 
       else startCount += 1;
     },
     async use(cb) { 
       await this.aquire(); 
       await cb(); 
       this.free() 
     },
     async done() {
       await Promise.all(Array.from({ length: count }, () => this.aquire()));
       startCount = count;
     },
   };
 }

运行演示 在您的情况下,它可用作:

 const connectionSemaphore = Semaphore(3);

 (async fuction() {
    while(await cursor.hasNext()) {
      const record = await cursor.next();
      /*await*/ connectionSemaphore.use(async () => {
        // Do connection stuff concurrently
      });
    }

    await connectionSemaphore.done();
 })();

推荐阅读