How to customize a ReadableStream reader to handle a JSON array

Problem

I have users.json (assume it will be a large file; I want to stream-read this file while limiting the chunk size).

[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  },
  {
    "name": "Brian Flemming",
    "occupation": "teacher",
    "born": "1967-11-22"
  },
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  },
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977-10-31"
  }
]

My sample code:

const fs = require('fs');
const stream = require('stream');

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log('---------start')
    console.log(chunk.toString());
    console.log('---------end')
  }
}

const readStream = fs.createReadStream('users.json', {highWaterMark: 120 })
logChunks(readStream)

The output looks like:

---------start
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
---------end
---------start
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "ac
---------end
---------start
countant",
    "born": "1995-04-07"
  }
  ,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
---------end
---------start
-10-31"
  }
]

---------end

My goal is to extract complete JSON objects from the multiple chunks, so that each one can be passed to JSON.parse.

I did not find any JSONStreamParse for Node.js, so I hope I can get some expert ideas here. Thanks.


Update:

One solution I have is to use a third-party package, stream-json:

const fs = require('fs');
const util = require('util');
const stream = require('stream');
const StreamArray = require('stream-json/streamers/StreamArray');

const readStream = fs.createReadStream('users.json', { highWaterMark: 120 });

// inside an async function:
await util.promisify(stream.pipeline)(
    readStream,
    StreamArray.withParser(),
    async function( parsedArrayEntriesIterable ){
      for await (const {key: arrIndex, value: arrElem} of parsedArrayEntriesIterable) {
        console.log("Parsed array element:", arrElem);
      }
    }
  )

Tags: node.js, parsing, json, stream

Solution


I read the update to your question and realized that the comment I left was completely off the mark. Since you are using a stream, you don't want to wait for all the data to arrive, in order to avoid exhausting memory. I should have noticed that at the beginning.

Let me give you some examples as my apology. I hope they help you understand how to use streams.

To make the example more realistic, let's simulate fetching the JSON from a remote server with node-fetch. The ReadableStream instance that node-fetch returns is also an asyncIterable, so we can easily create a mock by passing an async generator function to stream.Readable.from() as follows.

The definition of fetch():

async function* asyncGenerator (chunks) {
  let counter = 1;
  for (const chunk of chunks) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`==== chunk ${counter++} transmitted =====================`);
    yield chunk;
  }
}

const stream = require('stream');

// simulates node-fetch
async function fetch (json) {
  const asyncIterable = asyncGenerator(json);
  // let the client wait for 0.5 sec.
  await new Promise(resolve => setTimeout(resolve, 500));
  return new Promise(resolve => {
    // returns the response object
    resolve({ body: stream.Readable.from(asyncIterable) });
  });
}

fetch() takes 0.5 seconds to fetch the response object. It returns a Promise that resolves to an object whose body provides the ReadableStream. This readable stream keeps sending a chunk of JSON data downstream every second, as defined in asyncGenerator().

Our fetch() function takes a chunked JSON array as an argument instead of a URL. Let's use the one you provided, but split it at slightly different points, so that after receiving the second chunk we get two complete objects.

const chunkedJson = [
  // chunk 1
  `[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem`,
  // chunk 2
  `ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }`,
  // chunk 3
  `,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977`,
  // chunk 4
  `-10-31"
  }
]`
];

Now, with this data, you can confirm how fetch() works as follows.

Example 1: Testing fetch()

async function example1 () {
  const response = await fetch(chunkedJson);
  for await (const chunk of response.body) {
    console.log(chunk);
  }
}

example1();
console.log("==== Example 1 Started ==============");

Output of Example 1:

==== Example 1 Started ==============
==== chunk 1 transmitted =====================
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
==== chunk 2 transmitted =====================
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }
==== chunk 3 transmitted =====================
,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
==== chunk 4 transmitted =====================
-10-31"
  }
]

Now, let's process each element of this JSON data without waiting for the whole data to arrive.

StreamArray is a subclass of stream.Transform, so it has both the ReadableStream and the WritableStream interface. When stream instances are connected with pipe(), you don't have to worry about backpressure, so we simply pipe the two streams together, i.e. the ReadableStream obtained from fetch() and the StreamArray instance, as response.body.pipe(StreamArray.withParser()), shown in Example 2 below.

pipe(StreamArray.withParser()) returns the StreamArray instance itself for method chaining, so the pipeline variable now holds a reference to the transform stream, which is also a readable stream. We can attach event listeners to it to consume the transformed data.

StreamArray emits a data event whenever a single object has been parsed from the readable source. So pipeline.on('data', callback) processes the data chunk by chunk without waiting for the whole JSON.

When an event listener is registered for the data event with pipeline.on('data', callback), the stream starts to flow.

Since we simulate the data fetching asynchronously, you can see !!!! MAIN THREAD !!!! in the console in the middle of the data transmission. This confirms that the main thread is not blocked while waiting for the parsed data.

Example 2: Testing stream-json, processing each array element as it arrives

const StreamArray = require('stream-json/streamers/StreamArray');

async function example2 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("====== stream-json StreamArray() RESULT ========");
    console.log(value); // do your data processing here
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example2();
console.log("==== Example 2 Started ==============");

Output of Example 2:

==== Example 2 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== stream-json StreamArray() RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

Since all streams are instances of EventEmitter, you can simply attach a callback to the data event to consume the final data, as in Example 2. However, it is better to use pipe() even for the final data consumption, because pipe() handles backpressure.

The backpressure problem arises when the data consumption downstream is slower than the data feed upstream. For example, you may want to handle each chunk asynchronously when your data handling takes time. If handling of the next chunk finishes before the previous one, the next chunk is pushed downstream before the first one. If the downstream depends on the first chunk before handling the next one, this causes trouble.

When you use an event listener, you have to control the pause and resume manually to avoid the backpressure (see this as an example). However, if you connect the streams with pipe(), the backpressure problem is taken care of internally. That means when the downstream is slower than the upstream, pipe() will automatically pause the feed to the downstream.
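
For illustration, here is a minimal sketch of that manual pause/resume control, reusing the pipeline variable from Example 2 (processSlowly is a hypothetical async consumer, not part of stream-json):

// Minimal sketch of manual backpressure control with event listeners.
// `processSlowly` is a hypothetical async function that consumes one object.
pipeline.on('data', async ({ key, value }) => {
  pipeline.pause();            // stop the upstream from pushing more objects
  await processSlowly(value);  // handle the current object at our own pace
  pipeline.resume();           // let the upstream continue
});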

So let's create our own WritableStream to connect to StreamArray with pipe(). In our case we receive objects from the upstream (i.e. StreamArray) rather than strings or binary data, so we have to set objectMode to true. We override the _write() function, which will be called internally by write(). You put all the data-handling logic there and call callback() when it is finished. The upstream doesn't feed the next data until the callback is called, when the streams are connected with pipe().

In order to simulate the backpressure, we process the first and third objects (key 0 and 2) for 1.5 seconds and the second and fourth objects (key 1 and 3) for zero seconds.

Example 3: Piping to our own stream instance

class MyObjectConsumerStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log("====== Example 3 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  }
}

//--- Example 3: We write our own WritableStream to consume chunked data ------
async function example3 () {
  const response = await fetch(chunkedJson);
  response.body.pipe(StreamArray.withParser()).pipe(new MyObjectConsumerStream()).on('finish', () => {
    clearInterval(timer); // stop the main thread console.log
  });
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
}

example3();
console.log("==== Example 3 Started ==============");

Output of Example 3:

==== Example 3 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ........... 
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
==== chunk 4 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

You can confirm that the data is received in order. You can also see that the transmission of the second chunk starts while the first object is being processed, since we set it to take 1.5 seconds. Now, let's do the same thing using the event listener as follows.

Example 4: The backpressure problem with a simple callback

async function example4 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log(`====== Example 4 RESULT ========`);
      console.log(value); // do your data processing here
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example4();
console.log("==== Example 4 Started ==============");

Output of Example 4:

==== Example 4 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
===== started to processing the chunk ........... 
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== Example 4 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
===== started to processing the chunk ........... 
====== Example 4 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

Now, we see that the second element "Brian" arrives before "John". If the processing time for the first and third objects is increased to 3 seconds, the last element "William" also arrives before the third element "Lucy".

So, when the order of data arrival matters, it is a good practice to use pipe() rather than event listeners to consume the data.

You may be wondering why the example code in the stream-json API documentation uses its own chain() function to make the pipeline. It is the recommended design pattern for error handling in stream programming in Node.js. If an error is thrown in the downstream of the pipeline, it does not propagate the error to the upstream. So you have to attach a callback on every stream in the pipeline as follows (here we assume three streams a, b, c):

a.on('error', callbackForA)
 .pipe(b).on('error', callbackForB)
 .pipe(c).on('error', callbackForC)

This looks cumbersome compared to a Promise chain, where a single .catch() can simply be added to the tail of the chain. But even if we set all the error handlers as above, it is still not enough.
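
For comparison, this is what the Promise-chain counterpart looks like (stepA, stepB, and stepC are hypothetical async functions, only here to illustrate the shape):

// A single .catch() at the tail handles an error thrown at any step.
stepA()
  .then(resultA => stepB(resultA))
  .then(resultB => stepC(resultB))
  .catch(error => console.error("caught anywhere in the chain:", error));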

When the downstream throws an error, the erroring stream is detached from the pipeline with unpipe(); however, the upstream is not destroyed automatically. That is because multiple streams can be connected to the same upstream to branch the stream line. So when you use pipe(), you have to take care of destroying the remaining streams yourself.
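
A minimal sketch of that manual cleanup, reusing the hypothetical streams a, b, c from above (one plausible approach, not the only one):

// When c errors it is unpiped, but a and b keep running and must be
// destroyed by hand to release their resources.
a.pipe(b).pipe(c).on('error', (error) => {
  a.destroy(error);
  b.destroy(error);
});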

To solve these problems, the community provides pipeline-construction libraries. I think chain() from stream-chain is one of them. Since Node version 10, stream.pipeline has been added for this functionality. We can use this official pipeline constructor, since all the streams in stream-json are subclasses of regular stream instances.

Before showing the usage of stream.pipeline, let's modify the MyObjectConsumerStream class so that it throws an error while the third object (key === 2) is being processed.

A custom stream that throws an error:

class MyErrorStream extends MyObjectConsumerStream {
  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    if (key === 2)
      throw new Error("Error in key 2");
    setTimeout(() => {
      console.log("====== Example 5 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  }
}

stream.pipeline takes multiple streams in order, with an error handler at the end. The error handler receives the Error instance when an error is thrown, and null when the pipeline finishes successfully.

Example 5: Using stream.pipeline

async function example5 () {
  const response = await fetch(chunkedJson);
  const myErrorHandler = (timerRef) => (error) => {
    if (error)
      console.log("Error in the pipiline", error.message);
    else
      console.log("Finished Example 5 successfully");
    clearInterval(timerRef); // stop the main thread console.log
  }
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  stream.pipeline(
    response.body,
    StreamArray.withParser(),
    new MyErrorStream(),
    myErrorHandler(timer)
  );
  console.log("==== Example 5 Started ==============");
}

example5();

Output of Example 5:

==== Example 5 Started ==============
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 5 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ........... 
====== Example 5 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ........... 
/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211
      throw new Error("Error in key 2");
      ^

Error: Error in key 2
    at MyErrorStream._write (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211:13)
    at doWrite (internal/streams/writable.js:377:12)
    at clearBuffer (internal/streams/writable.js:529:7)
    at onwrite (internal/streams/writable.js:430:7)
    at Timeout._onTimeout (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:215:7)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7)

When an error is thrown, stream.pipeline() calls stream.destroy(error) on all the streams that are not properly closed or finished. So we don't have to worry about memory leaks.
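
As a side note, if you are on Node 15 or later, there is also a promisified version of pipeline in the stream/promises module, so the error-handler callback can be replaced with try/catch. A minimal sketch, assuming the same fetch(), chunkedJson, StreamArray, and MyObjectConsumerStream from the examples above:

const { pipeline } = require('stream/promises');

async function example6 () {
  const response = await fetch(chunkedJson);
  try {
    // awaiting the promisified pipeline replaces the error-handler callback
    await pipeline(
      response.body,
      StreamArray.withParser(),
      new MyObjectConsumerStream()
    );
    console.log("Finished Example 6 successfully");
  } catch (error) {
    console.log("Error in the pipeline:", error.message);
  }
}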

