首页 > 解决方案 > 节点,创建一个流对象,内部已经有一些流管道

问题描述

我想在nodejs中创建一个可读可写的流Objetc,它在内部简单地连接了许多流对象,以便为项目提供某种程度的抽象。

最优雅的方法是什么?

这里有一些代码示例来演示我尝试做的事情。

function main(transformerType) {
    var readStream = new ReadStream();
    var writeStream = new WriteStream();
    var transformerStream;

    if (transformerType === "a") {
        transformerStream = getATransformer();
    }
    else {
        transformerStream = getDefaultTransformer();
    }

    readStream.pipe(transformerStream).pipe(writeStream);
}

function getDefaultTransfromer() {
    return new DefaultTransformerStream();
}

function getATransformer() {
    var xStream = new LibraryXStream();
    var yStream = new LibraryYStream();
    xStream.pipe(yStream);

    // This next line won't work
    // I want to return a stream-object that has a function pipe which calls pipe from yStream
    // and when getting piped to will pipe into xStream
    // sort of a wrapper that has xStream and yStream already connected
    return xxx?;
}

标签: node.jsstreampipe

解决方案


我创建了类 TransformerWrapper 似乎解决了这个问题,至少在下面的测试代码中。当然,在那个简单的示例中,所有这些流都可以连接在一起形成一条线。然而,我们的目标是找到一种方法来返回几个连接的流,这些流看起来是一个可读/可写的流对象给调用函数。

如果有人找到更小、更好的东西,我将不胜感激。

const stream = require("stream");

class TransformerWrapper extends stream.Transform {
    constructor (transformers) {
        super();
        var i = 1;

        this.firstTransformer = transformers[0];
        this.lastTransformer = transformers[transformers.length-1];

        this.lastTransformer.on("data", (chunk) => {
            this.push(chunk);
        });

        for (; i < transformers.length; i++) {
            transformers[i-1].pipe(transformers[i]);
        }
    }
    _write(chunk, encoding, callback) {
        this.firstTransformer.write(chunk, encoding, callback);
    }
}

class TransformerA extends stream.Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk + '_A');
        callback();
    }   
}

class TransformerB extends stream.Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk + '_B');
        callback();
    }   
}

class Writer extends stream.Writable {
    _write(chunk, encoding, callback) {
        console.log("writer", chunk.toString());
        callback();
    }   
}

var transformerA = new TransformerA();
var transformerB = new TransformerB();

var transformerWrapper = new TransformerWrapper([transformerA, transformerB]);
transformerWrapper.pipe(new Writer());
transformerWrapper.write("foo");
transformerWrapper.write("bar");

推荐阅读