首页 > 技术文章 > node随笔-数据流Stream

xiaowang0808 2018-05-10 17:16 原文

一. 流(stream)

在 Node.js 中是处理流数据的抽象接口(abstract interface), stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

二. 流的类型

Readable 可读的流  例如 fs.createReadStream()

Writable  可写的流  例如 fs.createWriteStream()

Duplex 可读写的流 例如 net.socket

Transform 在读写过程中可以修改和变换数据的 Duplex 流 例如 zlib.createDeflate(

三.缓冲

Writable 和 Readable流都会将数据存储到内部的缓冲器buffer中,这些缓冲器可以通过相应的writable._writableState.getBuffer() 或 readable._readableState.buffer 来获取。

缓冲器的大小取决于传递给流构造函数的highWaterMark 选项,对于普通的流,hignWaterMark选项指定了总共的字节数,对于工作在对象模式的流,hignWaterMark指定了对象的总数。

可读流的实现调用stream.push(chunk)方法时,数据被放到缓冲器中,如果流的消费者没有调用stream.read()方法,这些数据会始终存在于内部队列中,直到被消费。

 

四.可写流 Writable

可写流是对数据写入目的地的一种抽象

相关事件:

close事件将在流或其底层资源(比如一个文件关闭后触发),该事件触发后 流不在触发任何事件。

error事件 事件在写入数据出错或者使用管道出错时触发,事件触发时,回调函数会接受到一个error的参数。注意error事件发生时,流并不会关闭。

finish 事件 在调用了stream.end()方法,且缓冲区数据都已经传给底层系统(underlying system)之后, finish事件将被触发。

pipe 事件  在可读流上调用stream.pipe()方法时,并在目标流向 (destinations) 中添加当前可写流 ( writable ) 时,将会在可写流上触发 pipe事件。

unpipe事件 在 可读流上调用 stream.unpipe方法,从目标流向中移除当前 Writable时,将会触发 unpipe事件。

相关的方法

writable.end([chunk], [,encoding], [,callback]) 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null

在调用了writable.end()之后,在调用writeable.write()会报错

五.可读流Readable streams

可读流(Readable streams)是对提供数据的 源头 (source)的抽象。

1.可读流事实上工作在下面两种模式之一:flowing 和 paused 。

在flowing模式下,可读流自动从系统底层读取数据,并通过EventEmitter接口的事件尽快将数据供给应用。

在paused模式下,必须显示的调用stream.read()方法来从流中读取数据片段。

所有初始工作模式为paused的Readable流,可以通过以下三种途径切换为flowing模式:

1.监听data事件. 2.调用stream.resume()  3.调用stream.pipe()方法将数据发送到Writable.

可读流可以从以下三种路径切换为paused模式:

1.如果不存在管道目标,可以通过stream.paused()实现。

2.如果存在管道目标,可以通过取消监听data事件,并调用stream.unpipe()方法移除所有管道目标来实现。

可读流监听的事件

1.‘’close"事件将在流或其底层资源(比如一个文件)关闭后触发。”close" 事件触发后,该流将不会再触发任何事件。

2. 'data' 事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe(), readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。

3.end事件将在流中没有数据可供消费时触发, 'end' 事件只有在数据被完全消费后 才会触发 。 可以在数据被完全消费后,通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

4.error事件可以在任何可读流中实现,通常,这会在底层系统内部从而不能产生数据,或当流的实现试图传递错误数据时触发。

5.readable事件将在流中有数据可供读取时触发

readable.isPaused() 方法返回可读流的当前操作状态。 该方法主要是在 readable.pipe() 方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。

readable.pause() 方法将会使 flowing 模式的流停止触发 'data' 事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

readable.pipe() 绑定一个 Writable 到 readable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。

readable.pipe() 方法返回 目标流 的引用

readable.readableHighWaterMark 返回构造该可读流时传入的 'highWaterMark' 属性。

readable.read([size]) 从内部缓冲区中抽出并返回一些数据,如果没有可读的数据 则返回null。readable.read()方法默认数据将做为Buffer数据返回,除非已经使用readable.setEncoding()方法设置编码或流运行在对象模式。

推荐阅读