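Author: 启卫
Date: April 11, 2017
Purpose: Parse Apache web logs with Logstash
- Use Filebeat to ship Apache web logs to Logstash as the input
- Parse those logs
- Name each parsed field
- Send the data to Elasticsearch
- Define the pipeline in a configuration file
1. Preparation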
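# Download the sample log to be parsed
cd /opt
wget https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz
# Unpack it so that Filebeat can read logstash-tutorial.log
gunzip logstash-tutorial.log.gz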
# Download Filebeat
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.3.0-x86_64.rpm
# Install
sudo rpm -vi filebeat-5.3.0-x86_64.rpm
# Configure filebeat.yml
vi /etc/filebeat/filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  hosts: ["localhost:5043"]
# Run Filebeat (as root) in the foreground, with debug output for the "publish" selector
/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"
2017/04/11 13:21:33.329242 beat.go:285: INFO Home path: [/usr/share/filebeat/bin] Config path: [/usr/share/filebeat/bin] Data path: [/usr/share/filebeat/bin/data] Logs path: [/usr/share/filebeat/bin/logs]
2017/04/11 13:21:33.329266 beat.go:186: INFO Setup Beat: filebeat; Version: 5.3.0
2017/04/11 13:21:33.329350 output.go:254: INFO Loading template enabled. Reading template file: /usr/share/filebeat/bin/filebeat.template.json
2017/04/11 13:21:33.329602 metrics.go:23: INFO Metrics logging every 30s
2017/04/11 13:21:33.336511 output.go:265: INFO Loading template enabled for Elasticsearch 2.x. Reading template file: /usr/share/filebeat/bin/filebeat.template-es2x.json
2017/04/11 13:21:33.337007 client.go:123: INFO Elasticsearch url: http://localhost:9200
2017/04/11 13:21:33.337036 outputs.go:108: INFO Activated elasticsearch as output plugin.
2017/04/11 13:21:33.337069 logstash.go:90: INFO Max Retries set to: 3
2017/04/11 13:21:33.337097 outputs.go:108: INFO Activated logstash as output plugin.
2017/04/11 13:21:33.337101 publish.go:238: DBG Create output worker
2017/04/11 13:21:33.337130 publish.go:238: DBG Create output worker
2017/04/11 13:21:33.337153 publish.go:280: DBG No output is defined to store the topology. The server fields might not be filled.
2017/04/11 13:21:33.337172 publish.go:295: INFO Publisher name: elk.infoclue.net
2017/04/11 13:21:33.337267 async.go:63: INFO Flush Interval set to: 1s
2017/04/11 13:21:33.337276 async.go:64: INFO Max Bulk Size set to: 50
2017/04/11 13:21:33.337283 async.go:72: DBG create bulk processing worker (interval=1s, bulk size=50)
2017/04/11 13:21:33.337300 async.go:63: INFO Flush Interval set to: 1s
2017/04/11 13:21:33.337303 async.go:64: INFO Max Bulk Size set to: 2048
2017/04/11 13:21:33.337306 async.go:72: DBG create bulk processing worker (interval=1s, bulk size=2048)
2017/04/11 13:21:33.337349 modules.go:93: ERR Not loading modules. Module directory not found: /usr/share/filebeat/bin/module
2017/04/11 13:21:33.337412 beat.go:221: INFO filebeat start running.
2017/04/11 13:21:33.337448 registrar.go:85: INFO Registry file set to: /usr/share/filebeat/bin/data/registry
2017/04/11 13:21:33.337463 registrar.go:106: INFO Loading registrar data from /usr/share/filebeat/bin/data/registry
2017/04/11 13:21:33.337880 registrar.go:123: INFO States Loaded from registrar: 1
2017/04/11 13:21:33.337893 crawler.go:38: INFO Loading Prospectors: 1
2017/04/11 13:21:33.337953 prospector_log.go:61: INFO Prospector with previous states loaded: 1
2017/04/11 13:21:33.337991 prospector.go:124: INFO Starting prospector of type: log; id: 8306424514830368397
2017/04/11 13:21:33.337999 crawler.go:58: INFO Loading and starting Prospectors completed. Enabled prospectors: 1
2017/04/11 13:21:33.338004 registrar.go:236: INFO Starting Registrar
2017/04/11 13:21:33.338021 sync.go:41: INFO Start sending events to output
2017/04/11 13:21:33.338044 spooler.go:63: INFO Starting spooler: spool_size: 2048; idle_timeout: 5s
2017/04/11 13:21:38.338146 sync.go:70: DBG Events sent: 1
2. Configure Logstash to receive Filebeat input
# Create first-pipeline.conf in the Logstash home directory
vi /opt/logstash-5.3.0/first-pipeline.conf
input {
    beats {
        port => "5043"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    stdout { codec => rubydebug }
}
# Check that the configuration is valid
bin/logstash -f first-pipeline.conf --config.test_and_exit
Sending Logstash's logs to /opt/logstash-5.3.0/logs which is now configured via log4j2.properties
Configuration OK
[2017-04-11T21:36:19,758][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
# Start Logstash
# The --config.reload.automatic option enables automatic config reloading, so changes to the config file take effect without restarting Logstash
bin/logstash -f first-pipeline.conf --config.reload.automatic
Sending Logstash's logs to /opt/logstash-5.3.0/logs which is now configured via log4j2.properties
[2017-04-11T21:39:26,308][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-04-11T21:39:26,378][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAuthCache).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2017-04-11T21:39:26,828][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x55fb35ba URL:http://localhost:9200/>}
[2017-04-11T21:39:26,852][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-04-11T21:39:27,075][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-04-11T21:39:27,099][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x6e029ef7 URL://localhost:9200>]}
[2017-04-11T21:39:27,313][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/opt/logstash-5.3.0/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.0.4-java/vendor/GeoLite2-City.mmdb"}
[2017-04-11T21:39:27,457][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>125}
[2017-04-11T21:39:28,653][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5043"}
[2017-04-11T21:39:28,803][INFO ][logstash.pipeline ] Pipeline main started
[2017-04-11T21:39:29,234][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
# Output shown on the console
{
"@timestamp" => 2016-10-11T20:54:06.733Z,
"offset" => 325,
"@version" => "1",
"beat" => {
"hostname" => "My-MacBook-Pro.local",
"name" => "My-MacBook-Pro.local"
},
"input_type" => "log",
"host" => "My-MacBook-Pro.local",
"source" => "/path/to/file/logstash-tutorial.log",
"message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}
...
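3. Parse the logs with the Grok plugin
The grok filter plugin lets you parse unstructured log data into a structured, queryable format.
A sample line from the web server log looks like this: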
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
To parse these lines, use the %{COMBINEDAPACHELOG} grok pattern, which splits each line into the fields listed in the table below:
Information | Field name |
IP Address | clientip |
User ID | ident |
User Authentication | auth |
timestamp | timestamp |
HTTP Verb | verb |
Request body | request |
HTTP Version | httpversion |
HTTP Status Code | response |
Bytes served | bytes |
Referrer URL | referrer |
User Agent | agent |
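
To make the field mapping in the table concrete, here is a rough Python sketch that splits the sample line into the same field names. The regular expression is a simplified stand-in written for this note, not the real definition of %{COMBINEDAPACHELOG}, and `parse_line` is a hypothetical helper. One difference to be aware of: this sketch captures referrer and agent without the surrounding quotes, whereas the grok output in this document keeps them.

```python
import re

# Simplified stand-in for %{COMBINEDAPACHELOG}; group names follow the table.
COMBINED_LOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)

# The sample line from this document, split across string literals for width.
SAMPLE = (
    '83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] '
    '"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" '
    '200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" '
    '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
)


def parse_line(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = COMBINED_LOG.match(line)
    return match.groupdict() if match else None


fields = parse_line(SAMPLE)
for name, value in fields.items():
    print(f"{name:12} {value}")
```

As with grok, every captured value is a string, so response and bytes would need an explicit conversion before any numeric comparison.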
# Edit first-pipeline.conf
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
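# Automatic config reloading is already enabled, so Logstash does not need to be restarted
# Filebeat, however, must be forced to re-read the log file from the beginning
# Delete the Filebeat registry file
sudo rm /usr/share/filebeat/bin/data/registry
# Restart Filebeat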
/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"
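
Deleting the registry forces a full re-read because the registry is where Filebeat records how far it has read in each file. The sketch below is only a toy model of that idea, not Filebeat's implementation: `read_new_lines` and the in-memory `registry` dict are hypothetical names made up for illustration, and the real registry file holds more than a byte offset.

```python
import os
import tempfile


def read_new_lines(path, registry):
    """Return lines added since the last call and record the new offset."""
    offset = registry.get(path, 0)  # no entry means start from byte 0
    with open(path, "rb") as handle:
        handle.seek(offset)
        data = handle.read()
    registry[path] = offset + len(data)
    return data.decode("utf-8").splitlines()


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "sample.log")
    with open(log_path, "w", encoding="utf-8") as handle:
        handle.write("line 1\nline 2\n")

    registry = {}  # stands in for the registry file
    first = read_new_lines(log_path, registry)
    second = read_new_lines(log_path, registry)
    registry.clear()  # analogous to deleting the registry file
    third = read_new_lines(log_path, registry)

print(first)   # both lines
print(second)  # nothing new since the first read
print(third)   # both lines again, because the offset was forgotten
```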
# Console output
{
"request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"offset" => 325,
"auth" => "-",
"ident" => "-",
"input_type" => "log",
"verb" => "GET",
"source" => "/path/to/file/logstash-tutorial.log",
"message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"@timestamp" => 2016-10-11T21:04:36.167Z,
"response" => "200",
"bytes" => "203023",
"clientip" => "83.149.9.216",
"@version" => "1",
"beat" => {
"hostname" => "My-MacBook-Pro.local",
"name" => "My-MacBook-Pro.local"
},
"host" => "My-MacBook-Pro.local",
"httpversion" => "1.1",
"timestamp" => "04/Jan/2015:05:13:42 +0000"
}
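4. Enrich the data with the Geoip plugin
The geoip plugin looks up an IP address, derives its geographic location, and adds that location information to the log event.
# Add the geoip filter to the filter section of first-pipeline.conf
# Filters run in the order listed, so grok must come before geoip (geoip reads the clientip field that grok produces)
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}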
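
The ordering requirement can be illustrated with a small Python sketch. Everything here is hypothetical: `grok_stage` and `geoip_stage` are toy functions, `FAKE_GEO_DB` stands in for the GeoLite2 database, and the log line is a shortened, made-up variant of the sample. It only shows why a stage that needs clientip has to run after the stage that creates it, and says nothing about what Logstash itself does when the source field is missing.

```python
# Hypothetical lookup table standing in for the GeoLite2 database.
FAKE_GEO_DB = {"83.149.9.216": {"city_name": "Moscow", "country_code2": "RU"}}


def grok_stage(event):
    """Toy grok: take the first whitespace-separated token as clientip."""
    event["clientip"] = event["message"].split(" ", 1)[0]
    return event


def geoip_stage(event):
    """Toy geoip: add location data only if clientip is already present."""
    ip = event.get("clientip")
    if ip in FAKE_GEO_DB:
        event["geoip"] = dict(FAKE_GEO_DB[ip], ip=ip)
    return event


def run_pipeline(event, stages):
    """Apply each stage in order, passing the event along."""
    for stage in stages:
        event = stage(event)
    return event


# Shortened, made-up log line used only for this illustration.
message = '83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET / HTTP/1.1" 200 1234'
right_order = run_pipeline({"message": message}, [grok_stage, geoip_stage])
wrong_order = run_pipeline({"message": message}, [geoip_stage, grok_stage])

print("geoip" in right_order)  # True
print("geoip" in wrong_order)  # False
```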
# Delete the Filebeat registry file
sudo rm /usr/share/filebeat/bin/data/registry
# Restart Filebeat
/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"
# Console output
{
"request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"geoip" => {
"timezone" => "Europe/Moscow",
"ip" => "83.149.9.216",
"latitude" => 55.7522,
"continent_code" => "EU",
"city_name" => "Moscow",
"country_code2" => "RU",
"country_name" => "Russia",
"dma_code" => nil,
"country_code3" => "RU",
"region_name" => "Moscow",
"location" => [
[0] 37.6156,
[1] 55.7522
],
"postal_code" => "101194",
"longitude" => 37.6156,
"region_code" => "MOW"
},
...
5. Index the data into Elasticsearch
Now that the web log data is broken down into separate fields, Logstash can send it to Elasticsearch.
# Edit first-pipeline.conf
# Logstash connects to Elasticsearch over HTTP
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}
# The final first-pipeline.conf
input {
    beats {
        port => "5043"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}
# Delete the Filebeat registry file
sudo rm /usr/share/filebeat/bin/data/registry
# Restart Filebeat
/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"
# Query the data
# Replace $DATE with the index date in YYYY.MM.DD format
curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=response=200'
{
"took" : 21,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 98,
"max_score" : 3.745223,
"hits" : [
{
"_index" : "logstash-2016.10.11",
"_type" : "log",
"_id" : "AVe14gMiYMkU36o_eVsA",
"_score" : 3.745223,
"_source" : {
"request" : "/presentations/logstash-monitorama-2013/images/frontend-response-codes.png",
"agent" : "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"geoip" : {
"timezone" : "Europe/Moscow",
"ip" : "83.149.9.216",
"latitude" : 55.7522,
"continent_code" : "EU",
"city_name" : "Moscow",
"country_code2" : "RU",
"country_name" : "Russia",
"dma_code" : null,
"country_code3" : "RU",
"region_name" : "Moscow",
"location" : [
37.6156,
55.7522
],
"postal_code" : "101194",
"longitude" : 37.6156,
"region_code" : "MOW"
},
"offset" : 2932,
"auth" : "-",
"ident" : "-",
"input_type" : "log",
"verb" : "GET",
"source" : "/path/to/file/logstash-tutorial.log",
"message" : "83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"type" : "log",
"tags" : [
"beats_input_codec_plain_applied"
],
"referrer" : "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"@timestamp" : "2016-10-11T22:34:25.317Z",
"response" : "200",
"bytes" : "52878",
"clientip" : "83.149.9.216",
"@version" : "1",
"beat" : {
"hostname" : "My-MacBook-Pro.local",
"name" : "My-MacBook-Pro.local"
},
"host" : "My-MacBook-Pro.local",
"httpversion" : "1.1",
"timestamp" : "04/Jan/2015:05:13:45 +0000"
}
}
},
...
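
Substituting the date by hand is easy to get wrong, so here is a small Python sketch that builds the same kind of search URL for a given day. `search_url` is a hypothetical helper, and the host and query are simply the ones used in this document. It only constructs the URL and sends nothing. Note that the default index name is derived from each event's @timestamp, which is in UTC, so the date may differ from the local calendar date.

```python
from datetime import date
from urllib.parse import quote


def search_url(day, query, host="localhost:9200"):
    """Build the _search URL for the daily logstash-YYYY.MM.DD index."""
    index = "logstash-" + day.strftime("%Y.%m.%d")
    # quote() percent-encodes characters such as "=" inside the query value.
    return f"http://{host}/{index}/_search?pretty&q={quote(query)}"


url = search_url(date(2017, 4, 11), "response=200")
print(url)
```

The resulting string can be passed to curl in place of the hand-written URL.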
# Another query
# Search for events whose city is Buffalo
# The index date used here is 2017.04.10; substitute your own date in YYYY.MM.DD format
curl -XGET 'localhost:9200/logstash-2017.04.10/_search?pretty&q=geoip.city_name=Buffalo'
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 2.6390574,
"hits" : [
{
"_index" : "logstash-2016.10.11",
"_type" : "log",
"_id" : "AVe14gMjYMkU36o_eVtO",
"_score" : 2.6390574,
"_source" : {
"request" : "/?flav=rss20",
"agent" : "\"-\"",
"geoip" : {
"timezone" : "America/New_York",
"ip" : "108.174.55.234",
"latitude" : 42.9864,
"continent_code" : "NA",
"city_name" : "Buffalo",
"country_code2" : "US",
"country_name" : "United States",
"dma_code" : 514,
"country_code3" : "US",
"region_name" : "New York",
"location" : [
-78.7279,
42.9864
],
"postal_code" : "14221",
"longitude" : -78.7279,
"region_code" : "NY"
},
"offset" : 21471,
"auth" : "-",
"ident" : "-",
"input_type" : "log",
"verb" : "GET",
"source" : "/path/to/file/logstash-tutorial.log",
"message" : "108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"",
"type" : "log",
"tags" : [
"beats_input_codec_plain_applied"
],
"referrer" : "\"-\"",
"@timestamp" : "2016-10-11T22:34:25.318Z",
"response" : "200",
"bytes" : "29941",
"clientip" : "108.174.55.234",
"@version" : "1",
"beat" : {
"hostname" : "My-MacBook-Pro.local",
"name" : "My-MacBook-Pro.local"
},
"host" : "My-MacBook-Pro.local",
"httpversion" : "1.1",
"timestamp" : "04/Jan/2015:05:27:45 +0000"
}
},
...
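
Responses like the ones above are long, so it can help to reduce them to the few fields of interest. The following Python sketch assumes the response has already been parsed into a dict (for example with `json.loads`) and that `hits.total` is a plain integer, as in the output shown here. `summarize_hits` and the trimmed `sample_response` are hypothetical names written for this note.

```python
def summarize_hits(response):
    """Return the hit total and (clientip, city_name) for each returned hit."""
    hits = response["hits"]
    rows = []
    for hit in hits["hits"]:
        source = hit["_source"]
        # geoip may be absent if the lookup found nothing for this address.
        city = source.get("geoip", {}).get("city_name")
        rows.append((source.get("clientip"), city))
    return hits["total"], rows


# Trimmed-down copy of the response structure shown above.
sample_response = {
    "hits": {
        "total": 3,
        "hits": [
            {
                "_index": "logstash-2016.10.11",
                "_source": {
                    "clientip": "108.174.55.234",
                    "geoip": {"city_name": "Buffalo"},
                },
            }
        ],
    }
}

total, rows = summarize_hits(sample_response)
print(total, rows)
```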