首页 > 解决方案 > AWS 使用 DynamoDB Lambda 配置 Kinesis Stream

问题描述

从这个问题开始,AWS DynamoDB Stream into Redshift

DynamoDB --> DynamoDBStreams --> Lambda 函数 --> Kinesis Firehose --> Redshift。

如何配置我的 Kinesis 函数以获取 Lambda 函数源?

我创建了一个 DynamoDB 表(购买销售),并添加了 DynamoDB 流。然后我配置了 Lambda 函数来获取 DynamoDB 流。我的问题是如何配置 Kinesis 以获取 Lambda 函数 Source?我知道如何配置 Lambda 转换,但想选择作为源。不确定如何配置下面的 Direct Put Source。

谢谢,在此处输入图像描述

执行了这些步骤: 在此处输入图像描述

在此处输入图像描述

标签: amazon-web-servicesaws-lambdaamazon-redshiftamazon-kinesis-firehose

解决方案


在您的情况下,您会将 dynamodb 流式传输到 redshift

DynamoDB --> DynamoDBStreams --> Lambda Function --> Kinesis Firehose --> Redshift.

首先,您需要一个 lambda 函数来处理 DynamoDBStream。对于每个 DynamoDBStream 事件,使用 firehose PutRecordAPI 将数据发送到 firehose。从例子

var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'STRING_VALUE', /* required */
  Record: { /* required */
    Data: new Buffer('...') || 'STRING_VALUE' /* Strings will be Base-64 encoded on your behalf */ /* required */
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

接下来,我们必须知道数据是如何插入到 RedShift 中的。从消防水带文件中,

对于向 Amazon Redshift 传输数据,Kinesis Firehose 首先以前面描述的格式将传入数据传输到您的 S3 存储桶。Kinesis Firehose 然后发出 Amazon Redshift COPY 命令,将数据从 S3 存储桶加载到 Amazon Redshift 集群。

因此,我们应该知道让COPY命令将数据映射到 RedShift 模式的数据格式。我们必须遵循redshift COPY 命令的数据格式要求

默认情况下,COPY 命令期望源数据是字符分隔的 UTF-8 文本。默认分隔符是竖线字符 ( | )。

因此,您可以对输入 dynamodb 流事件的 lambda 进行编程,将其转换为管道 (|) 分隔的行记录,然后将其写入 firehose。

var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'YOUR_FIREHOSE_NAME',
  Record: { /* required */
    Data: "RED_SHIFT_COLUMN_1_DATA|RED_SHIFT_COLUMN_2_DATA\n"
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

记得添加\n,因为 firehose 不会为您添加新行。


推荐阅读