首页 > 解决方案 > 如何在具有背压支持的节点中快速流式传输巨大的字符串?

问题描述

真正的用例是通过 SSH 发送一个巨大的字符串,但为简单起见,我将用两个过程来演示它。请参阅下面的代码。

当我将这个巨大的流转换为可读流并将其通过管道传输到子进程时,它会发送非常小的数据块,每个数据块只有几个字节(~80),因此传输速率非常慢。

作为替代方案,如果我直接写入子进程 ( stdin.write),它会非常快,并且每个块的大小都是正确的 - 8KB。但是,这种方法不支持背压,所以如果字符串很大,消费者很慢,就会不堪重负。

我的问题是,如何将字符串通过管道传输到子进程但具有正常的块大小。

父.js

const stream = require('stream');
const spawn = require('child_process').spawn;

const child = spawn('node', ['child.js'], {});
const strSize = 1000000;
const hugeStr = [...Array(strSize)].map(i=>(~~(Math.random()*36)).toString(36)).join('');
console.log('PARENT: hugeStr size: ', hugeStr.length);

const readable = stream.Readable.from(hugeStr);
readable.pipe(child.stdin);

// an alternative to the pipe, but with no support for back-pressure
// child.stdin.write(hugeStr);
// child.stdin.end();

child.js

const fs = require('fs');

const debug = (str) => fs.writeFileSync('debug.log', `\n${str}`, { flag: 'a' });

function getDataFromStdin() {
  let data = '';
  return new Promise((resolve, reject) => {
    process.stdin.on('data', (chunk) => {
      data += chunk.toString();
      const size = chunk.byteLength;
      debug(`DATA ${size}B. ${Math.floor(size / 1024)}KB`);
    });

    process.stdin.on('end', () => {
      const size = data.length;
      debug(`END TOTAL DATA ${size}B. ${Math.floor(size / 1024)}KB ${Math.floor(size / 1024 / 1024)}MB`);
      resolve();
    });

    process.stderr.on('data', (dataErr) => debug(`child error ${dataErr.toString()}`));
  });

}

getDataFromStdin()
  .then(() => debug('CHILD: COMPLETED'))
  .catch(err => debug(`CHILD ERR: ${err}`))

标签: node.jsstream

解决方案


推荐阅读