logstash - 如何正确使用 NiFi ExtractGrok
问题描述
我正在使用:NiFi v1.8.0 和 Logstash v7.1.1
我的任务是将我们所有的 Logstash 配置转移到 NiFi。我试图了解 NiFi ExtractGrok 的工作原理,但找不到任何示例。这打算如何使用?以及如何使用这个 grok 处理器设置 NiFi 属性?当我指的是示例时,我指的是实际示例,这些示例向您展示了之前和之后的内容,以便人们可以了解正在发生的事情。我已经阅读了 NiFi ExtractGrok 文档,但它非常有限,并且似乎假设您了解它是如何工作的。
这是我能找到的唯一示例:如何在 ApacheNifi 中使用 ExtractGrok 处理器获取多行?
解决方案
根据您所说,您需要的处理器是ConvertRecord而不是ExtractGrok
. ExtractGrok
只会将某些字段提取到 FlowFile 属性或内容中。
如果您想将日志文件格式化为可行的格式(如 JSON,如果您想将这些文件发送到 ElasticSearch),那么您可以将GrokReader用作Record Reader
JsonRecordSetWriter 。Record Writer
然后,您将Schema Text
在两者中配置您的(或使用架构注册表)RecordReader
和RecordWriter
作为您的架构,并设置Grok Expression
为您的GrokReader
.
例如:
我的日志消息记录如下:
2019-12-09 07:59:59,136 this is the first log message
2019-12-09 09:59:59,136 this is the first log message with a stack trace: org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator - DataSource health check failed
org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is org.apache.commons.dbcp.SQLNestedException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)......
所以,我的想法是:
%{TIMESTAMP_ISO8601:timestamp}\s+%{GREEDYDATA:log_message}
我的架构是:
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "log_message",
"type": "string"
},
{
"name": "stackTrace",
"type": "string"
}
]
}
请注意stackTrace
我添加到架构中的字段。自动将GrokReader
堆栈跟踪映射到自己的字段中。stackTrace
因此,如果您也想映射它,则必须添加字段。log_message
然后,您可以根据需要使用 Jolt将其放入现场。其输出ConvertRecord
将是:
[ {
"timestamp" : "2019-12-09 07:59:59,136",
"log_message" : "this is the first log message",
"stackTrace" : null
}, {
"timestamp" : "2019-12-09 09:59:59,136",
"log_message" : "this is the first log message with a stack trace: org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator - DataSource health check failed",
"stackTrace" : "org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is org.apache.commons.dbcp.SQLNestedException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)\nat org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)......"
} ]
推荐阅读
- javascript - 过滤器没有在 dom Angular 上显示结果?
- angular - Ionic [Angular] - 反应形式:使用 setValue() selectionStart/End 后不移动光标
- mongodb - 允许文档中的嵌套数组与集合中的其他文档共享相同的对象
- mongodb - 为什么它返回 null 而不是 value?
- caching - 定期缓存和更新复杂数据
- python-3.x - Python 快速入门 ModuleNotFoundError:没有名为“google_auth_oauthlib”的模块
- c++ - 如何有效地从另一个字符串中找到的字符串中删除重复字符?
- project-reactor - 从 EmitterProcessor 移动到 Sinks.many()
- scala - 根据 WrappedArray 列表中的值加入数据框
- python - 点不安装包?