首页 > 解决方案 > Logstash:是否有可能以某种方式添加一个日期差为两个或多个日志行的字段?

问题描述

问题如下,我使用filebeat和logstash上传日志到elasticsearch。

"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"

"03.08.2020 10:56:38","Event LClick","Type Menu","Detail Impale","t=109","end"

"03.08.2020 10:56:40","Event LClick","t=1981","beg"

"03.08.2020 10:56:40","Event LClick","t=2090","end"

"03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"

"03.08.2020 10:56:44","Event FormActivate","Name SomeName","t=5444"

"03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail Test","t=4477","end"

这些是用户在 Web 表单中执行的操作的日志。每个动作都有一个开始(“beg”在行尾)和一个结束(“end”在行尾)。

如果可能的话,我需要计算用户执行操作的时间差并将其作为字段输出(即使它为零)。

示例:“03.08.2020 10:56:44”-“03.08.2020 10:56:41”= 3 秒(这应该是一个新字段)

也许我需要以某种方式组合这些字段?

如果有一个在logstash中减去日期的解决方案,那么我该如何为在开始和结束之间有其他动作的动作实现这个,例如“Event FormActivate”。

也许这可以通过弹性搜索中已经存在的某些查询来解决。

我是一个完整的新手,将不胜感激任何帮助。我现在的logstash配置:

input {
    beats {
        port => '5044'
    }
}
 filter {
    mutate {
        remove_field => [ '@version', 'input', 'host', 'ecs', 'agent' ]
        remove_tag => [ 'beats_input_codec_plain_applied' ]
    }
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<event>([^"]+))(","Form\s)?(?<form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<name>([^"]+))?(","Detail\s)?(?<detail>([^"]+))?(","t=)?(?<t>([\d]+))?' }
    }
    date {
        match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
        timezone => 'Europe/Moscow'
        target => '@timestamp'
        remove_field => 'timestamp'
    }
    mutate {
        rename => ['log', 'user_path']
        rename => ['@timestamp', 'logdate']
    }
}
output {
    elasticsearch {
        hosts => ['localhost:9200']
        index => 'test'
    }
}

更新:

我试图理解 Val 建议的线程中的操作。但我还是没有成功。这就是我对 logstash 配置所做的:

 filter {
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<event>([^"]+))(","Form\s)?(?<form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<name>([^"]+))?(","Detail\s)?(?<detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<status>(end|beg))?' }
        add_tag => [ '%{status}' ]
    }
    date {
        match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
    }
    elapsed {
        unique_id_field => 'event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => true
        add_tag => ['1->2']
    }
    if '1->2' in [tags] and 'elapsed' in [tags] {
        aggregate {
            task_id => '%{event}'
            code => 'map["report"] = [(event["elapsed_time"]*1000).to_i]'
            map_action => 'create'
            end_of_task => true
        }
    }
}

但它只是行不通。在我看来,我很困惑:(

也许如果我在elasticsearch中展示我想看到的东西会更好。对于七行日志(帖子开头的日志),它应该如下所示:

{
                 "username" => "I will get the username from the log path and I want it to get here too",
               "elapsed_time" => date difference,
                  "event" => "event from line",
    "elapsed_timestamp_start" => "start time"
}

从elasticsearch的七行日志来看,应该有3条这样的记录。请帮我为这个任务写一个过滤器。谢谢!

聚合过滤器插件文档的另一个问题:

 You should be very careful to set Logstash filter workers to 1 (-w 1 flag) for this filter to work correctly otherwise events may be processed out of sequence and unexpected results will occur.

我找不到需要添加此标志的答案。也许这就是问题所在。

标签: elasticsearchlogstashkibanalogstash-configurationfilebeat

解决方案


推荐阅读