首页 > 解决方案 > Nodejs:有限的并行执行在处理结束时挂起

问题描述

我正在根据这个问题处理一些文件。我的代码如下所示:

const procFile = async (url) => {
  await download(url);
  await convert(url);
  await upload(url);
};

const thread = async (urls) => {
  while (urls.length) {
    await procFile(urls.pop());
  }
};

const start = async (urls) => {
  const threads = [];
  const maxConcurrency = 4;

  for (let _ of new Array(maxConcurrency)) {
    threads.push(thread(urls));
  }

  await Promise.all(threads);
}

每当我设置maxConcurrency一个大于1的vlue时,程序最终都会挂起,值越大maxConcurrency挂起的越快。程序陷入emitHook了 NodeJs 核心模块方法内部的无限循环async_hooks

使用Node 14

谢谢你的帮助。

标签: node.jsasynchronousparallel-processing

解决方案


使用纯 JS 的简单并发(现场演示):

function concurrency(arr, map, limit){
    let pendingCount= 0;
    const results= [];

    return new Promise((resolve, reject)=>{
        function pump(){
            while (arr && arr.length && pendingCount < limit) {
                pendingCount++;
                map(arr.shift()).then(result=>{
                    pendingCount--;
                    results.push(result);
                    pump();
                }, err=> {
                    arr= null;
                    reject(err)
                });
            }

            if(!pendingCount){
                return resolve(results);
            }
        }

        pump();
    })
}

concurrency([
    'url1',
    'url2',
    'url3',
    'url4',
    'url5',
    'url6',
], async(url)=>{
    await download(url);
    await convert(url);
    return upload(url);
}, 2).then(results=> console.log(`Results: `, results));

或者您可以尝试使用任何具有并发限制功能的库,例如Bluebird.jsp-limitCPromise现场演示

const CPromise= require('c-promise2');

async function download(url){
    console.log(`download ${url}`);
    return CPromise.delay(1000)
}
async function convert(url){
    console.log(`convert ${url}`)
}
async function upload(url){
    console.log(`upload ${url}`)
}

(async()=>{
    await CPromise.all([
        'url1',
        'url2',
        'url3',
        'url4',
        'url5',
        'url6',
    ], {
        mapper: async (url) => {
            await download(url);
            await convert(url);
            await upload(url);
        },
        concurrency: 2
    })
    console.log('Done');
})();

推荐阅读