node.js - 从低内存消耗的流中提取二进制值
问题描述
我正在使用 ExpressJS 构建 NodeJS 服务器,该服务器处理通过桌面应用程序的POST 请求发送的数据( 50KB到>100MB ) ,以进行处理和返回。桌面应用 gzip 在发送前压缩数据(50KB 变为 4KB)。
我希望服务器解压缩数据,从数据中提取值(字符串、整数、字符、数组、json 等),处理该数据,然后用处理后的数据进行响应。
我从这个开始:
apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
let outputData;
//extract values from req.body Buffer and do math on them.
//save processed data in outputData
res.json({
status: true,
data: outputData
});
});
这是有效的,因为 body-parser 将数据解压缩到req.body
存储在内存中的 Buffer 中。这是我的主要问题......内存使用情况。我不想将整个数据集存储在内存中。
为了解决这个问题,我删除了 body-parser,而是将请求流直接通过管道传输到 zlib 转换流中:
apiRoute.route("/convert").post((req, res) =>{
req.pipe(zlib.createGunzip());
});
现在的问题是我不知道如何从流中提取二进制值。
这就是我希望能够做到的:
apiRoute.route("/convert").post((req, res) =>{
let binaryStream = new stream.Transform();
req
.pipe(zlib.createGunzip())
.pipe(binaryStream);
let aValue = binaryStream.getBytes(20);//returns 20 bytes
let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
//etc...
});
但是我不知道有什么方法可以做到这一点。像Dissolve这样的模块很接近,但是它们需要您提前设置解析逻辑,并且所有抓取的值都存储在内存中。
另外,我不知道如何响应 outputData 而不将其全部加载到内存中。
所以我的问题是,我该如何...
- 以我自己的速度异步读取流中的数据并提取其中的值
- 将处理后的数据发送回桌面应用程序,而不将其全部放入内存
解决方案
我解决了我自己的问题。我不是 100% 相信这是实现这一目标的最佳方式,所以我愿意接受建议。
我创建了一个子类stream.Transform
并实现了该_transform
方法。我发现下一个数据块只有在_transform
调用回调时才会得到输入。知道了这一点,我将该回调函数存储为一个属性,并且仅在需要下一个块时才调用它。
getBytes(size)
是一种方法,将从当前块(也保存为属性)中获取指定数量的字节,并在需要下一个块时调用先前保存的回调。这是递归完成的,以考虑不同大小的块和不同数量的请求字节。
然后结合 async/await 和 promises,我能够保持整个过程异步(afaik)和背压。
const {Transform} = require('stream'),
events = require('events');
class ByteStream extends Transform{
constructor(options){
super(options);
this.event_emitter = new events.EventEmitter();
this.hasStarted = false;
this.hasEnded = false;
this.currentChunk;
this.nextCallback;
this.pos = 0;
this.on('finish', ()=>{
this.hasEnded = true;
this.event_emitter.emit('chunkGrabbed');
});
}
_transform(chunk, enc, callback){
this.pos = 0;
this.currentChunk = chunk;
this.nextCallback = callback;
if(!this.hasStarted){
this.hasStarted = true;
this.event_emitter.emit('started');
}
else{
this.event_emitter.emit('chunkGrabbed');
}
}
doNextCallback(){
return new Promise((resolve, reject) =>{
this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
this.nextCallback();
});
}
async getBytes(size){
if(this.pos + size > this.currentChunk.length)
{
let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);
if(!this.hasEnded)
{
var newSize = size-(this.currentChunk.length - this.pos);
//grab next chunk
await this.doNextCallback();
if(!this.hasEnded){
this.pos = 0;
let recurseBytes; await this.getBytes(newSize).then(bytes => {recurseBytes = bytes;});
bytes = Buffer.concat([bytes, recurseBytes]);
}
}
return bytes;
}
else{
let bytes = this.currentChunk.slice(this.pos, this.pos+size);
this.pos += size;
return bytes;
}
}
}
module.exports = {
ByteStream : ByteStream
}
我的快速路线现在是:
apiRoute.route("/convert").post((req, res)=>{
let bStream = new ByteStream({});
let gStream = zlib.createGunzip();
bStream event_emitter.on('started', async () => {
console.log("started!");
let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
console.log(myValue.length);
});
req
.pipe(gStream)
.pipe(bStream);
});
通过检查事件started
,我可以知道第一个块何时流入bStream
. 从那里开始,只需getBytes()
使用我想要的字节数进行调用,然后将承诺的值分配给一个变量。它可以满足我的需要,尽管我还没有进行任何严格的测试。
推荐阅读
- git - 如何以不会被跟踪的方式将文件存储在 GIT 中?
- android-biometric-prompt - 由于“您需要使用 Theme.AppCompat 主题”而导致生物识别提示崩溃
- javascript - 使用 location.href = "site" 时如何启用历史记录返回按钮?
- java - 在 Java 中将 int 数组保存为图像时出现问题
- sql - 如何从 REGEXP_EXTRACT_ALL 旋转数组构建
- ios - 集合视图不显示数据
- python - RuntimeError:针对 API 版本 0xe 编译的模块,但此版本的 numpy 为 0xd
- php - Laravel 8 - 关系返回 null
- collections - 以相同的方式处理单个和多个元素(“透明”映射运算符)
- reactjs - 为什么我的 React 应用程序没有在 Vercel 或 Nettlify 上提取 API?