首页 > 解决方案 > 如何有效地将 Postgres 数据从 Query 传输到 S3

问题描述

我的 node.js 应用服务当前使用提供的选择查询访问 postgres,将其转换为 csv,然后将该 CSV 文件上传到 S3。

我想知道是否有更好的方法可以更有效地处理更大的数据拉取?

标签: node.jsdatabasepostgresqlamazon-s3etl

解决方案


这应该会让你达到 90% 左右。我还没有测试过这个确切的实现,可能有一两个错字,但我现在有类似的代码在生产中运行。

const { Transform } = require('json2csv');
const { Client, Query } = require('pg')
const { S3 } = require('aws-sdk');
const { Passthrough } = require('stream')

const client = new Client()
const s3 = new S3({ region: 'us-east-1' });


const opts = { fields: ['field1', 'field2', 'field3'] };
const transformOpts = { highWaterMark: 8192, encoding: 'utf-8', objectMode: true };

const transform = new Transform(opts, transformOpts);
const passthrough = new Passthrough();
transform.pipe(passthrough)


client.connect()

const query = new Query('SELECT field1, field2, field3 FROM table')
client.query(query)

query.on('row', row => {
  transform.push(row);
  console.log('row!', row) // { field1: 1, field2: 2, field3: 3 }
})
query.on('end', () => {
  transform.push(null)
  console.log('query done')
})
query.on('error', err => {
  transform.end();
  console.error(err.stack)
})

s3.upload({ Body: passthrough, Key: 'somefile.csv', Bucket: 'some_bucket' })
.send((err, data) => {
  if (err) {
    console.error({ err });
    passthrough.destroy(err);
  } else {
    console.log(`File uploaded and available at ${data.Location}`);
    passthrough.destroy();
  }
});

推荐阅读