amazon-web-services - Kinesis Stream 未获取日志
问题描述
我在 Kinesis 数据流中接收 cloudtrail 日志。我正在调用此处描述的流处理 lambda 函数。然后将返回到流的最终结果存储到 S3 存储桶中。截至目前,处理失败,在 S3 存储桶中创建了以下错误文件:
{"attemptsMade":4,"arrivalTimestamp":1619677225356,"errorCode":"Lambda.FunctionError","errorMessage":"Check your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed","attemptEndingTimestamp":1619677302684,
在此处添加 Python lambda 函数以供参考:
import base64
import gzip
import json
import logging
# Setup logging configuration
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def unpack_kinesis_stream_records(event):
# decode and decompress each base64 encoded data element
return [gzip.decompress(base64.b64decode(k["data"])).decode('utf-8') for k in event["records"]]
def decode_raw_cloud_trail_events(cloudTrailEventDataList):
#Convert Raw Event Data List
eventList = [json.loads(e) for e in cloudTrailEventDataList]
#Filter out-non DATA_MESSAGES
filteredEvents = [e for e in eventList if e["messageType"] == 'DATA_MESSAGE']
#Covert each indidual log Event Message
events = []
for f in filteredEvents:
for e in f["logEvents"]:
events.append(json.loads(e["message"]))
logger.info("{0} Event Logs Decoded".format(len(events)))
return events
def handle_request(event, context):
#Log Raw Kinesis Stream Records
#logger.debug(json.dumps(event, indent=4))
# Unpack Kinesis Stream Records
kinesisData = unpack_kinesis_stream_records(event)
#[logger.debug(k) for k in kinesisData]
# Decode and filter events
events = decode_raw_cloud_trail_events(kinesisData)
####### INTEGRATION CODE GOES HERE #########
return f"Successfully processed {len(events)} records."
def lambda_handler(event, context):
return handle_request(event, context)
谁能帮我理解这里的问题。
解决方案
我相信您使用的是“kinesis firehose”服务,而不是“kinesis data stream”。您正在使用的代码用于直接从 kinesis 数据流中读取并处理 cloudtrail 事件。
kinesis firehose 数据转换 lambda 函数是不同的。Firehose 将收到的 cloudtrail 事件发送到 lambda 函数。Lambda 处理/转换事件并将这些事件发送回 firehose,以便 firehose 可以将它们传送到目标 S3 存储桶。
您的 lambda 函数应该以与 firehose 期望的格式完全相同的格式返回记录,并且每条记录的状态都应该是 [Dropped、Ok 或 ProcessingFailed]。您可以在 aws文档中阅读更多内容
推荐阅读
- java - Java中的打印到文件
- php - SQL 错误代码错误:42S22。为什么我在页面错误代码 500 上收到此错误。 Laravel FWC
- amazon-web-services - 通过 OAuth 在 AWS 中进行第三方管理员访问
- react-native - 我可以在本机反应中找出给定纬度,对数坐标的商店名称吗
- angular - 如何在测试时修复ngrx store throwing错误?
- safari - Hangouts 如何在 Safari 上获取桌面捕获?
- linux - XV6 os - Docker ubuntu 映像错误:“无可启动设备”/qemu-nox 命令
- kotlin - Kotlin 从通用实现中获取运行时类
- c++ - 解析字符串中的双精度和单词
- python - 使用 Django 发送电子邮件报告