首页 > 解决方案 > Async.queue 在处理队列的 3 个初始元素后崩溃,并发 = 3

问题描述

Async.queue() 最初按预期运行,但在处理前 N 个元素 (N = 3) 后崩溃。

在运行 getAddress() 后添加 callback() 时,完全忽略并发性。随后 getAddress() 为通过流传递到队列的所有任务运行。

尝试在教程的基础上构建时出现了问题。

试图确定根本原因和解决方案。似乎这与承诺链接有关?

已尝试按照 async 文档重构 async.queue() ,但似乎语法已过时,并且无法找到具有链式承诺的工作示例。

const { csvFormat } = require('d3-dsv');
const Nightmare = require('nightmare');
const { readFileSync, writeFileSync } = require('fs');
const numbers = readFileSync('./tesco-title-numbers.csv', 
  {encoding: 'utf8'}).trim().split('\n');
const START = 'https://eservices.landregistry.gov.uk/wps/portal/Property_Search';

var async = require("async")
console.log(numbers)

// create a read stream 
var ArrayStream = require('arraystream')
var stream = ArrayStream.create(numbers)

// set concurrency
N = 3
var q = async.queue(function (task, callback) {
    let data =  getAddress(task)
    // , function(){
    // callback();
    }, 
// },
 N); 


q.drain = function() {
    stream.resume()
    console.log('all items have been processed');
    resolve()
}
// or await the end
// await q.drain()
q.saturated = function() {
  stream.pause();
}

// assign an error callback
q.error = function(err, task) {
    console.error('task experienced an error');
}

stream.on("data", function(data) {
    // console.log(data);
    q.push(data)
})


var getAddress = async id => {console.log(`Now checking ${id}`);
  const nightmare = new Nightmare({ show: true });
// Go to initial start page, navigate to Detail search
      try {
        await nightmare
          .goto(START)
          .wait('.bodylinkcopy:first-child')
          .click('.bodylinkcopy:first-child');
      } catch(e) {
        console.error(e);
      }

  // Type the title number into the appropriate box; click submit
    try {
      let SOMEGLOBALVAR;
      await nightmare
        // does some work
    } catch(e) {
      console.error(e);
      return undefined;
    }

};

标签: node.jsasync.js

解决方案


确定了问题的原因。需要返回与 getAddressed 一起的回调。

let dataArray = []
N = 4
var q = async.queue(async function (task, callback) {

    return getAddress(task).then((response)=>{
        console.log(response); 
        dataArray.push(response);
        callback()})
} , 
 N); 

推荐阅读