首页 > 解决方案 > NodeJS:如何正确并行运行 spawn?

问题描述

我正在运行一个 for 循环,其中执行以下操作:

let perform_vrp = function() {
    //..
    perform_tsp();
}

let perform_tsp = function() {
  //..

  const pyProg = spawn('python3', [process.env.PWD + '/server/vrp_solver/tsp_solver.py', '/../../route_data/' + depot.city + '/' + moment(route.date_driven).format('Y-MM-DD'), 'morning',route.name]);

  winston.info('Solving the TSP for %s...', route.name);

  pyProg.stdout.on('data', function (data) {

    let result_string = data.toString();
    winston.info('Route result for %s is: %s', route.name, result_string);

    let result_array = eval(result_string); 
    //...
  });
}

它基本上为 for 循环中的每个项目调用一个 python 脚本。

但是,当一个脚本完成时,它也会完成所有其他脚本,并继续使用 stdout 对 for 循环中的所有项目使用相同的“数据”。

如何防止这种情况发生并让 stdout 等待正确的子进程完成?

更新:

以上是从以下命令触发的:

const winston = require('../../server/winston');
const perform_vrp = require('../../server/modules/vrp');
const moment = require('moment');
const Utils = require('../modules/utils');
let utils = new Utils();

let do_vrp = async function(next_delivery_date, cities) {
  winston.info('Generating routes for %s', next_delivery_date);
  for (let index in cities) {
    if (cities.hasOwnProperty(index)) {
      let city = cities[index];
      winston.info('Generating route for %s on %s', city, next_delivery_date);
      await perform_vrp(next_delivery_date,[city]);
      await utils.sleep(60000);
    }
  }
};


let process_routes = async function() {

  //...

  let morning_cities = ['Boston','Chicago'];
  await do_vrp(next_delivery_date.format('YYYY-MM-DD'), morning_cities);
};

process_routes();

标签: node.jsspawn

解决方案


问题是您在 for 循环中使用 await 来等待脚本完成,但您没有将脚本的执行包装在Promise中。

perform_tsp

在数据事件触发时解析 Promise 中的 spawn 和事件,并使用接收到的数据进行解析,如下所示:

  let perform_tsp = function () {
  //..
  return new Promise((resolve, reject) => {
    const pyProg = spawn('python3', [process.env.PWD + '/server/vrp_solver/tsp_solver.py', '/../../route_data/' + depot.city + '/' + moment(route.date_driven).format('Y-MM-DD'), 'morning', route.name]);
    winston.info('Solving the TSP for %s...', route.name);
    pyProg.stdout.on('data', function (data) {
      resolve(data);
    })
  });
}

perform_vrp

只需从 perform_tsp 返回返回的承诺,以便您可以在 for 循环中等待它:

let perform_vrp = function () {
  //..
  return perform_tsp();
}

do_vrp

接收已解决的 promise 的数据,即 spawn 的数据,之后您可以执行数据事件回调中的逻辑:

 let do_vrp = async function (next_delivery_date, cities) {
  winston.info('Generating routes for %s', next_delivery_date);
  for (let index in cities) {
    if (cities.hasOwnProperty(index)) {
      let city = cities[index];
      winston.info('Generating route for %s on %s', city, next_delivery_date);
      let data = await perform_vrp(next_delivery_date, [city]);
      let result_string = data.toString();
      winston.info('Route result for %s is: %s', route.name, result_string);
      let result_array = eval(result_string);
      await utils.sleep(60000);
    }
  }
};

如果您想使用Promise.all api 并行重新生成 spawn,之前的结构将连续运行 spawn。

  let do_vrp = async function (next_delivery_date, cities) {
  winston.info('Generating routes for %s', next_delivery_date);

  const promises = cities.map((city) => {
    return perform_vrp(next_delivery_date, [city]);
  });

  const results = await Promise.all(promises);
  results.forEach((data) => {
    winston.info('Generating route for %s on %s', city, next_delivery_date);
    let result_string = data.toString();
    winston.info('Route result for %s is: %s', route.name, result_string);
    let result_array = eval(result_string);
  });
};

推荐阅读