首页 > 解决方案 > 在 Node 中正确批处理嵌套的 Promise

问题描述

knex seed在 Node 中运行,由于服务器的限制,我需要对我的数据库进行额外的查询。我开始掌握 Promise 和 async/await 的窍门,但我无法让它在几个层次上工作(在这一点上让我特别失望的是,它似乎干扰了批处理我无法理解的方式)。我的seed文件如下所示:

exports.seed = async function(knex) {
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) {
    return knex('events')
      .where({location: location})
      .first()
      .then(result => { return result['id']; })
      .finally(() => { knex.destroy() })
  }

  function createImage(row, event_id) {
    return {
      name: row[4],
      event_id: event_id
    }
  };

  async function run_query(line) {
      let row = line.split(',');
      let event_id = await get_event_id(row[0]);
      return createImage(row, event_id);
  };

  async function run_batch(batch) {

      return Promise.all(batch.map(run_query));
  } 

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1,60); // skip csv header, then run first 59 lines

  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));

  console.log(_.flatten(images).length);

};

我的数据库一次可以处理 30 个查询。如果我在定义.slice(1,30)的行上运行单个批处理,一切都会正确解决。lines但是如上所述以 60 运行给了我ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections.

如果我更改run_batchto的内容,脚本就会完成return batch.map(run_query),它会返回正确数量的条目(因此它似乎正在正确批处理)。但是,Promises 仍然悬而未决。我错过了什么,有没有更优雅的方法来做到这一点?

标签: javascriptnode.jsasynchronouspromiseknex.js

解决方案


在这一行:

let images = await Promise.all(batches.map(run_batch));

您正在尝试并行运行所有批次,这完全破坏了您的分块。

您可以使用常规for循环await而不是.map()runva 批处理,等待它完成,然后运行下一个批处理。

let allResults = [];
for (let batch of batches) {
     let images = await run_batch(batch);
     allResults.push(...images);
}
console.log(allResults);

仅供参考,您可能会受益于人们为同时处理不超过 N 个请求的大型数组而编写的任何数量的函数。这些不需要您手动将数据分成批次。相反,它们会同时监控有多少请求在进行中,并启动您想要的请求数量,当一个请求完成时,它们会启动另一个请求,为您收集结果。

runN(fn, limit, cnt, options):在多个请求上循环一个 API

pMap(array, fn, limit):向一次只能处理 20 个的 api 发出多个请求

rateLimitMap(array, requestsPerSec, maxInFlight, fn)每秒最大请求的正确异步方法

mapConcurrent(array, maxConcurrent, fn): Promise.all() 消耗了我所有的内存

Bluebird Promise 库Async-promises 库中还内置了一些功能。


推荐阅读