Logs overwritten in the specified index under the same _id

Problem description

I am using Filebeat 6.5.1, Logstash 6.5.1 and Elasticsearch 6.5.1.

I am using multiple grok filters in a single configuration file and trying to send the logs to Elasticsearch.

Below is my filebeat.yml:

filebeat.prospectors:
- type: log
  paths:
    - /var/log/message
  fields:
    type: apache_access
  tags: ["ApacheAccessLogs"]
- type: log
  paths:
    - /var/log/indicate
  fields:
    type: apache_error
  tags: ["ApacheErrorLogs"]
- type: log
  paths:
    - /var/log/panda
  fields:
    type: mysql_error
  tags: ["MysqlErrorLogs"]

output.logstash:
  # The Logstash hosts
  hosts: ["logstash:5044"]

Below is my Logstash configuration file:

input {
  beats {
    port => 5044
    tags => [ "ApacheAccessLogs", "ApacheErrorLogs", "MysqlErrorLogs" ]
  }
}
filter {
  if "ApacheAccessLogs" in [tags] {
    grok {
      match => [
        "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}",
        "message" , "%{COMMONAPACHELOG}+%{GREEDYDATA:extra_fields}"
      ]
      overwrite => [ "message" ]
    }
    mutate {
      convert => ["response", "integer"]
      convert => ["bytes", "integer"]
      convert => ["responsetime", "float"]
    }
    geoip {
      source => "clientip"
      target => "geoip"
      add_tag => [ "apache-geoip" ]
    }
    date {
      match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
    useragent {
      source => "agent"
    }
  }
  if "ApacheErrorLogs" in [tags] {
    grok {
      match => { "message" => [
        "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
        "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] }
      pattern_definitions => {
        "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
      }
      remove_field => "message"
    }
    mutate {
      rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
    }
    date {
      match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
      remove_field => "[apache2][error][timestamp]"
    }
  }
  if "MysqlErrorLogs" in [tags] {
    grok {
      match => { "message" => [
        "%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",
        "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",
        "%{GREEDYDATA:[mysql][error][message2]}" ] }
      pattern_definitions => {
        "LOCALDATETIME" => "[0-9]+ %{TIME}"
      }
      remove_field => "message"
    }
    mutate {
      rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
    }
    mutate {
      rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
    }
    date {
      match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
      remove_field => "[apache2][access][time]"
    }
  }
}

output {
  if "ApacheAccessLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache"
      document_id => "apacheaccess"
    }
  }
  if "ApacheErrorLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache"
      document_id => "apacheerror"
    }
  }
  if "MysqlErrorLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache"
      document_id => "sqlerror"
    }
  }
  stdout { codec => rubydebug }
}

The data is sent to Elasticsearch, but only 3 records are created in the index, one per document_id.

Only those 3 records ever exist: each new incoming log overwrites the same document_id, and the old log is lost.

Can you help me out?

Tags: logstash, elastic-stack

Solution


The purpose of document_id is to provide a unique document id for each event. In your case, since the ids are static (apacheaccess, apacheerror and sqlerror), only one event per output is ingested into Elasticsearch, and it is overwritten by each newer event.

Since you have 3 different data types, what you seem to be looking for is a separate index for each event type (ApacheAccessLogs, ApacheErrorLogs, MysqlErrorLogs), like this:

output {
  if "ApacheAccessLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache-access"
    }
  }
  if "ApacheErrorLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache-error"
    }
  }
  if "MysqlErrorLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "mysql-error"
    }
  }
  stdout {
    codec => rubydebug
  }
}
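
As a side note that goes beyond the original answer: log data is commonly written to time-based indices, and Logstash's sprintf date formatting can derive the index name from each event's @timestamp. A sketch for the access logs only:

output {
  if "ApacheAccessLogs" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      # one index per day, e.g. apache-access-2018.12.01
      index => "apache-access-%{+YYYY.MM.dd}"
    }
  }
}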

There are not many cases where you need to set the id manually (e.g., when re-ingesting data), since Logstash and Elasticsearch will manage it on their own.
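
For illustration only: if each event already carried a field that uniquely identified it (the field name log_id below is hypothetical), you could reference it with sprintf syntax, so that re-ingesting the same data updates documents in place instead of duplicating them:

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "apache-access"
    # hypothetical field; any value that is unique per event works here
    document_id => "%{[log_id]}"
  }
}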

But if that is your case, and you cannot use a single field to uniquely identify each event, you can use the logstash-filter-fingerprint plugin, which is designed exactly for that.
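
A minimal sketch of the fingerprint approach, assuming the raw message line is unique enough to identify an event (the choice of source field and hash method here are assumptions, not part of the original answer):

filter {
  fingerprint {
    # hash the raw log line into a @metadata field, which is not indexed
    source => "message"
    target => "[@metadata][fingerprint]"
    method => "SHA256"
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "apache-access"
    # reuse the hash as the document id: identical lines map to the same _id
    document_id => "%{[@metadata][fingerprint]}"
  }
}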

