首页 > 解决方案 > 节点双工流在读取时实际上并不发出数据

问题描述

我正在研究一种最终用于 nodejs 中键值对象流的数据接收器。

我偶然发现了双工流,并开始玩弄它们以弄湿我的脚,但我尝试的一切似乎都不起作用。

目前,我有这个双工流:

class StorageStream extends stream.Duplex {
  constructor() {
    super({
      objectMode: true
    })

    this.values = new Map();
    this.entries = this.values.entries();
  }

  _write(data, encoding, next) {
    const { key, value } = data;

    this.values.set(key, value);

    next();
  }

  _read() {
    const next = this.entries.next();

    if (next.value) {
      this.push(next.value);
    }
  }
}

这是一个超级做作的例子,但本质上,当我写入这个流时,它应该将键和值存储在 Map 中,当我从这个流中读取时,它应该开始从映射中读取并将它们传递到流中。但是,这不起作用,基本上执行以下操作

const kvstream = createKVStreamSomeHow(); // a basic, readable stream with KV Pairs

const logger = createLoggerStreamSomeHow(); // writable stream, logs the data coming through

const storage = new StorageStream();

kvstream.pipe(storage).pipe(logger);

导致该过程刚刚结束。所以我想我对我应该在方法内部做什么有点困惑_read

标签: javascriptnode.jsduplexnode-streams

解决方案


OP提供的代码中的一些观察结果:

  1. 遍历返回的键的迭代器read()是在设置任何键之前生成的,在:this.entries = this.values.entries();. 因此,调用 read() 永远不会产生输出。
  2. 如果在 Map 中设置了新键,则不会将其推送到读取缓冲区以供后续可写处理

可以使用内置的Transform (docs)构造函数来简化双工实现。转换构造函数非常适合存储转发场景。

这是如何在这种情况下应用流转换的示例。请注意,该pipeline()函数不是必需的,并且已在此示例中用于简化等待 readable 已发出其所有数据:

const { Writable, Readable, Transform, pipeline } = require('stream');

class StorageStream extends Transform {
  constructor() {
    super({
      objectMode: true
    })

    this.values = new Map();
  }

  _transform(data, encoding, next) {
    const { key, value } = data;

    this.values.set(key, value);
    console.log(`Setting Map key ${key} := ${value}`)

    next(null, data);
  }
}

(async ()=>{
  await new Promise( resolve => {
    pipeline(
      new Readable({
        objectMode: true,
        read(){
          this.push( { key: 'foo', value: 'bar' } );
          this.push( null );
        }
      }),
      new StorageStream(),
      new Writable({
        objectMode: true,
        write( chunk, encoding, next ){
          console.log("propagated:", chunk);
          next();
        }
      }),
      (error) => {
        if( error ){
          reject( error );
        }
        else {
          resolve();
        }
      }
    );
  });
})()
  .catch( console.error );

这会产生以下输出

> Setting Map key foo := bar
> propagated: { key: 'foo', value: 'bar' }

并且可以用作

kvstream.pipe(new StorageStream()).pipe(logger);

推荐阅读