首页 > 解决方案 > 从低内存消耗的流中提取二进制值

问题描述

我正在使用 ExpressJS 构建 NodeJS 服务器,该服务器处理通过桌面应用程序的POST 请求发送的数据( 50KB>100MB ) ,以进行处理和返回。桌面应用 gzip 在发送前压缩数据(50KB 变为 4KB)。

我希望服务器解压缩数据,从数据中提取值(字符串、整数、字符、数组、json 等),处理该数据,然后用处理后的数据进行响应。

我从这个开始:

apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
    let outputData;
    //extract values from req.body Buffer and do math on them.
    //save processed data in outputData

    res.json({
        status: true,
        data: outputData
    });
});

这是有效的,因为 body-parser 将数据解压缩到req.body存储在内存中的 Buffer 中。这是我的主要问题......内存使用情况。我不想将整个数据集存储在内存中。


为了解决这个问题,我删除了 body-parser,而是将请求流直接通过管道传输到 zlib 转换流中:

apiRoute.route("/convert").post((req, res) =>{
    req.pipe(zlib.createGunzip());
});

现在的问题是我不知道如何从流中提取二进制值。


这就是我希望能够做到的:

apiRoute.route("/convert").post((req, res) =>{
    let binaryStream = new stream.Transform();

    req
    .pipe(zlib.createGunzip())
    .pipe(binaryStream);

    let aValue = binaryStream.getBytes(20);//returns 20 bytes
    let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
    //etc...

});

但是我不知道有什么方法可以做到这一点。像Dissolve这样的模块很接近,但是它们需要您提前设置解析逻辑,并且所有抓取的值都存储在内存中。

另外,我不知道如何响应 outputData 而不将其全部加载到内存中。


所以我的问题是,我该如何...

标签: node.jsexpressasynchronousstreamgzip

解决方案


我解决了我自己的问题。我不是 100% 相信这是实现这一目标的最佳方式,所以我愿意接受建议。

我创建了一个子类stream.Transform并实现了该_transform方法。我发现下一个数据块只有在_transform调用回调时才会得到输入。知道了这一点,我将该回调函数存储为一个属性,并且仅在需要下一个块时才调用它。

getBytes(size)是一种方法,将从当前块(也保存为属性)中获取指定数量的字节,并在需要下一个块时调用先前保存的回调。这是递归完成的,以考虑不同大小的块和不同数量的请求字节。

然后结合 async/await 和 promises,我能够保持整个过程异步(afaik)和背压。

const {Transform} = require('stream'),
events = require('events');

class ByteStream extends Transform{

    constructor(options){
        super(options);

        this.event_emitter = new events.EventEmitter();
        this.hasStarted = false;
        this.hasEnded = false;
        this.currentChunk;
        this.nextCallback;
        this.pos = 0;

        this.on('finish', ()=>{
            this.hasEnded = true;
            this.event_emitter.emit('chunkGrabbed');
        });
    }

    _transform(chunk, enc, callback){
        this.pos = 0;
        this.currentChunk = chunk;
        this.nextCallback = callback;

        if(!this.hasStarted){
            this.hasStarted = true;
            this.event_emitter.emit('started');
        }
        else{
            this.event_emitter.emit('chunkGrabbed');
        }
    }

    doNextCallback(){
        return new Promise((resolve, reject) =>{
            this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
            this.nextCallback();
        });
    }

    async getBytes(size){
        if(this.pos + size > this.currentChunk.length)
        {
            let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);

            if(!this.hasEnded)
            {
                var newSize = size-(this.currentChunk.length - this.pos);
                //grab next chunk
                await this.doNextCallback();
                if(!this.hasEnded){
                    this.pos = 0;
                    let recurseBytes; await this.getBytes(newSize).then(bytes => {recurseBytes = bytes;});
                    bytes = Buffer.concat([bytes, recurseBytes]);
                }
            }

            return bytes;
        }
        else{
            let bytes = this.currentChunk.slice(this.pos, this.pos+size);
            this.pos += size;
            return bytes;
        }
    }
}

module.exports = {
    ByteStream : ByteStream 
}

我的快速路线现在是:

apiRoute.route("/convert").post((req, res)=>{

    let bStream = new ByteStream({});
    let gStream = zlib.createGunzip();

    bStream event_emitter.on('started', async () => {
        console.log("started!");

        let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
        console.log(myValue.length);
    });

    req
    .pipe(gStream)
    .pipe(bStream);
});

通过检查事件started,我可以知道第一个块何时流入bStream. 从那里开始,只需getBytes()使用我想要的字节数进行调用,然后将承诺的值分配给一个变量。它可以满足我的需要,尽管我还没有进行任何严格的测试。


推荐阅读