首页 > 解决方案 > 通过流将 CSV 文件转换为 lambda 中的多个小 JSON 文件

问题描述

我正在尝试编写一个 lambda 函数,该函数可以将一个巨大的 csv 文件流转换为多个小的 json 文件(比如 2000 行的 json 文件),从一个 s3 存储桶到一个 s3 存储桶。我虽然有一些限制,比如在 256 MB 的有限 RAM 内存中运行。

我可以通过将文件作为文件而不是像下面的流来做同样的事情。

但是由于内存限制,我需要在流中处理这个。有没有办法使用流做同样的事情?

// transformationClass.js

const csv = require('csvtojson');

const extension = '.json';
class S3CsvToJson {
    static async perform(input, output, headers) {
        let jsonArray = null;
        const s3Object = await s3.getObject(); // getting the s3 object
        const csvString = s3Object.Body.toString('utf8');
        await csv({
            noheader: false,
        })
            .fromString(csvString)
            .then((csvRow) => {
                jsonArray = csvRow;
            });
        const fileNames = await S3CsvToJson.writeToFile(jsonArray, output);
        return { files: fileNames };
    }

    static async writeToFile(jsonArray, output) {
        const minNumber = 0;
        const maxNumber = 1999;
        const fileNames = [];
        let outFile;
        if (jsonArray && Array.isArray(jsonArray)) {
            let fileIterator = 1;
            while (jsonArray.length) {
                outFile = `${output.key}-${fileIterator}${extension}`;
                await // s3.putObject(). writing to s3
              .putObject(
                    outFile,
                    output.bucketName,
                    JSON.stringify(jsonArray.splice(minNumber, maxNumber)),
                );
                console.log('rows left :', jsonArray.length);
                fileNames.push(outFile);
                fileIterator += 1;
            }
        }
        return fileNames;
    }
}

module.exports = S3CsvToJson;

这是处理函数

// handler.js
module.exports.perform = async (event, context, callback) => {

    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result));
    console.log('leaving - ', Date.now());
};

提前致谢!!

标签: node.jsamazon-s3aws-lambdanode-streamscsvtojson

解决方案


在经历了很多事情之后,我终于找到了一种方法来完成它。

我要做的是,将整个过程包装成一个承诺并返回它。我从 s3 创建了一个读取流,将其转发到解析器,然后转发到写入流。我希望在这里分享它,以便对其他人有用。也开放任何更好的优化解决方案。

// transformationClass.js

const csv = require('fast-csv');
const { Transform, pipeline } = require('stream');

const extension = '.json';
class S3CsvToJson {
  static async perform(input, output, headers) {
    console.log(input, output, headers);
    const threshold = 2000;
    try {
      const promiseTransformData = () => new Promise((resolve, reject) => {
        try {
          let jsonOutput = [];
          let fileCounter = 0;
          const fileNames = [];
          const writableStream = new Transform({
            objectMode: true,
            autoDestroy: true,
            async transform(data, _, next) {
              if (jsonOutput.length === threshold) {
                fileCounter += 1;
                const fileUpload = new Promise((resolveWriter) => {
                  s3
                    .putObject(
                      `${output.key}-${fileCounter}${extension}`,
                      output.bucketName,
                      JSON.stringify(jsonOutput),
                    )
                    .then(() => resolveWriter());
                });
                await fileUpload;
                fileNames.push(`${output.key}-${fileCounter}${extension}`);
                jsonOutput = [];
              }
              jsonOutput.push(data);
              next();
            },
          });
          const readFileStream = s3.getReadStream(input.key, input.bucketName);
          pipeline(
            readFileStream,
            csv.parse({ headers: true }),
            writableStream,
            (error) => {
              // if (err) throw new Error('Pipeline error');
              if (error) {
                console.error(`Error occurred in pipeline - ${error}`);
                resolve({ errorMessage: error.message });
              }
            },
          );
          writableStream.on('finish', async () => {
            if (jsonOutput.length) {
              fileCounter += 1;
              const fileUpload = new Promise((resolveWriter) => {
                s3
                  .putObject(
                    `${output.key}-${fileCounter}${extension}`,
                    output.bucketName,
                    JSON.stringify(jsonOutput),
                  )
                  .then(() => resolveWriter());
              });
              await fileUpload;
              fileNames.push(`${output.key}-${fileCounter}${extension}`);
              jsonOutput = [];
            }
            console.log({ status: 'Success', files: fileNames });
            resolve({ status: 'Success', files: fileNames });
          });
        } catch (error) {
          console.error(`Error occurred while transformation - ${error}`);
          resolve({ errorMessage: error ? error.message : null });
        }
      });
      return await promiseTransformData();
    } catch (error) {
      return error.message || error;
    }
  }
}

module.exports = S3CsvToJson;

对于处理程序,我像这样调用 S3CsvToJson

// handler.js
module.exports.perform = async (event, context, callback) => {
    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result))
        .catch((error) => callback(error));
};

希望对您有所帮助。谢谢!


推荐阅读