json - 如何从 Kinesis Firehose 向 Splunk 发送非 cloudwatch JSON 事件?
问题描述
我正在尝试从 Kinesis Firehose 向 Splunk 发送非 cloudwatch 事件。我正在使用 Lambda 处理事件,并以下列格式将其反馈回 Firehose(Firehose 需要):
{
"records": [
{
"recordId": "2345678",
"result": "Ok",
"data": [base64-encoded custom JSON]
}
]
}
但是,一旦到达 Splunk,它就会抛出一个模糊的解析错误,并且帮助链接无处可去:
"errorCode":"Splunk.InvalidDataFormat","errorMessage":"The data is not formatted correctly. To see how to properly format data for Raw or Event HEC endpoints, see Splunk Event Data (http://dev.splunk.com/view/event-collector/SP-CAAAE6P#data)"
我在这里想念什么?HEC 端点无法以标准格式解析来自 Firehose 的消息,这似乎很奇怪。
我正在使用aws_kinesis_firehose_delivery_stream Terraform 模块中的 splunk_configuration 块将消息发送到 HEC 事件端点。
解决方案
弄清楚了!对于后代,因为这没有很好的记录:
Kinesis Firehosedata
有效负载中的字段必须是遵循Splunk 事件收集器规范的 base64 编码对象。
只要Firehose和 Splunk 都可以读取 Lambda 返回的有效负载,它就不会引发错误。
这是 Kinesis Firehose 转换器 Lambda(node12 运行时)的代码:
/*
* Transformer for sending Kinesis Firehose events to Splunk
*
* Properly formats incoming messages for Splunk ingestion
* Returned object gets fed back into Kinesis Firehose and sent to Splunk
*/
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
let success = 0; // Number of valid entries found
let failure = 0; // Number of invalid entries found
let dropped = 0; // Number of dropped entries
/* Process the list of records and transform them to adhere to Splunk specs */
const output = event.records.map((record) => {
try {
const entry = (Buffer.from(record.data, 'base64')).toString('utf8');
/*
* IMPORTANT: `data` object should follow Splunk event formatting specs prior to encoding.
* Otherwise, it will throw a parsing error.
* https://docs.splunk.com/Documentation/Splunk/8.0.3/Data/FormateventsforHTTPEventCollector
*/
const obj = {
sourcetype: "aws:firehose:json", // Required, will error
event: JSON.parse(entry)
}
const payload = (Buffer.from(JSON.stringify(obj), 'utf8')).toString('base64');
success++;
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
} catch (e) {
failure++
console.error(e.message());
return {
recordId: record.recordId,
result: 'ProcessingFailed'
};
}
});
console.log(`Processing completed. Successful records ${success}. Failed records ${failure}.`);
callback(null, {records: output});
}
推荐阅读
- memory-management - 主管和工作人员的风暴记忆设置
- prometheus - 如何编写 PromQL 以计算每个端点应用程序收到的 http 请求数?
- python - 使用 Python C 扩展获取当前工作目录
- mysql - My SQL 5.7.33 错误代码 - 1126。无法打开共享库
- firebase - 无法在firebase实时数据库中的一个节点中获取一个孩子的所有孩子
- sql - 需要支持将 SQL Server 查询转换为 Oracle
- c# - 如何使用 NPOI C# 包创建数据透视表并保存它?
- python - 使用 split 函数应用:'expand' 是 split() 的无效关键字参数
- python - python/dataframe - 合并重复的行
- sql - Laravel 中的 SQL 查询没有返回正确的结果