node.js - Nodejs 承诺 - 按顺序从异步函数组成一个管道,支持提前退出
问题描述
最近我在 nodejs 中玩异步函数组合,并意识到如果我可以将它们包装到管道中以便在其他地方重用它们会很好。前一个异步函数的每个输出都将作为下一个异步函数的输入。对于基本实现,我使用了如下代码:
pipeline.js
function run(tasks, input) {
let context = input;
const reducer = (acc, task) => acc.then((result) => {
context = result;
return task(context);
};
return tasks.reduce(reducer, Promise.resolve(context));
}
tasks.js
/* do nothing, return input */
const task1 = async (context) => {
console.log(`previous output: ${context}`);
return context;
}
/* do nothing, return input */
const task2 = async (context) => {
console.log(`previous output: ${context}`);
return context;
}
...
test.js
/* import tasks -> tas1, task2, ... */
const tasks = require('tasks.js');
const pipeline = require('pipeline.js');
pipeline.run(tasks, null)
.then((output) => console.log('done')
.catch((err) => console.error(err));
在哪里 :
pipeline.js
: 流水线实现tasks.js
:我可以从中组成管道的函数定义test.js
: 测试一切的代码
这是大部分时间都能满足我需求的基本场景。但是,我得出的结论是,当满足某些条件时,我很可能需要提前退出(即提前完成管道)。我找不到干净利落地实现这一目标的方法,唯一对我有用的是拒绝承诺并出现一些错误,这将起到断路器的作用。这是我使用的方法:
pipeline.js
function run(tasks, input) {
let context = input;
/* added cancel closure */
const cancel = () => {
throw new PipelineCancelError();
}
const reducer = (acc, task) => acc.then((result) => {
context = result;
return task(context);
};
return tasks.reduce(reducer, Promise.resolve(context));
}
所以我的任务会收到额外的函数作为参数,所以我可以取消一些管道阶段:
tasks.js
/* do nothing, return input */
const task3 = async (context, cancel) => {
console.log(`previous output: ${context}`);
if (someCondition) {
/* cancel pipeline */
cancel();
}
return context;
}
我的问题是:当满足某些条件而不抛出错误时,是否有一些干净的方法可以实现提前退出和完成管道?
另外,我发现这个项目看起来很有前途(express
样式处理程序),但是当任务抛出未处理的错误时我很难处理错误:promise-pipeline
请注意,我不是经验丰富的 nodejs 开发人员。
谢谢!
解决方案
您忘记传递cancel
给您的task
-
function run(tasks, input) {
let context = input;
/* added cancel closure */
const cancel = () => {
throw new PipelineCancelError();
}
const reducer = (acc, task) => acc.then((result) => {
context = result;
return task(context); // <- pass cancel to task
};
return tasks.reduce(reducer, Promise.resolve(context));
}
让我们看看如何解决这个问题。我们将添加一个简单sleep
的用于演示目的 -
function run(tasks, input) {
const cancel = e => { throw e }
const reducer = (acc, task) => acc.then(result => task(result, cancel))
return tasks.reduce(reducer, Promise.resolve(input))
}
function sleep (ms) {
return new Promise(r => setTimeout(r, ms))
}
async function task1 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context.toUpperCase()
}
async function task2 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context + "!"
}
async function task3 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
cancel(Error("task3 cancelled"))
return context + "?"
}
run([task1, task2, task2], "hello").then(console.log, console.error)
previous output: hello
previous output: HELLO
previous output: HELLO!
HELLO!!
这是cancel
效果的演示 -
function run(tasks, input) {
const cancel = e => { throw e }
const reducer = (acc, task) => acc.then(result => task(result, cancel))
return tasks.reduce(reducer, Promise.resolve(input))
}
function sleep (ms) {
return new Promise(r => setTimeout(r, ms))
}
async function task1 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context.toUpperCase()
}
async function task2 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context + "!"
}
async function task3 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
cancel(Error("task3 cancelled"))
return context + "?"
}
run([task1, task2, task3, task2], "hello").then(console.log, console.error)
previous output: hello
previous output: HELLO
previous output: HELLO!
Error: "task3 cancelled"
您可以通过使用第二个参数来响应取消.then
-
run([task1, task2, task3, task2], "hello")
.then(console.log, console.error) // <-
或者你可以使用.catch
-
run([task1, task2, task3, task2], "hello")
.then(console.log)
.catch(console.error) // <-
推荐阅读
- mongodb - 通过聚合在 mongo db 中的总数
- wpf - WPF 绑定到 Listview 的 Itemtemplate 的可见性不起作用
- java - 字符串功能垫或在java中subsritng凝视
- python - sendp 的奇怪行为,将数据包发送到错误的目的地 [scapy]
- html - 我的 CSS 没有填满网页的底部
- node.js - 使用 nodejs 进行电子邮件验证
- c# - 使用 C# 从全名获取短字符串
- ios - 动画 UICollectionView 框架时的布局和单元格问题
- redhat - 启用基于 jdbc 的会话持久性后应用程序无法加载
- android - 使用同一库的不同版本 - 发现多个文件与操作系统无关的路径“builddef.lst”