首页 > 解决方案 > 如何正确使用 NiFi ExtractGrok

问题描述

我正在使用:NiFi v1.8.0 和 Logstash v7.1.1

我的任务是将我们所有的 Logstash 配置转移到 NiFi。我试图了解 NiFi ExtractGrok 的工作原理,但找不到任何示例。这打算如何使用?以及如何使用这个 grok 处理器设置 NiFi 属性?当我指的是示例时,我指的是实际示例,这些示例向您展示了之前和之后的内容,以便人们可以了解正在发生的事情。我已经阅读了 NiFi ExtractGrok 文档,但它非常有限,并且似乎假设您了解它是如何工作的。

这是我能找到的唯一示例:如何在 ApacheNifi 中使用 ExtractGrok 处理器获取多行?

标签: logstashapache-nifilogstash-grok

解决方案


根据您所说,您需要的处理器是ConvertRecord而不是ExtractGrok. ExtractGrok只会将某些字段提取到 FlowFile 属性或内容中。

如果您想将日志文件格式化为可行的格式(如 JSON,如果您想将这些文件发送到 ElasticSearch),那么您可以GrokReader用作Record ReaderJsonRecordSetWriter 。Record Writer

然后,您将Schema Text在两者中配置您的(或使用架构注册表)RecordReaderRecordWriter作为您的架构,并设置Grok Expression为您的GrokReader.

例如:

我的日志消息记录如下:

2019-12-09 07:59:59,136 this is the first log message
2019-12-09 09:59:59,136 this is the first log message with a stack trace: org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator - DataSource health check failed
org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is org.apache.commons.dbcp.SQLNestedException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
    at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)......

所以,我的想法是: %{TIMESTAMP_ISO8601:timestamp}\s+%{GREEDYDATA:log_message}

我的架构是:

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "log_message",
      "type": "string"
    },
    {
      "name": "stackTrace",
      "type": "string"
    }
  ]
}

请注意stackTrace我添加到架构中的字段。自动将GrokReader堆栈跟踪映射到自己的字段中。stackTrace因此,如果您也想映射它,则必须添加字段。log_message然后,您可以根据需要使用 Jolt将其放入现场。其输出ConvertRecord将是:

[ {
  "timestamp" : "2019-12-09 07:59:59,136",
  "log_message" : "this is the first log message",
  "stackTrace" : null
}, {
  "timestamp" : "2019-12-09 09:59:59,136",
  "log_message" : "this is the first log message with a stack trace: org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator - DataSource health check failed",
  "stackTrace" : "org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is org.apache.commons.dbcp.SQLNestedException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)\nat org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)......"
} ]

推荐阅读