elasticsearch - Logstash:配置聚合+经过的过滤器
问题描述
我有这些日志:
"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
"03.08.2020 10:56:38","Event LClick","Type Menu","Detail SomeDetail","t=109","end"
"03.08.2020 10:56:40","Event LClick","t=1981","beg"
"03.08.2020 10:56:40","Event LClick","t=2090","end"
"03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"
"03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail User_Desktop","t=4477","end"
"03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
"03.08.2020 10:56:51","Event LClick","t=12543","beg"
"03.08.2020 10:56:51","Event LClick","t=12605","end"
"03.08.2020 10:56:52","Event LClick","Form ","Type Label","Name Application.for.training","t=13853","beg"
"03.08.2020 10:57:54","Event LClick","Form Application.for.training","Type Label","Name Application.for.training","t=75442","end"
"03.08.2020 10:57:54","Event FormActivate","Name List.form","t=75785"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","end"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","t=89373","beg"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=89451","end"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","t=96580","beg"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=96643","end"
"03.08.2020 10:58:15","Event LBtnDbl","Form List.form","Type FormTable","Name Список","t=96752","beg"
"03.08.2020 10:59:22","Event FormActivate","Name Another.Form","t=164004"
"03.08.2020 10:59:22","Event LBtnDbl","Form Another.Form","Type FormTable","Name Список","Detail Data","t=164004","end"
"03.08.2020 10:59:25","Event LClick","Form Another.Form","Type ToolBar","Name КоманднаяПанельПереченьРеквизитов","t=167171","beg"
"03.08.2020 10:59:26","Event LClick","Form Another.Form","Type ToolBar","Name КоманднаяПанельПереченьРеквизитов","Detail Заполнить","t=167249","end"
...
我的logstash配置:
input {
beats {
port => '5044'
}
}
filter {
grok {
patterns_dir => ['./patterns']
match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
add_tag => [ '%{Status}' ]
}
dissect {
mapping => {
'[log][file][path]' => 'C:\Program Files\Filebeat\logs\%{somethingtoo}\%{something}\%{user}\%{filename}.txt'
}
}
date {
match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
}
elapsed {
unique_id_field => 'Event'
start_tag => 'beg'
end_tag => 'end'
new_event_on_match => false
}
if 'elapsed' in [tags] {
aggregate {
task_id => '%{Event}'
code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
map_action => 'create'
}
}
mutate {
remove_field => ['timestamp', 'ecs', 'log', 'tags', 'message', '@version', 'something', 'somethingtoo', 'filename', 'input', 'host', 'agent', 't', 'parent_type', 'parent_name', 'type']
rename => {'elapsed_time' => 'Event_duration'}
}
}
output {
elasticsearch {
hosts => ['localhost:9200']
index => 'test'
}
}
问题:现在我使用 beg(动作的开始)和 end(动作的结束)计算行之间的时间差。但这没有多大意义,因为它几乎总是 0 秒。我将如何实现它:当表单字段出现在行中时(如果消息中的“表单”),请考虑某个表单的第一次出现和最后一次出现之间的区别。如果不清楚将 end_tag 绑定到什么,我该如何实现。
对于上面的日志,我应该得到以下信息:
- 某些表单被激活(Event FormActivate),在一个单独的字段表单名称(Name Name_of_form)中。然后是这个表单的动作(必须计算其时间),然后是新表单的激活,依此类推,直到文件结束。
- 只要形式是一样的。继续前行。
- 此表格连续最后一次出现
- Elapsed_time: "08/03/2020 10:58:15" - "08/03/2020 10:58:04" = 11 秒
如果有任何帮助,我将不胜感激!
解决方案
推荐阅读
- django - django manage.py 迁移脚本失败后如何使postgresql数据库摆脱死锁
- wordpress - 如何在 Wordpress 网站中添加 AUTOSHIP 功能
- python - 函数返回无效的 int() 字面量,基数为 10:''
- webpack - Webpack 4. ReferenceError: require is not defined
- javascript - 导出 jQuery 自定义插件
- python-3.x - Python yaml 解析错误
- node.js - 添加多个占位符时,快速路由不起作用
- clojure - 如何在 Clojure 中访问表单的各个字段?
- firebase - 如何在 Firebase 函数中使用通知?错误:无法处理请求
- javascript - Ruby on Rails 和 React/Redux, Uncaught TypeError: (0 , _tasks.getTask) is not a function