首页 > 解决方案 > 如何从 Kinesis Firehose 向 Splunk 发送非 cloudwatch JSON 事件?

问题描述

我正在尝试从 Kinesis Firehose 向 Splunk 发送非 cloudwatch 事件。我正在使用 Lambda 处理事件,并以下列格式将其反馈回 Firehose(Firehose 需要):

{ 
    "records": [
        {
          "recordId": "2345678",
          "result": "Ok",
          "data": [base64-encoded custom JSON]
        }
    ]
}

但是,一旦到达 Splunk,它就会抛出一个模糊的解析错误,并且帮助链接无处可去:

"errorCode":"Splunk.InvalidDataFormat","errorMessage":"The data is not formatted correctly. To see how to properly format data for Raw or Event HEC endpoints, see Splunk Event Data (http://dev.splunk.com/view/event-collector/SP-CAAAE6P#data)"

我在这里想念什么?HEC 端点无法以标准格式解析来自 Firehose 的消息,这似乎很奇怪。

我正在使用aws_kinesis_firehose_delivery_stream Terraform 模块中的 splunk_configuration 块将消息发送到 HEC 事件端点。

标签: jsonparsingterraformsplunkamazon-kinesis-firehose

解决方案


弄清楚了!对于后代,因为这没有很好的记录:

Kinesis Firehosedata有效负载中的字段必须是遵循Splunk 事件收集器规范的 base64 编码对象。

只要Firehose和 Splunk 都可以读取 Lambda 返回的有效负载,它就不会引发错误。

这是 Kinesis Firehose 转换器 Lambda(node12 运行时)的代码:

/*
* Transformer for sending Kinesis Firehose events to Splunk
*
* Properly formats incoming messages for Splunk ingestion
* Returned object gets fed back into Kinesis Firehose and sent to Splunk
*/

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries

    /* Process the list of records and transform them to adhere to Splunk specs */
    const output = event.records.map((record) => {
        try {
            const entry = (Buffer.from(record.data, 'base64')).toString('utf8');

            /*
             * IMPORTANT: `data` object should follow Splunk event formatting specs prior to encoding.
             * Otherwise, it will throw a parsing error.
             * https://docs.splunk.com/Documentation/Splunk/8.0.3/Data/FormateventsforHTTPEventCollector
             */
            const obj = {
                sourcetype: "aws:firehose:json", // Required, will error
                event: JSON.parse(entry)
            }
            const payload = (Buffer.from(JSON.stringify(obj), 'utf8')).toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        } catch (e) {
            failure++
            console.error(e.message());
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed'
            };
        }
    });
    console.log(`Processing completed.  Successful records ${success}. Failed records ${failure}.`);
    callback(null, {records: output});
}

推荐阅读