node.js - 如何在具有背压支持的节点中快速流式传输巨大的字符串?
问题描述
真正的用例是通过 SSH 发送一个巨大的字符串,但为简单起见,我将用两个过程来演示它。请参阅下面的代码。
当我将这个巨大的流转换为可读流并将其通过管道传输到子进程时,它会发送非常小的数据块,每个数据块只有几个字节(~80),因此传输速率非常慢。
作为替代方案,如果我直接写入子进程 ( stdin.write
),它会非常快,并且每个块的大小都是正确的 - 8KB。但是,这种方法不支持背压,所以如果字符串很大,消费者很慢,就会不堪重负。
我的问题是,如何将字符串通过管道传输到子进程但具有正常的块大小。
父.js
const stream = require('stream');
const spawn = require('child_process').spawn;
const child = spawn('node', ['child.js'], {});
const strSize = 1000000;
const hugeStr = [...Array(strSize)].map(i=>(~~(Math.random()*36)).toString(36)).join('');
console.log('PARENT: hugeStr size: ', hugeStr.length);
const readable = stream.Readable.from(hugeStr);
readable.pipe(child.stdin);
// an alternative to the pipe, but with no support for back-pressure
// child.stdin.write(hugeStr);
// child.stdin.end();
child.js
const fs = require('fs');
const debug = (str) => fs.writeFileSync('debug.log', `\n${str}`, { flag: 'a' });
function getDataFromStdin() {
let data = '';
return new Promise((resolve, reject) => {
process.stdin.on('data', (chunk) => {
data += chunk.toString();
const size = chunk.byteLength;
debug(`DATA ${size}B. ${Math.floor(size / 1024)}KB`);
});
process.stdin.on('end', () => {
const size = data.length;
debug(`END TOTAL DATA ${size}B. ${Math.floor(size / 1024)}KB ${Math.floor(size / 1024 / 1024)}MB`);
resolve();
});
process.stderr.on('data', (dataErr) => debug(`child error ${dataErr.toString()}`));
});
}
getDataFromStdin()
.then(() => debug('CHILD: COMPLETED'))
.catch(err => debug(`CHILD ERR: ${err}`))
解决方案
推荐阅读
- tsql - 我可以使用“WAITFOR DELAY”来执行程序吗
- android - android - 单击后退按钮时,Google 地图在进度视图后面闪烁
- robots.txt - 禁止 /lev1/*.html,但允许 /lev1/lev2/*.html
- python - 如何将多个网格节点附加到搅拌机中的一个对象?
- docker - 在代理后面运行命令时,我无法从 docker 注册表中提取
- android - react-native:如何从 android native 模块中显示的 JS 视图访问 react-native 库
- javascript - 条件环境变量 tree-shaking
- javascript - Chart.js 图例对齐左侧
- python - 就地更改熊猫系列/数据框列的类型
- tensorflow2.0 - TensorFlow 2.0 中是否有 global_step 的替代品?