首页 > 解决方案 > AWS Kinesis Firehose 到 Lambda,Lambda 到 S3 使用 java

问题描述

对于我的 expolartion,创建了 AWS firehose 流并配置了 Lambda 函数并将数据移动到 S3。Firehose 到 S3 它运行良好,没有任何问题。如果我启用 lamda 功能,在 S3 失败的存储桶中会出现以下错误。

{"attemptsMade":4,"arrivalTimestamp":1570727830210,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."

lambda java代码:

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, String> {

    @Override
    public String handleRequest(KinesisEvent event, Context context) {
        context.getLogger().log("Input: " + event);
        StringBuffer sb = new StringBuffer();
        for (KinesisEventRecord record : event.getRecords()) {
            String payload = new String(record.getKinesis().getData().array());
            if (payload.toLowerCase().contains("scala"))
                sb.append(payload);
            sb.append("\n");
        }

        return sb.toString();
    }
}

基本上,过滤传入的流数据并推送到 S3。我也有问题。1. 我正在将打孔 JSON 数据传递给 firehose。“record.getKinesis().getData()” 方法将逐行读取记录并捆绑到整个 json 字符串。2.写日志语句。在哪里查看我的日志。我该如何处理这种情况?请指教

标签: amazon-s3aws-lambdaamazon-kinesis-firehose

解决方案


AWS Lambda Java Events 2.x库支持KinesisFirehoseEvent。1.x 库没有这个类。

您的代码将类似于:

public class LambdaFunctionHandler implements RequestHandler<KinesisFirehoseEvent, String> {

    @Override
    public String handleRequest(KinesisFirehoseEvent event, Context context) {
    }
}

在 Lambda 测试环境中,事件将如下所示:

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-west-2",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
    }
  ]
}

推荐阅读