首页 > 解决方案 > 使用 Elasticsearch Logstash 索引日志(使用预处理 Python 脚本)

问题描述

我对 Elasticsearch Logstash 有疑问。我的目标是使用 logstash 自动将日志发送到 elasticsearch。

我的原始日志看起来像这样:

2016-09-01T10:58:41+02:00 INFO (6):     165.225.76.76   entreprise1 email1@gmail.com    POST    /application/controller/action Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko    {"getid":"1"}   86rkt2dqsdze5if1bqldfl1
2016-09-01T10:58:41+02:00 INFO (6):     165.225.76.76   entreprise2 email2@gmail.com    POST    /application/controller2/action2    Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko   {"getid":"2"}   86rkt2rgdgdfgdfgeqldfl1
2016-09-01T10:58:41+02:00 INFO (6):     165.225.76.76   entreprise3 email3@gmail.com    POST    /application/controller2/action2    Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko   {"getid":"2"}

问题是我不想以这种形式插入我的日志。我想在 python 中使用预处理脚本,以便在使用 logstash 注入 Elastic 之前转换我的数据。
一开始,我只想使用 python 脚本登录到 elasticsearch。但是我在很多文件夹和文件中分离了大量的日志,并且不断更新,所以我认为使用 logstash 或 filebeat 更强大。我正在尝试使用 filebeat 和 gork 过滤器(对于我的情况来说还不够),但我认为在记录之前使用预处理脚本是不可能的。

日志在 python 脚本的末尾应该是这样的:

{"page": "/application/controller/action", "ip": "165.225.76.76", "browser": "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", "action": "action", "client": "entreprise1", "email": "email1@gmail.com", "feature": "application_controller_action", "time": "2016-09-01 10:58:41", "method": "POST", "controller": "controller", "application": "application"} 
{"page": "/application/controller2/action2", "ip": "165.225.76.76", "browser": "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", "action": "action2", "client": "entreprise2", "email": "email2@gmail.com", "feature": "application_controller2_action2", "time": "2016-09-01 10:58:41", "method": "POST", "controller": "controller2", "application": "application"} 
{"page": "/application3/controller/action3", "ip": "165.225.76.76", "browser": "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", "action": "action3", "client": "entreprise3", "email": "email3@gmail.com", "feature": "application_controller3_action3", "time": "2016-09-01 10:58:41", "method": "POST", "controller": "controller3", "application": "application"}

我正在努力在 logstash 过滤器中实现 python 脚本。我知道这是可以实现的,但基本上它是用 ruby​​ 脚本完成的(参见:https ://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html )

1)您认为可以使用 logstash 解决我的问题吗?

2) 如果是,我的 python 脚本应该将原始日志行作为输入,将 json 格式的行作为输出?

3)当日志文件中添加一行日志时,每次都重新插入整个文件,我该如何处理?

4)你认为用 filebeat 可以做到吗?根据你的说法,什么是最适合我的情况?

目前,我的配置 logstash 文件如下所示:

input {
  file {
    path => "/logpath/logs/*/*.txt"
    start_position => "beginning"
  }
}

filter {
  # Here is where I should use my script to transform my logs into my json needed format
  date {
    match => ["time", "YYYY-MM-dd HH:mm:ss" ]
  }

  geoip {
    source => "ip"
    target => "geoip"
  }


}

output {
  stdout  {
    codec => dots {}
  }

  elasticsearch {
    index => "logs_index"
    document_type => "logs"
    template => "./logs_template.json"
    template_name => "logs_test"
    template_overwrite => true
  }

}

我真的要提前感谢任何可以帮助我并考虑我的请求的人。

迪米特里

PS:对不起语法,英语不是我的主要语言。

标签: pythonelasticsearchlogginglogstashfilebeat

解决方案


将日志转换为 json 格式的标准方法是在 logstash 配置中使用 grok,json 过滤器。为了减少 Logstash 处理日志的负载,filebeat 可以与您的配置一起使用。

因此,可以解决此问题的最佳配置是 filebeat->logstash->Elasticsearch 堆栈。

您不需要 python 脚本,而是使用 filebeat 从特定位置捕获所有日志并将其转发到 logstash。

在积累所有日志的服务器上安装filebeat,如果将所有日志定向到特定文件夹中会很好。先安装filebeat,再设置配置转发日志到logstash

这是filebeat配置:

filebeat:
  prospectors:
    -
      paths:
        - "*log_path_of_all_your_log_files*"
      input_type: log
      json.message_key: statement
      json.keys_under_root: true

  idle_timeout: 1s
  registry_file: /var/lib/filebeat/registry
output:

  logstash:
    hosts: ["*logstash-host-ip:5044*"]
    worker: 4
    bulk_max_size: 1024
shipper:
logging:
  files:
    rotateeverybytes: 10485760 # = 10MB
    level: debug

现在在这里,连同您的 logstash 配置,您需要使用 GROK 过滤器将您的日志转换为 json 格式(在 logstash 配置文件中进行更改),然后将其转发到 elasticsearch kibana 或您想要的任何地方。


推荐阅读