首页 > 解决方案 > Nodejs 承诺 - 按顺序从异步函数组成一个管道,支持提前退出

问题描述

最近我在 nodejs 中玩异步函数组合,并意识到如果我可以将它们包装到管道中以便在其他地方重用它们会很好。前一个异步函数的每个输出都将作为下一个异步函数的输入。对于基本实现,我使用了如下代码:

pipeline.js


function run(tasks, input) {
  let context = input;

  const reducer = (acc, task) => acc.then((result) => {
    context = result;
    return task(context);
  };

  return tasks.reduce(reducer, Promise.resolve(context));
}

tasks.js

/* do nothing, return input */
const task1 = async (context) => {
  console.log(`previous output: ${context}`);
  return context;
}

/* do nothing, return input */
const task2 = async (context) => {
  console.log(`previous output: ${context}`);
  return context;
}

...

test.js

/* import tasks -> tas1, task2, ... */
const tasks = require('tasks.js');
const pipeline = require('pipeline.js');

pipeline.run(tasks, null)
  .then((output) => console.log('done')
  .catch((err) => console.error(err));

在哪里 :

这是大部分时间都能满足我需求的基本场景。但是,我得出的结论是,当满足某些条件时,我很可能需要提前退出(即提前完成管道)。我找不到干净利落地实现这一目标的方法,唯一对我有用的是拒绝承诺并出现一些错误,这将起到断路器的作用。这是我使用的方法:

pipeline.js

function run(tasks, input) {
  let context = input;

  /* added cancel closure */
  const cancel = () => {
    throw new PipelineCancelError();
  }

  const reducer = (acc, task) => acc.then((result) => {
    context = result;
    return task(context);
  };

  return tasks.reduce(reducer, Promise.resolve(context));
}

所以我的任务会收到额外的函数作为参数,所以我可以取消一些管道阶段:

tasks.js

/* do nothing, return input */
const task3 = async (context, cancel) => {
  console.log(`previous output: ${context}`);
  if (someCondition) {
    /* cancel pipeline */
    cancel();
  }
  return context;
}

我的问题是:当满足某些条件而不抛出错误时,是否有一些干净的方法可以实现提前退出和完成管道?

另外,我发现这个项目看起来很有前途(express样式处理程序),但是当任务抛出未处理的错误时我很难处理错误:promise-pipeline

请注意,我不是经验丰富的 nodejs 开发人员。

谢谢!

标签: node.jspromisepipeline

解决方案


您忘记传递cancel给您的task-

function run(tasks, input) {
  let context = input;

  /* added cancel closure */
  const cancel = () => {
    throw new PipelineCancelError();
  }

  const reducer = (acc, task) => acc.then((result) => {
    context = result;
    return task(context); // <- pass cancel to task
  };

  return tasks.reduce(reducer, Promise.resolve(context));
}

让我们看看如何解决这个问题。我们将添加一个简单sleep的用于演示目的 -

function run(tasks, input) {
  const cancel = e => { throw e }
  const reducer = (acc, task) => acc.then(result => task(result, cancel))
  return tasks.reduce(reducer, Promise.resolve(input))
}

function sleep (ms) {
  return new Promise(r => setTimeout(r, ms))
}

async function task1 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context.toUpperCase()
}

async function task2 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context + "!"
}

async function task3 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  cancel(Error("task3 cancelled"))
  return context + "?"
}

run([task1, task2, task2], "hello").then(console.log, console.error)

previous output: hello
previous output: HELLO
previous output: HELLO!
HELLO!!

这是cancel效果的演示 -

function run(tasks, input) {
  const cancel = e => { throw e }
  const reducer = (acc, task) => acc.then(result => task(result, cancel))
  return tasks.reduce(reducer, Promise.resolve(input))
}

function sleep (ms) {
  return new Promise(r => setTimeout(r, ms))
}

async function task1 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context.toUpperCase()
}

async function task2 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context + "!"
}

async function task3 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  cancel(Error("task3 cancelled"))
  return context + "?"
}

run([task1, task2, task3, task2], "hello").then(console.log, console.error)

previous output: hello
previous output: HELLO
previous output: HELLO!
Error: "task3 cancelled"

您可以通过使用第二个参数来响应取消.then-

run([task1, task2, task3, task2], "hello")
  .then(console.log, console.error) // <-

或者你可以使用.catch-

run([task1, task2, task3, task2], "hello")
  .then(console.log)
  .catch(console.error) // <-

推荐阅读