node.js - 解析大型 JSON 文件时两个 NodeJS 流之间的竞争条件
问题描述
我必须解析具有以下格式的大型(100+ MB)JSON 文件:
{
"metadata": {
"account_id": 1234
// etc.
},
"transactions": [
{
"transaction_id": 1234,
"amount": 2
},
// etc. for (potentially) 1000's of lines
]
}
此解析的输出是一个 JSON 数组,account_id
每个都附加transactions
:
[
{
"account_id": 1234,
"transaction_id": 1234,
"amount": 2
},
// etc.
]
我正在使用stream-json库来避免将整个文件同时加载到内存中。stream-json 允许我选择单个属性,然后一次流式传输它们,具体取决于它们是数组还是对象
我还试图通过将 JSON 文件的读取传输到两个单独的流来避免两次解析 JSON,这在 nodejs 中是可能的。
我使用Transform流来生成输出,在存储account_id
.
下面的伪代码(具有明显的竞争条件):
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { streamObject } = require('stream-json/streamers/StreamObject');
const Chain = require('stream-chain');
const { Transform } = require('stream');
let createOutputObject = new Transform({
writableObjectMode:true,
readableObjectMode:true,
transform(chunk, enc, next) => {
if (createOuptutObject.account_id !== null) {
// generate the output object
} else {
// Somehow store the chunk until we get the account_id...
}
}
});
createOutputObject.account_id = null;
let jsonRead = fs.createReadStream('myJSON.json');
let metadataPipline = new Chain([
jsonRead,
parser(),
pick({filter: 'metadata'}),
streamObject(),
]);
metadataPipeline.on('data', data => {
if (data.key === 'account_id') {
createOutputObject.account_id = data.value;
}
});
let generatorPipeline = new Chain([
jsonRead, // Note same Readable stream as above
parser(),
pick({filter: 'tracks'}),
streamArray(),
createOutputObject,
transformToJSONArray(),
fs.createWriteStream('myOutput.json')
]);
为了解决这个竞争条件(即在设置之前转换为 JSON 数组account_id
),我尝试过:
- 用于
createOutputObject.cork()
保存数据直到account_id
被设置。- 数据只是传递到
transformToJSONArray()
.
- 数据只是传递到
- 将
chunk
s 保存在数组中,createOutputObject
直到account_id
被设置。- 无法弄清楚如何在设置
chunk
后重新添加存储的 saccount_id
。
- 无法弄清楚如何在设置
- 稍后使用
setImmediate()
andprocess.nextTick()
调用createOutputObject.transform
,希望account_id
已设置。- 堆栈重载,什么都做不了。
我考虑过使用 stream-json 的streamValues
函数,它可以让我做一个pick
ofmetadata
和transactions
. 但是文档让我相信所有内容transactions
都会被加载到内存中,这是我试图避免的:
与每个流媒体一样,它假设单个对象可以放入内存中,但应该流式传输整个文件或任何其他源。
还有其他东西可以解决这种竞争状况吗?无论如何我可以避免两次解析这个大的 JSON 流吗?
解决方案
推荐阅读
- javascript - 错误解析错误:将函数部署到firebase时正则表达式标志无效
- r - 使用插入符号中的指标 ROC 优化模型
- google-bigquery - 为什么 bq 查询命令只将 100 行写入文件?
- ms-access - 从数据表复制到 ListBox
- django - Django:将其转换为 DetailView - 使用来自 URL 的两个参数的视图函数
- javascript - 将列表传递给 js、thymeleaf、springboot
- python - Python 脚本:从地图查询中检索多个结果
- c - 在 C 中的函数调用中使用结构参数
- javascript - 获取 URL 参数 elemId 并将其添加到按钮 onClick 属性中
- javascript - Javascript FileReader 在第一次尝试时工作,但不再工作(没有更改任何代码行)