首页 > 解决方案 > 如何让一定数量的函数在 NodeJs 中循环并行运行?

问题描述

我正在寻找一种在循环中一次运行 3 个相同功能并等待它完成并继续运行另外 3 个相同功能的方法。我认为它涉及一个循环,promise API。但我的解决方案是失败。如果您能告诉我我做错了什么以及如何解决它,那就太好了。

这是我到目前为止所做的:


我有一个下载功能(call downloadFile)、一个保留功能(call runAfter)和一个多下载功能(call downloadList)。它们看起来像这样:

const https = require('https')
const fs = require('fs')
const { join } = require('path')
const chalk = require('chalk') // NPM
const mime = require('./MIME') // A small module read Json and turn it to object. It returns a file extension string.

exports.downloadFile = url => new Promise((resolve, reject) => {
    const req = https.request(url, res => {
        console.log('Accessing:', chalk.greenBright(url))
        console.log(res.statusCode, res.statusMessage)
        // console.log(res.headers)

        const ext = mime(res)
        const name = url
            .replace(/\?.+/i, '')
            .match(/[\ \w\.-]+$/i)[0]
            .substring(0, 250)
            .replace(`.${ext}`, '')
        const file = `${name}.${ext}`
        const stream = fs.createWriteStream(join('_DLs', file))

        res.pipe(stream)
        res.on('error', reject)

        stream
            .on('open', () => console.log(
                chalk.bold.cyan('Download:'),
                file
            ))
            .on('error', reject)
            .on('close', () => {
                console.log(chalk.bold.cyan('Completed:'), file)
                resolve(true)
            })
    })
    req.on('error', reject)
    req.end()
})

exports.runAfter = (ms, url) => new Promise((resolve, reject) => {
    setTimeout(() => {
        this.downloadFile(url)
            .then(resolve)
            .catch(reject)
    }, ms);
})

/* The list param is Array<String> only */
exports.downloadList = async (list, options) => {
    const opt = Object.assign({
        thread: 3,
        delayRange: {
            min: 100,
            max: 1000
        }
    }, options)

    // PROBLEM
    const multiThread = async (pos, run) => {
        const threads = []
        for (let t = pos; t < opt.thread + t; t++) threads.push(run(t))
        return await Promise.all(threads)
    }

    const inQueue = async run => {
        for (let i = 0; i < list.length; i += opt.thread)
            if (opt.thread > 1) await multiThread(i, run)
            else await run(i)
    }

    const delay = range => Math.floor(
        Math.random() * (new Date()).getHours() *
        (range.max - range.min) + range.min
    )

    inQueue(i => this.runAfter(delay(opt.delayRange), list[i]))
}

downloadFile将从给定的链接下载任何内容。将runAfter在执行之前延迟一个随机毫秒downloadFile。接收一个 URL 列表并将每个downloadListURL 传递runAfter给下载。而那个 ( downloadList) 就是麻烦的开始。

如果我只是通过简单的循环传递整个列表并一次执行一个文件。这简单。但是如果我传递一个大请求,比如一个包含 50 个 url 的列表。这需要很长时间。所以我决定让它一次以 3-5 并行运行downloadFile,而不是 1 downloadFile。我正在考虑使用async/awaitPromise.all解决问题。然而,这是崩溃。以下是 NodeJs 报告:

<--- Last few GCs --->

[4124:01EF5068]    75085 ms: Scavenge 491.0 (493.7) -> 490.9 (492.5) MB, 39.9 / 0.0 ms  (average mu = 0.083, current mu = 0.028) allocation failure
[4124:01EF5068]    75183 ms: Scavenge 491.4 (492.5) -> 491.2 (493.2) MB, 29.8 / 0.0 ms  (average mu = 0.083, current mu = 0.028) allocation failure


<--- JS stacktrace --->

==== JS stack trace =========================================

    0: ExitFrame [pc: 00B879E7]
Security context: 0x03b40451 <JSObject>
    1: multiThread [04151355] [<project folder>\inc\Downloader.js:~62] [pc=03C87FBF](this=0x03cfffe1 <JSGlobal Object>,0,0x041512d9 <JSFunction (sfi = 03E2E865)>)
    2: inQueue [041513AD] [<project folder>\inc\Downloader.js:70] [bytecode=03E2EA95 offset=62](this=0x03cfffe1 <JSGlobal Object>,0x041512d9 ...

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

Writing Node.js report to file: report.20200428.000236.4124.0.001.json
Node.js report completed

downloadList显然, ( )的子功能multiThread是一个原因,但我无法读取这些数字(似乎是 RAM 的物理地址或其他东西),所以我不知道如何修复它。我不是专业工程师,如果您能给我一个很好的解释,我将不胜感激。

补充资料:

如果您可能会问:

标签: javascriptnode.js

解决方案


你的 for 循环multiThread永远不会结束,因为你的延续条件是t < opt.thread + t. 如果不为零,这将始终是。你将在这里有一个无限循环,这就是你崩溃的原因。trueopt.thread

我怀疑你想做这样的事情:

const multiThread = async (pos, run) => {
  const threads = [];
  for (let t = 0; t < opt.thread && pos+t < list.length; t++)  {
    threads.push(run(pos + t));
  }
  return await Promise.all(threads);
};

这里的区别在于循环的继续条件应该将自身限制为最多opt.thread次数,并且不超过list数组中条目数的末尾。

如果list变量不是全局变量(即list.length在函数中不可用multiThread),那么您可以省略条件的第二部分,并像这样在函数中处理它,这样列表末尾的run任何值都是i忽略:

inQueue(i => {
  if (i < list.length) this.runAfter(delay(opt.delayRange), list[i])
})

推荐阅读