amazon-web-services - AWS Firehose - 要在 lambda 中处理的事件格式的定义在哪里?
问题描述
Amazon Kinesis Data Firehose 数据转换不提供有关来自 Firehose 的 lambda 函数的事件数据格式的任何信息。
如果没有这些信息,我们如何编写 lambda 函数来进行转换?
解决方案
花了很多时间后:
- 无服务器应用程序示例/python/kinesis-firehose-process-record-python
- 在 AWS 上构建现代应用程序 - 模块 5
- Firehose API 参考 - PutRecord
- Firehose API 参考 - 记录
让事件从 Firehose 传到 Lambda。
$ sam local generate-event kinesis kinesis-firehose
{
"invocationId": "invocationIdExample",
"deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
"region": "us-east-1",
"records": [
{
"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
}
]
}
测试 Firehose/lambda
测试在 AWS 上构建现代应用程序 - 模块 5 firehose 和 lambda 与 CLI。
测试 lambda
aws lambda invoke --function-name ${FUNCTION_NAME} \
--qualifier ${FUNCTION_ALIAS} \
--payload file://./event.json \
response.json
- 事件.json
{
"records": [
{
"recordId": "1",
"data": "'eyJ1c2VySWQiOiAiY3VycmVudFVzZXJJZCIsICJteXNmaXRJZCI6ICI0ZTUzOTIwYy01MDVhLTRhOTAtYTY5NC1iOTMwMDc5MWYwYWUifQ=='"
}
]
}
结果 Lambda 日志。
START RequestId: e15a50f9-20a5-48ce-9942-9681291910fe Version: 13
{'records': [{'recordId': '1', 'data': "'eyJ1c2VySWQiOiAiY3VycmVudFVzZXJJZCIsICJteXNmaXRJZCI6ICI0ZTUzOTIwYy01MDVhLTRhOTAtYTY5NC1iOTMwMDc5MWYwYWUifQ=='"}]}
Processing record: 1
{
"userId": "currentUserId",
"mysfitId": "4e53920c-505a-4a90-a694-b9300791f0ae",
"goodevil": "Evil",
"lawchaos": "Lawful",
"species": "Chimera"
}
Successfully processed 1 records.
测试 firehose+lambda
echo "Testing Firehose put-record using --record file://./data.json"
aws firehose put-record --delivery-stream-name ${DELIVERY_STREAM_NAME} \
--record file://./data.json
echo "Testing put-record using --record='{"Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"}'"
# aws firehose put-record --delivery-stream-name mystream --record="{\"Data\":\"1\"}"
aws firehose put-record --delivery-stream-name "${DELIVERY_STREAM_NAME}" \
--record='{"Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"}'
echo "Testing Firehose put-record using --cli-input-json"
aws firehose put-record \
--cli-input-json '
{
"DeliveryStreamName": '\"${DELIVERY_STREAM_NAME}\"',
"Record": {
"Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"
}
}'
数据.json
{
"Data":"{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"
}
结果
START RequestId: 94007e93-31d8-4da5-8231-c7cafa0d363a Version: 13
{'invocationId': '6bd3e736-2ad8-41d4-9485-a0aad1806990', 'deliveryStreamArn': 'arn:aws:firehose:us-east-2:200506027189:deliverystream/masa-ecs_monolith-firehose-extended-s3-firehose-click-stream', 'region': 'us-east-2', 'records': [{'recordId': '49605256299907973028537486643826326105740520545077690370000000', 'approximateArrivalTimestamp': 1584590301809, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}, {'recordId': '49605256299907973028537486643827535031560135311691350018000000', 'approximateArrivalTimestamp': 1584590303745, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}, {'recordId': '49605256299907973028537486643828743957379750009585532930000000', 'approximateArrivalTimestamp': 1584590305222, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}]}
Processing record: 49605256299907973028537486643826326105740520545077690370000000
{
"userId": "2",
"mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
"goodevil": "Neutral",
"lawchaos": "Lawful",
"species": "Cyclops"
}
Processing record: 49605256299907973028537486643827535031560135311691350018000000
{
"userId": "2",
"mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
"goodevil": "Neutral",
"lawchaos": "Lawful",
"species": "Cyclops"
}
Processing record: 49605256299907973028537486643828743957379750009585532930000000
{
"userId": "2",
"mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
"goodevil": "Neutral",
"lawchaos": "Lawful",
"species": "Cyclops"
}
Successfully processed 3 records.
参考
恐怕 AWS Firehose 文档写得太差了,不能作为技术文档。
不要花时间在静脉上,个人会浏览博客和 github 存储库,而不是 AWS Firehose 文档。
- 使用 AWS Lambda 进行 Amazon Kinesis Firehose 数据转换
- serverless-app-examples/python或AWS Serverless Application Repository
- CLI 将数据放入 AWS Firehose
我确实希望 AWS 能够认真改进文档,这样我们就不必在 github、博客、实验中进行大量搜索。
推荐阅读
- java - 升级到 Mockito 2.21 时使用 any() 的验证错误
- pyarrow - 模块“pyarrow”没有属性“十进制”
- mysql - MYSQL 存储过程迭代 Json 数组数据
- python-3.x - 使用 sum 函数在 caretsian 平面上添加点
- python - 如何从pyqt4中的组合框向特定下拉列表添加背景颜色
- spring - 表上的@Query 更新命令抛出空指针异常
- oauth-2.0 - 使用 Bot Framework 进行图形身份验证的最大超时限制是多少?如何设置?
- python - E128 延续线缩进不足,用于视觉缩进和 E122 延续线缺少缩进或在 flake8 中缩进
- spring-boot - junit 测试中的自定义属性,例如 application.setDefaultProperties
- html - 独特按钮形状上的伪元素和背景的透明度问题