首页 > 解决方案 > 检测流的可写最后一个块

问题描述

我在我们的系统中创建了一个连接到大型管道的流,该流在收到块(对象)Writable后写入数据库。BUFFER_SIZE

getStream() {
    const buffer = [];

    const stream = new Writable({
        objectMode: true,
        async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        }
    });

    stream.on('finish', async () => {
        // insert last batch?
        if( buffer.length ) {
            await insertToDB(buffer);
        }
    });

    return stream;
}

async consumer() {
    await pipeline(...largePipeline, getStream());
    closeAll();
}

这工作正常,但我遇到的问题是on('finish', ...)事件处理程序被调用太晚,在函数closeAll()中被调用之后consumer()

有没有办法让该write()方法知道它刚刚收到最后一个 chunk?这样我就可以在调用最后一个缓冲区之前刷新缓冲区,next()并且一切都会同步。

请注意,在这个代码库中,管道、消费者和编写者之间有非常严格的分离,我不能也不会在这些组件之间交换承诺、标志或状态检查。可写流是一个独立的单元!我正在寻找节点流通过缓冲写入解决此问题的方法,必须有一种方法来检查Writable流是否被最后调用并等待它真正完成,但我无法理解它。

标签: node.jsnode-streams

解决方案


好的,我找到了正确的方法,Node Streamsfinal(next)在数据耗尽时调用一个方法,这就是你应该在“释放”流之前完成写作的地方:

getStream() {
    let buffer = [];

    return new Writable({
        objectMode: true,
        async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        },
        async final(next) {
            // insert last batch?
            if( buffer.length ) {
                await insertToDB(buffer);
                buffer = [];
            }
            next();
        }

    });
}

async consumer() {
    await pipeline(...largePipeline, getStream());
    closeAll();
}

我还发现该writev()函数是编写缓冲块的更好方法,而不是在Writable流中实现自己的缓冲区:

getStream() {
    return new Writable({
        objectMode: true,
        highWaterMark: BUFFER_SIZE,
        async writev(chunks, next) {
            await insertToDB( chunks.map( chunk => chunk.chunk ) );
            next();
        }
    });
}

它利用highWaterMark每次发送给您的块对象数量的配置设置,这使 Node 可以更好地控制整个流管道的反压并简化您的Writable设计。


推荐阅读