node.js - Kinesis 到 S3 没有使用 nodejs lambda 的 firehose
问题描述
我们不能使用 firehose,因为 kinesis 流中的消息有一个“creationDateTime”字段。基于此,我们希望将数据转储到 S3 中。Firehose 会根据到达时间将消息转储到 S3 上。因此,我们有一个自定义 lambda,它将从 kinesis 流中读取 10,000 条记录并将其放入 S3。代码工作正常,但我们希望将消息写为 .gz 文件。
这是 lambda 代码
console.log('Loading function');
const AWS = require('aws-sdk');
const awsConfig = {
region: 'us-west-2',
apiVersion: '2012-08-10',
};
AWS.config.update(awsConfig);
const s3 = new AWS.S3();
const bucket = 'uis-prime-test';
// const uniqueId = Math.floor(Math.random() * 100000);
// initially create the map without any key
const map = {};
function addValueToList(key, value) {
// if the list is already created for the "key", then uses it
// else creates new list for the "key" to store multiple values in it.
map[key] = map[key] || [];
map[key].push(value);
}
function getS3Key(payload) {
const json = JSON.parse(payload);
const creationDateTime = new Date(json.executionContext.creationDateTime);
const year = creationDateTime.getUTCFullYear();
let month = creationDateTime.getUTCMonth() + 1;
const day = creationDateTime.getUTCDate();
const hour = creationDateTime.getUTCHours();
if (month < 10) { month = `0${month}`; }
return `${year}/${month}/${day}/${hour}/`;
}
exports.handler = function (event, context) {
try {
const uniqueId = context.awsRequestId;
event.Records.forEach((record) => {
const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
const key = getS3Key(payload) + uniqueId;
addValueToList(key, payload.toString());
});
Object.entries(map).forEach(([key, value]) => {
const params = { Bucket: bucket, Key: key, Body: value.join('\n') };
s3.putObject(params, (err, data) => {
if (err) {
throw err;
} else {
console.log('Successfully uploaded data');
}
});
});
} catch (err) {
console.log(err);
}
return `Successfully processed ${event.Records.length} records.`;
};
解决方案
如果要编写 .gz 文件,则必须自己 gzip。
幸运的是,在节点中做这件事相当简单:
const zlib = require('zlib'); // this is a node build-in so no packages required
// ...
Object.entries(map).forEach(([key, value]) => {
const Body = zlib.gzipSync(value.join('\n'));
const params = { Bucket: bucket, Key: key, Body };
s3.putObject(params, (err, data) => {
// ...
zlib
注意,如果您想并行执行此操作,则有异步版本的方法: https ://nodejs.org/docs/latest-v6.x/api/zlib.html#zlib_zlib
推荐阅读
- google-bigquery - 对分区表的 BigQuery 查询中的处理大小异常大
- flutter - 何时使用“flutter clean”命令?
- php - uniqid() 的危险是它创建相同的 ID 在同一微秒触发,还是后续的微时间会导致冲突?
- angular - 将布尔变量传递到路由器出口
- javascript - localhost:3000 de app.get 中的 JavaScript 问题
- ruby-on-rails - 未定义的局部变量或方法`cache_key_for_vendor_products'
- ruby-on-rails - 如何在 redmine 插件中调用 redirect_to
- javascript - 如何在 Angularfire Ionic 的单独页面中显示 Firebase 详细信息 [已回答]
- rust - 如何将计算 9 位位掩码中的个数的 C 函数转换为 Rust?
- python - 如何将具有多个分隔符的 df 列拆分为不同的列?