首页 > 解决方案 > 解析大型 JSON 文件时两个 NodeJS 流之间的竞争条件

问题描述

我必须解析具有以下格式的大型(100+ MB)JSON 文件:

{
 "metadata": {
   "account_id": 1234
   // etc.
 },
 "transactions": [
   {
     "transaction_id": 1234,
     "amount": 2
   },
   // etc. for (potentially) 1000's of lines
 ]
}

此解析的输出是一个 JSON 数组,account_id每个都附加transactions

[
 {
   "account_id": 1234,
   "transaction_id": 1234,
   "amount": 2
 },
 // etc.
]

我正在使用stream-json库来避免将整个文件同时加载到内存中。stream-json 允许我选择单个属性,然后一次流式传输它们,具体取决于它们是数组还是对象

我还试图通过将 JSON 文件的读取传输到两个单独的流来避免两次解析 JSON,这在 nodejs 中是可能的

我使用Transform流来生成输出,在存储account_id.

下面的伪代码(具有明显的竞争条件):

const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { streamObject } = require('stream-json/streamers/StreamObject');
const Chain = require('stream-chain');
const { Transform } = require('stream');

let createOutputObject = new Transform({
 writableObjectMode:true,
 readableObjectMode:true,
 transform(chunk, enc, next) => {
  if (createOuptutObject.account_id !== null) {
     // generate the output object
  } else {
     // Somehow store the chunk until we get the account_id...
  } 
 } 
});
createOutputObject.account_id = null;

let jsonRead = fs.createReadStream('myJSON.json');
let metadataPipline = new Chain([
  jsonRead,
  parser(),
  pick({filter: 'metadata'}),
  streamObject(),
]);

metadataPipeline.on('data', data => {
 if (data.key === 'account_id') {
  createOutputObject.account_id = data.value;
 }
});

let generatorPipeline = new Chain([
 jsonRead, // Note same Readable stream as above
 parser(),
 pick({filter: 'tracks'}),
 streamArray(),
 createOutputObject,
 transformToJSONArray(),
 fs.createWriteStream('myOutput.json')
]);

为了解决这个竞争条件(即在设置之前转换为 JSON 数组account_id),我尝试过:

我考虑过使用 stream-json 的streamValues函数,它可以让我做一个pickofmetadatatransactions. 但是文档让我相信所有内容transactions都会被加载到内存中,这是我试图避免的:

与每个流媒体一样,它假设单个对象可以放入内存中,但应该流式传输整个文件或任何其他源。

还有其他东西可以解决这种竞争状况吗?无论如何我可以避免两次解析这个大的 JSON 流吗?

标签: node.jsjson

解决方案


推荐阅读