node.js - 检测流的可写最后一个块
问题描述
我在我们的系统中创建了一个连接到大型管道的流,该流在收到块(对象)Writable
后写入数据库。BUFFER_SIZE
getStream() {
const buffer = [];
const stream = new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
}
});
stream.on('finish', async () => {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
}
});
return stream;
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
这工作正常,但我遇到的问题是on('finish', ...)
事件处理程序被调用太晚,在函数closeAll()
中被调用之后consumer()
。
有没有办法让该write()
方法知道它刚刚收到最后一个 chunk
?这样我就可以在调用最后一个缓冲区之前刷新缓冲区,next()
并且一切都会同步。
请注意,在这个代码库中,管道、消费者和编写者之间有非常严格的分离,我不能也不会在这些组件之间交换承诺、标志或状态检查。可写流是一个独立的单元!我正在寻找节点流通过缓冲写入解决此问题的方法,必须有一种方法来检查Writable
流是否被最后调用并等待它真正完成,但我无法理解它。
解决方案
好的,我找到了正确的方法,Node Streamsfinal(next)
在数据耗尽时调用一个方法,这就是你应该在“释放”流之前完成写作的地方:
getStream() {
let buffer = [];
return new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
},
async final(next) {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
buffer = [];
}
next();
}
});
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
我还发现该writev()
函数是编写缓冲块的更好方法,而不是在Writable
流中实现自己的缓冲区:
getStream() {
return new Writable({
objectMode: true,
highWaterMark: BUFFER_SIZE,
async writev(chunks, next) {
await insertToDB( chunks.map( chunk => chunk.chunk ) );
next();
}
});
}
它利用highWaterMark
每次发送给您的块对象数量的配置设置,这使 Node 可以更好地控制整个流管道的反压并简化您的Writable
设计。
推荐阅读
- angular - 如何在控制台中访问 ngrx 商店?
- flutter - 在flutter中从firestore获取的应用程序中格式化日期时间
- flutter - 错误:没有名为“onRatingChanged”的命名参数
- java - 停止在jsp中将页面上表单的先前数据重新加载到DB
- gcc - gcc中的字符转换
- python - 遍历文件目录
- reactjs - react-bootstrap 导航栏奇怪的行为
- html - HTML/CSS 问题:由于跨域读取阻塞问题,无法加载背景图像
- python - python 'replace' 会返回错误的错误信息吗?
- python - []内的Python打印参数