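javascript - A better way to write CSV from Parquet in JavaScript
Question
I am converting Parquet to CSV in JavaScript.
The example below works, but it collects every record read from the Parquet file into an in-memory array before writing anything.
The Parquet library exposes an AsyncIterator, whereas the CSV library uses the Node Stream API.
I would like to know how to implement a more elegant solution that takes advantage of streams and reduces the memory footprint. TIA
Libraries: Parquet: https://github.com/ironSource/parquetjs, CSV: https://csv.js.org/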
import pts from 'parquets'
let { ParquetSchema, ParquetWriter, ParquetReader } = pts
import * as fs from 'fs'
import stringify from 'csv-stringify'

// declare a schema for the `PI` table
let schema = new ParquetSchema({
  Source: { type: 'UTF8' },
  TagID: { type: 'UTF8' },
  Timestamp: { type: 'TIMESTAMP_MILLIS' },
  Value: { type: 'DOUBLE' },
});

const WriterParquet = async () => {
  // create new ParquetWriter that writes to 'pi.parquet'
  let writer = await ParquetWriter.openFile(schema, 'pi.parquet')
  // append a few rows to the file
  await writer.appendRow({Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Timestamp: new Date(), Value: 410 })
  await writer.appendRow({Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Timestamp: new Date(), Value: 420 })
  await writer.close()
}

const WriterCSV = async () => {
  // create new ParquetReader that reads from 'pi.parquet'
  let reader = await ParquetReader.openFile('pi.parquet')
  // create a new cursor
  let cursor = reader.getCursor()
  // read all records from the file, print them, and buffer them in memory
  let records = []
  let record = null;
  while (record = await cursor.next()) {
    console.log(record)
    records.push(record)
  }
  await reader.close()
  // write to CSV
  stringify(records, {
    header: true
  }, function (err, output) {
    console.log(output)
    fs.writeFile('./pi.csv', output, () => {});
  })
}

const Main = async () => {
  console.log('writing parquet...')
  await WriterParquet()
  console.log('reading parquet and writing csv...')
  await WriterCSV()
}

Main()
Solution
Instead of using the cursor, I created a Node Readable stream with Readable.from(reader). After that, it is easy to pipe it into csv-stringify:
import { Readable } from 'stream'

const WriterCSV = async () => {
  // create new ParquetReader that reads from 'pi.parquet'
  let reader = await ParquetReader.openFile('pi.parquet')
  // wrap the reader (an async iterable of records) in a Readable stream
  const readStream = Readable.from(reader)
  // note: the callback form of stringify still collects the whole CSV
  // output as one string before it is written to disk
  readStream.pipe(
    stringify({
      header: true,
      columns: {
        Source: 'Source',
        TagID: 'TagID',
        Timestamp: 'Timestamp',
        Value: 'Value'
      }
    }, function (error, output) {
      fs.writeFile('./pi.csv', output, () => {});
    }))
  // close the reader once every record has been consumed
  readStream.on('end', async function () {
    await reader.close();
  });
}
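The snippet above streams the Parquet records, but the stringify callback still receives the whole CSV as one string. A minimal sketch of a fully streamed variant follows, using only Node built-ins so that it runs on its own. Several names here are assumptions and not part of the original answer: `fakeRows` is a hypothetical stand-in for the ParquetReader (any async iterable of row objects), `csvLines` is a simplified stand-in for csv-stringify's stream API, and `rowsToCsv` is an illustrative helper name. It is written in CommonJS form so it runs as a plain script; in an ES module the `require` calls would become `import` statements.

```javascript
const fs = require('node:fs');
const { Readable, Transform } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical stand-in for ParquetReader: any async iterable of row objects.
async function* fakeRows() {
  yield { Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Value: 410 };
  yield { Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Value: 420 };
}

// Simplified stand-in for csv-stringify: turns row objects into CSV text,
// one line per row, emitting the header before the first row.
function csvLines(columns) {
  const quote = (value) => {
    const text = value == null ? '' : String(value);
    // Quote fields containing a delimiter, quote, or line break.
    return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
  };
  let headerWritten = false;
  return new Transform({
    writableObjectMode: true, // objects in, text out
    transform(row, _encoding, done) {
      let out = '';
      if (!headerWritten) {
        out += columns.map(quote).join(',') + '\n';
        headerWritten = true;
      }
      out += columns.map((name) => quote(row[name])).join(',') + '\n';
      done(null, out);
    },
  });
}

// Stream rows to a CSV file. pipeline() applies backpressure between the
// stages and rejects if any stage fails, so no row array or full CSV
// string is ever built in memory.
async function rowsToCsv(rows, columns, outPath) {
  await pipeline(
    Readable.from(rows),
    csvLines(columns),
    fs.createWriteStream(outPath)
  );
}
```

With the real libraries, the same shape would presumably be `pipeline(Readable.from(reader), stringify({ header: true, columns }), fs.createWriteStream('./pi.csv'))`, with `reader.close()` called after the promise settles. That assumes the installed csv-stringify version returns a transform stream when called without a callback, which is worth checking against its documentation.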