node.js - 如何流式传输到 Promise?
问题描述
当我POST
将数据写入我的应用程序时,我将其写入文件并通过PUT
. 我想从POST
每个流(文件和 PUT)的状态返回。
putResults
是一个数组,它是封闭类的一部分,用于保存每个请求的结果。
我如何收集回复?我可以从中返回一系列 Promises,createWriteStreams
但我怎么能req.pipe
返回它们呢?你可以流式传输到 Promise 吗?
post(req, res, next) {
let listeners = this.getWriteStreams();
let c = listeners.length;
for (i = 0; i < c; i++) {
req.pipe(listeners[i]);
}
/* What do I put here to return after all requests finish? */
}
put(req, res, next) {
var fstream = fs.createWriteStream(this.path);
req.pipe(fstream);
req.on('end', () => {
fstream.close();
res.status(201).send("OK");
});
req.on('error', (err) => {
res.status(500).send(err);
});
}
createWriteStreams() {
let listeners = [];
// We always want to save to a file
listeners.push(fs.createWriteStream(this.path).on('close', ()=>{
this.putResults.push({ host: hutil.myHost, status: 201 });
}));
// If there are other servers in current env, send to them, too!
let otherGuys = hostutil.otherServers();
if (otherGuys.length > 0) {
for (i = 0; i < otherGuys.length; i++) {
let opts = {
hostname: hutil.fq(otherGuys[i]),
port: this.port,
path: this.endpoint,
method: 'PUT',
};
let req = https.request(opts, res => {
this.putResults.push({ host: opts.hostname, status: res.statusCode});
});
req.on('error', (e) => {
this.putResults.push({ host: opts.hostname, status: e });
});
listeners.push(req);
}
}
return listeners;
}
解决方案
好吧,如果有人有这个问题,知识的关键点是可以传递输入流,就好像它上面有多个自定义龙头一样 - 打开一个似乎不会过早地将数据喷洒到你的其他地方'正在放置软管!
因此,由于您无法流式传输到 Promise,您仍然可以流式传输到流,并且您显然可以花时间设置它们。这是我的解决方案:将请求传递给包装在 Promise 中的流。
function post(req, res, next) {
let promises = this.streamPromises(req);
Promise.allSettled(promises).then((results) => {
// Remove the Promise container junk - results come in 2 completely different object structures. Great design, jeez. :-\
let noContainer = results.map(item => item.value).filter(i => i != undefined);
noContainer.push(...results.map(item => item.reason).filter(i => i != undefined));
res.status(200).json(noContainer);
}).catch(err => {
log.warn(`POST request for ${this.filename} failed, at least in part: ${err}`)
res.status(200).json({ host: hutil.myHost, status: err });
});
}
function put(req, res, next) {
var fstream = fs.createWriteStream(this.fqFile);
req.pipe(fstream);
req.on('end', () => {
fstream.close();
log.info(`${req.transID} Saved data to ${this.fqFile} sent by ${req.referer}`);
res.status(201).send("OK");
});
req.on('error', (err) => {
log.warn(`${req.transID} Error receiving/saving PUT file ${this.fqFile} sent by ${req.referer}`);
res.status(500).send(err);
});
}
function streamPromises(postReq) {
let listeners = [];
listeners.push(this.streamLocalFrom(postReq)); // add file first
// Send to other servers in the environment
let otherGuys = hosts.peerServers();
if (otherGuys.length > 0) {
for (i = 0; i < otherGuys.length; i++) {
let opts = {
hostname: hosts.fq(otherGuys[i]),
thatHost: otherGuys[i], // ducked this into the object to avoid parsing fq hostname
port: appconfig.port, // we are all listening here
path: this.endpoint,
method: 'PUT',
timeout: 1000,
ca: fs.readFileSync(appconfig.authorityFile)
};
let p = new Promise((resolve, reject) => {
let req = https.request(opts, res => {
log.info(`${this.filename}: Response from ${opts.hostname}:${opts.port}: ${res.statusCode}`);
// let hn = opts.hostname.match(/(.*?)\./)[1] || opts.hostname;
resolve({ host: opts.thatHost, status: res.statusCode });
});
req.on('error', (e) => {
log.warn(`Error piping ${this.filename} to ${opts.hostname}:${opts.port}: ${e}`);
reject({ host: opts.thatHost, status: e });
});
postReq.pipe(req);
});
listeners.push(p);
}
}
return listeners;
}
function streamLocalFrom(postReq) {
return new Promise((resolve, reject) => {
let fileError = false;
let fstream = fs.createWriteStream(this.fqFile);
fstream.on('close', (err) => {
if (!fileError) {
log.info(`Saved data to file at ${this.fqFile}`);
resolve({ host: me, status: 201 });
}
});
fstream.on('error', (err) => {
log.warn(`Could not save ${this.fqFile} because ${err}`);
reject({ host: hutil.myHost, status: 500 });
fileError = true;
fstream.close();
});
postReq.pipe(fstream);
});
}
推荐阅读
- c++ - 我将如何处理具有不同类的成员函数的函数查找表?
- python - 如何传输 tkinter 窗口但删除重新启动的窗口(已关闭)
- javascript - 按键名分组和聚合对象数组
- node.js - Smartsheet API 的 Node.js 查询参数
- python-3.x - 如何使索引值显示在 x 轴刻度上?
- python - 将 **kwargs 直接传递到多处理池时如何处理它们?
- javascript - Selenium / Python - 如何访问/计算具有虚拟滚动的容器内的所有元素
- reactjs - 如何使用 apollo 查询的结果调度操作?
- python - Python中的私有类变量?
- android - 无法在 Android API 23 的小屏幕上选择 DatePicker 对话框中的所有日期