node.js - 如何在“Transform#flush”期间正确处理背压
问题描述
在 Transform 的方法实现中处理背压的正确方法是_flush
什么?换句话说,如果.push()
在刷新时返回 false,是否有任何机制可以正确处理来自下游的背压?
文档规定一旦.push()
返回 false 就停止推送,但是当下游想要恢复读取时,Transform 没有办法监听,除了覆盖this.read
;但这会是什么样子,这样做有什么危险吗?
这是一个您可以使用的工作示例。
const stream = require('stream');
// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);
class example extends stream.Transform {
constructor() {
super({
writableObjectMode: true,
});
// some internal queue that will be emptied once writable side ends
Object.assign(this, {
internal_queue: [],
});
}
_transform(g_chunk, s_encoding, fk_transform) {
// store chunk in internal queue
this.internal_queue.push(g_chunk);
// done with transform (no writes)
fk_transform();
}
_flush(fk_flush) {
console.warn('starting to flush');
// now that writable side has ended, flush internal queue
this.resumeFlush(fk_flush);
}
resumeFlush(fk_flush) {
let a_queue = this.internal_queue;
// still data left in internal queue
while(a_queue.length) {
// remove an item from queue
a_queue.pop();
// intentionally overflow buffer
if(!this.push(S_OVERFLOW)) {
//
// WHAT TO DO HERE?
//
// go asynchronous
return;
}
}
console.warn('finished flush');
// callback
fk_flush();
}
}
// instantiate transform
let ds_transform = new example();
// pipe to stdout
ds_transform.pipe(process.stdout);
// write some data (needs to happen twice)
ds_transform.write({
item: 0,
});
ds_transform.write({
item: 1,
});
// end stream
ds_transform.end();
管道 stdout 以/dev/null
使 stderr 仍打印到控制台:
$ node transform.js > /dev/null
starting to flush
解决方案
这里真正的问题是您应该使用 Duplex 而不是 Transform。由于每次调用_transform
实际上是在缓冲数据而不是对它们应用一些(a/)同步转换,因此这种类型的实现更适合作为双工,即调用_write()
缓冲数据,并调用_read()
开始推送,直到检测到背压。
const stream = require('stream');
// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);
class example extends stream.Duplex {
constructor() {
super({
writableObjectMode: true,
});
// some internal queue that will be emptied once writable side ends
Object.assign(this, {
internal_queue: [],
});
}
_write(g_chunk, s_encoding, fk_write) {
// store chunk in internal queue
this.internal_queue.push(g_chunk);
// done with transform (no writes)
fk_write();
}
_read() {
console.warn('called _read()');
let a_queue = this.internal_queue;
// still data left in internal queue
while(a_queue.length) {
// remove an item from queue
a_queue.pop();
// intentionally overflow buffer
if(!this.push(S_OVERFLOW)) {
// go asynchronous
return;
}
}
console.warn('finished reading');
// nothing more to read
this.push(null);
}
}
// instantiate transform
let ds_transform = new example();
// pipe to stdout
ds_transform.pipe(process.stdout);
// write some data (needs to happen twice)
ds_transform.write({
item: 0,
});
ds_transform.write({
item: 1,
});
// end stream
ds_transform.end();
然后你会得到:
$ node duplex.js > /dev/null
called _read()
called _read()
called _read()
finished reading
推荐阅读
- java - Spring Kafka,覆盖 max.poll.interval.ms?
- clang - 如何获取Clang编译器内置函数的源码?
- c - C中符号楔的右对齐
- css - 如何在 mat-对话框中放置列和行
- javascript - 如何使用 CSS 渐变在模拟时钟上突出显示 25 分钟?
- javascript - 未捕获的 TypeError:time0.getHours 不是函数
- c# - P/Invoke RemoveMenu SetLastError 不起作用
- python - if/elif 语句的 Python 缩进错误
- python - PyPDF2脚本在文件夹中拆分pdf的每一页
- javascript - 使用