首页 > 解决方案 > 如何使用 AWS Lambda for Firehose 和 CDK 启用转换源记录

问题描述

我正在尝试使用 CDK 将资源转换(使用 Lambda)启用到 Kinesis Firehose。我已经知道如何使用控制台执行此操作,但我不知道如何使用 AWS CDK 来实现此操作。这是我到目前为止使用 Typescript 的代码

// KINESIS STREAM
const kinesisStream = new kinesis.CfnDeliveryStream(this, `${props.name}-Kinesis`, {
  deliveryStreamName: `${props.name}-Stream`,
  deliveryStreamType: 'DirectPut',
  s3DestinationConfiguration: {
    bucketArn: props.eventsBucketArn,
    bufferingHints: {
      intervalInSeconds: 300,
      sizeInMBs: 5,
    },
    compressionFormat: 'UNCOMPRESSED',
    prefix: 'year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
    errorOutputPrefix: 'Errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}',
    roleArn: kinesisRole.roleArn
  }
});

在此先感谢您的帮助!

标签: typescriptamazon-s3aws-lambdaaws-cdkamazon-kinesis-firehose

解决方案


检查ProcessingConfigurationProperty。Java代码:

List<Object> transformParams = new ArrayList<>();
transformParams.add(ProcessorParameterProperty.builder().
        parameterName("LambdaArn").
        parameterValue(transform.getFunctionArn()).
        build());
transformParams.add(ProcessorParameterProperty.builder().
        parameterName("RoleArn").
        parameterValue(transform.getRole().getRoleArn()).
        build());

extendedS3DestinationConfiguration(ExtendedS3DestinationConfigurationProperty.builder().
        cloudWatchLoggingOptions(CloudWatchLoggingOptionsProperty.builder().
                enabled(true).
                logGroupName(logGroup.getLogGroupName()).
                logStreamName(logStream.getLogStreamName()).
                build()).
        bucketArn(bucket.getBucketArn()).
        bufferingHints(BufferingHintsProperty.builder().
                intervalInSeconds(180).
                sizeInMBs(1).
                build()).
        compressionFormat("UNCOMPRESSED").
        roleArn(role.getRoleArn()).
        processingConfiguration(ProcessingConfigurationProperty.builder().
                enabled(Boolean.TRUE).
                processors(processors).
                build()).
        build()).

推荐阅读