首页 > 解决方案 > Kinesis Stream 未获取日志

问题描述

我在 Kinesis 数据流中接收 cloudtrail 日志。我正在调用此处描述的流处理 lambda 函数。然后将返回到流的最终结果存储到 S3 存储桶中。截至目前,处理失败,在 S3 存储桶中创建了以下错误文件:

{"attemptsMade":4,"arrivalTimestamp":1619677225356,"errorCode":"Lambda.FunctionError","errorMessage":"Check your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed","attemptEndingTimestamp":1619677302684,

在此处添加 Python lambda 函数以供参考:

import base64
import gzip
import json
import logging

# Setup logging configuration
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

def unpack_kinesis_stream_records(event):

    # decode and decompress each base64 encoded data element
    return [gzip.decompress(base64.b64decode(k["data"])).decode('utf-8') for k in event["records"]]

def decode_raw_cloud_trail_events(cloudTrailEventDataList):

    #Convert Raw Event Data List
    eventList =  [json.loads(e) for e in cloudTrailEventDataList]

    #Filter out-non DATA_MESSAGES
    filteredEvents = [e for e in eventList if e["messageType"] == 'DATA_MESSAGE']

    #Covert each indidual log Event Message
    events = []
    for f in filteredEvents:
        for e in f["logEvents"]:
            events.append(json.loads(e["message"]))

    logger.info("{0} Event Logs Decoded".format(len(events)))
    return events

def handle_request(event, context):

    #Log Raw Kinesis Stream Records
    #logger.debug(json.dumps(event, indent=4))

    # Unpack Kinesis Stream Records
    kinesisData = unpack_kinesis_stream_records(event)
    #[logger.debug(k) for k in kinesisData]

    # Decode and filter events
    events = decode_raw_cloud_trail_events(kinesisData)

    ####### INTEGRATION CODE GOES HERE #########

    return f"Successfully processed {len(events)} records."

def lambda_handler(event, context):
    return handle_request(event, context)

谁能帮我理解这里的问题。

标签: amazon-web-servicesamazon-athenaamazon-kinesis

解决方案


我相信您使用的是“kinesis firehose”服务,而不是“kinesis data stream”。您正在使用的代码用于直接从 kinesis 数据流中读取并处理 cloudtrail 事件。

kinesis firehose 数据转换 lambda 函数是不同的。Firehose 将收到的 cloudtrail 事件发送到 lambda 函数。Lambda 处理/转换事件并将这些事件发送回 firehose,以便 firehose 可以将它们传送到目标 S3 存储桶。

您的 lambda 函数应该以与 firehose 期望的格式完全相同的格式返回记录,并且每条记录的状态都应该是 [Dropped、Ok 或 ProcessingFailed]。您可以在 aws文档中阅读更多内容


推荐阅读