首页 > 解决方案 > 如何仅在将特定文件(密钥)写入 S3 存储桶时触发 AWS 事件规则

问题描述

我正在尝试制作一个 AWS 事件(在 CloudWatch 或 EventBridge 中),当特定文件放入 S3 存储桶时触发 AWS Step Function 的运行。我的规则事件模式如下所示:

{
  "source": [
    "aws.s3"
  ],
  "detail-type": [
    "AWS API Call via CloudTrail"
  ],
  "detail": {
    "eventSource": [
      "s3.amazonaws.com"
    ],
    "eventName": [
      "PutObject"
    ],
    "requestParameters": {
      "bucketName": [
        "bucketname"
      ],
      "key": [
        "date={{TODAYS DATE}}/_SUCCESS"
      ]
    }
  }
}

理想情况下,我希望key元素指向TODAYS DATE代表当前日期的路径,并且_SUCCCESS是一个空文件,一旦成功完成,我的工作就会打印到目录中(例如,如果今天是 2019 年 10 月 31 日,则完整的存储桶路径检查将是bucketname/date=20191031/_SUCCESS)。最终目标是让事件规则触发一个 Step Function,该函数控制许多其他日常作业,这些作业只有在将_SUCCESS文件输出到存储桶的第一个作业成功完成后才能运行。

最好我希望_SUCCESS使用当天的当前日期对文件进行密钥检查。但是,如果没有处理日期的好方法,如果有一种方法可以在将新目录放入存储桶时触发规则一次(例如,在date=XXXXXX创建目录时触发),我也应该能够使某些事情起作用。每次将任何新文件放入存储桶时,我都无法激活触发器,因为初始作业将在date=XXXXXX目录中创建许多输出文件,这些文件用作以下作业的输入。

能够通过 AWS CloudFormation 创建此规则也将非常有帮助,因此,如果 CloudFormation 有任何方法来处理这些问题,那就太好了。

提前感谢您的任何帮助,非常感谢。

标签: amazon-web-servicesamazon-s3amazon-cloudformationamazon-cloudwatchaws-step-functions

解决方案


我不确定我是否理解您在这里想要实现的目标,但您为什么不将 lambda 函数订阅到存储文件的存储桶(订阅它放置事件),做任何您想要以编程方式执行的检查在该 lambda 函数内部,如果满足所有条件,只需从 lambda 函数中调用提到的 step 函数。

如果不满足任何条件,则根本不启动 step 功能。

以下是如何将 lambda 函数订阅到 S3 put 事件(通过 Web 控制台)。

  1. 去 S3
  2. 选择你的桶
  3. Properties tab
  4. 选择Events
  5. 检查PUT事件
  6. Send to,挑选Lambda Function
  7. 选择现有的 lambda 函数(您需要创建该 lambda 函数)

如何从 lambda 函数中访问事件的存储桶名称、对象键和时间戳等属性。(使用 Python)

def handler_name(event, context): 
    // get bucket name
    print(event['Records'][0]['s3']['bucket']['name'])

    // get object key
    print(event['Records'][0]['s3']['object']['key'])

    // get event timestamp
    print(event['Records'][0]['eventTime'])

    return 0

这是完整的event对象(即S3事件对象)供参考。

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-2",
      "eventTime": "2019-09-03T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }

  ]
}

如何从 Lambda 函数中执行 Step Function(使用 Python + Boto3)

import boto3

sfn_client = boto3.client('stepfunctions')

def handler_name(event, context): 

    response = sfn_client.start_execution(
        stateMachineArn='string',
        name='string',
        input='string'
    )

    return 0

其中stateMachineArn是要执行的状态机的 Amazon 资源名称 (ARN),name(可选)是执行的名称,并且input是包含执行的 JSON 输入数据的字符串。


推荐阅读