A better way to write CSV from Parquet with JavaScript

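Problem description

I am converting Parquet to CSV with JavaScript.

The example below works, but it collects every record read from Parquet into an in-memory `records` array before any CSV is written.

The Parquet library exposes an AsyncIterator, while the CSV library uses the Node Stream API.

I would like to know how to implement a more elegant solution that uses streams and reduces the memory footprint. TIA

- Parquet: https://github.com/ironSource/parquetjs CSV: https://csv.js.org/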

import pts from 'parquets'
let { ParquetSchema, ParquetWriter, ParquetReader } = pts

import * as fs from 'fs'
import stringify from 'csv-stringify'

// declare a schema for the `PI` table
let schema = new ParquetSchema({
    Source: { type: 'UTF8' },
    TagID: { type: 'UTF8' },
    Timestamp: { type: 'TIMESTAMP_MILLIS' },
    Value: { type: 'DOUBLE' },
});

const WriterParquet = async () => {
    
    // create new ParquetWriter that writes to 'pi.parquet'
    let writer = await ParquetWriter.openFile(schema, 'pi.parquet')

    // append a few rows to the file
    await writer.appendRow({Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Timestamp: new Date(), Value: 410 })
    await writer.appendRow({Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Timestamp: new Date(), Value: 420 }) 
    await writer.close()

}

const WriterCSV = async () => {
    
    // create new ParquetReader that reads from 'pi.parquet'
    let reader = await ParquetReader.openFile('pi.parquet')

    // create a new cursor
    let cursor = reader.getCursor()

    // read all records from the file and print them
    let records = []
    let record = null;
    while (record = await cursor.next()) {
        console.log(record)
        records.push(record)
    }

    await reader.close()

    // write to CSV
    stringify(records, {
        header: true
    }, function (err, output) {
        console.log(output)
        fs.writeFile('./pi.csv', output, () => {});
    })
    
}

const Main = async () => {
    
    console.log('writing parquet...')
    await WriterParquet()

    console.log('reading parquet and writing csv...')
    await WriterCSV()

}

Main()

Tags: javascript, csv, parquet

Solution


Instead of using the cursor, I used Readable.from(reader) to create a Node Readable stream. After that, it is easy to pipe it into csv-stringify:

import { Readable } from 'stream'

const WriterCSV = async () => {

    // create new ParquetReader that reads from 'pi.parquet'
    let reader = await ParquetReader.openFile('pi.parquet')

    // wrap the reader (an async iterable of rows) in a Readable stream
    const readStream = Readable.from(reader)

    // pipe the rows into csv-stringify; note that the callback form
    // still receives the complete CSV text in one piece
    readStream.pipe(
        stringify({
            header: true,
            columns: {
                Source: 'Source',
                TagID: 'TagID',
                Timestamp: 'Timestamp',
                Value: 'Value'
            }
        }, function (error, output) {
            fs.writeFile('./pi.csv', output, () => {});
        }))

    // close the reader once every row has been consumed
    readStream.on('end', async function () {
        await reader.close();
    });

}
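
To keep the memory footprint low all the way to disk, the same idea can be extended so that the CSV text is streamed into a file instead of being collected in a callback. The following is only a minimal sketch under stated assumptions, not the answer author's code: `rowsToCsvFile`, `naiveCsv` and `sampleRows` are hypothetical names, `naiveCsv` is a dependency-free stand-in for csv-stringify (it does no quoting or escaping), and `sampleRows` is a stand-in for the Parquet reader. It uses `require` so that it runs as a standalone script.

```javascript
const { Readable, Transform } = require('node:stream')
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

// Hypothetical helper: stream row objects from any (async) iterable
// through a CSV-formatting Transform and into a file. pipeline()
// handles backpressure and rejects if any stage fails.
async function rowsToCsvFile(rows, toCsv, outPath) {
    await pipeline(Readable.from(rows), toCsv, fs.createWriteStream(outPath))
}

// Stand-in for csv-stringify (assumption, for illustration only):
// an object-in, text-out Transform that emits a header line and then
// one comma-separated line per row, without quoting or escaping.
function naiveCsv(columns) {
    let headerWritten = false
    return new Transform({
        writableObjectMode: true,
        transform(row, _encoding, done) {
            let out = ''
            if (!headerWritten) {
                out += columns.join(',') + '\n'
                headerWritten = true
            }
            out += columns.map((c) => String(row[c])).join(',') + '\n'
            done(null, out)
        },
    })
}

// Stand-in for the Parquet reader: an async generator of row objects.
async function* sampleRows() {
    yield { Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Value: 410 }
    yield { Source: 'PI/NO-SVG-PISRV01', TagID: 'OGP8TI198Z.PV', Value: 420 }
}
```

With the real libraries, and assuming the reader is async-iterable as the answer above relies on, the call would look roughly like `await rowsToCsvFile(reader, stringify({ header: true, columns: ['Source', 'TagID', 'Timestamp', 'Value'] }), './pi.csv')`, followed by `await reader.close()`. Only one row and its CSV line need to be in flight at a time, so memory use should stay roughly constant regardless of file size.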
