node.js - 如何有效地将 Postgres 数据从 Query 传输到 S3
问题描述
我的 node.js 应用服务当前使用提供的选择查询访问 postgres,将其转换为 csv,然后将该 CSV 文件上传到 S3。
我想知道是否有更好的方法可以更有效地处理更大的数据拉取?
解决方案
这应该会让你达到 90% 左右。我还没有测试过这个确切的实现,可能有一两个错字,但我现在有类似的代码在生产中运行。
const { Transform } = require('json2csv');
const { Client, Query } = require('pg')
const { S3 } = require('aws-sdk');
const { Passthrough } = require('stream')
const client = new Client()
const s3 = new S3({ region: 'us-east-1' });
const opts = { fields: ['field1', 'field2', 'field3'] };
const transformOpts = { highWaterMark: 8192, encoding: 'utf-8', objectMode: true };
const transform = new Transform(opts, transformOpts);
const passthrough = new Passthrough();
transform.pipe(passthrough)
client.connect()
const query = new Query('SELECT field1, field2, field3 FROM table')
client.query(query)
query.on('row', row => {
transform.push(row);
console.log('row!', row) // { field1: 1, field2: 2, field3: 3 }
})
query.on('end', () => {
transform.push(null)
console.log('query done')
})
query.on('error', err => {
transform.end();
console.error(err.stack)
})
s3.upload({ Body: passthrough, Key: 'somefile.csv', Bucket: 'some_bucket' })
.send((err, data) => {
if (err) {
console.error({ err });
passthrough.destroy(err);
} else {
console.log(`File uploaded and available at ${data.Location}`);
passthrough.destroy();
}
});
推荐阅读
- stored-procedures - ADF 复制数据活动 - 从动态表达式中为 Sink 存储过程参数值引用 Source 列
- python - 检测类属性值更改,然后更改另一个类属性
- python - 将 wav 数据的左通道读入 numpy 数组
- vb.net - 当我在我的线程中添加一个列表框项目时,我需要发出哔哔声。它在一个线程中,线程子程序不会让我做任何事情,而是添加项目
- lua - 即使值不正确,Roblox Lua if 语句仍在执行
- json - PowerShell:无论深度如何,如何扫描每个 PSObject 属性名称并替换句点字符?
- python - 没有双for循环的两个python矩阵之间的欧几里得距离?
- javascript - 使用 Google Apps 脚本动态填充电子表格单元格
- java - 内部微服务通信是否应该通过zuul网关?
- angular - Angular 9 On Page 加载读取自定义响应标头