AWS load balancer logs from S3 to Elasticsearch

Problem description

I have enabled my ELB access logs to an S3 bucket, and I am trying to ship those logs to Elasticsearch with the Lambda script below.

The logs are stored in the S3 bucket as *.log.gz files. How can I send these gzipped files to Elasticsearch as JSON documents?

I tried following https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws-integrations-s3-lambda-es with no luck.

Let me know if there is a better way to do this.

import boto3
import re
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the Amazon ES domain, including https://
index = 'lambda-s3-index'
type = 'lambda-type'
url = host + '/' + index + '/' + type

headers = { "Content-Type": "application/json" }  
s3 = boto3.client('s3')

# Regular expressions used to parse some simple log lines
ip_pattern = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
time_pattern = re.compile(r'\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')

message_pattern = re.compile(r'\"(.+)\"')

# Lambda execution starts here
def handler(event, context):
    for record in event['Records']:

        # Get the bucket name and key for the new file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Get, read, and split the file into lines
        obj = s3.get_object(Bucket=bucket, Key=key)
        body = obj['Body'].read()
        lines = body.splitlines()

        # Match the regular expressions to each line and index the JSON
        for line in lines:
            ip = ip_pattern.search(line).group(1)
            timestamp = time_pattern.search(line).group(1)
            message = message_pattern.search(line).group(1)

            document = { "ip": ip, "timestamp": timestamp, "message": message }
            r = requests.post(url, auth=awsauth, json=document, headers=headers)

Tags: amazon-web-services, elasticsearch, amazon-s3, aws-sdk, boto3

Solution


A better approach is to use Logstash with the s3 input plugin installed to parse the ELB access logs and ship them to Elasticsearch.
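If the s3 input plugin is not already bundled with your Logstash distribution, it can usually be installed with bin/logstash-plugin install logstash-input-s3.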

logstash.conf:

input { 
  s3 {
    access_key_id => "..."
    secret_access_key => "..."
    bucket => "..."
    region => "eu-central-1"
    prefix => "dxlb/AWSLogs/.../elasticloadbalancing/eu-central-1/2019/09/"
    type   => "elb"
  }
}

filter {
  if [type] == "elb" {
    grok {
      match => [ "message", "%{WORD:connection} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:float} (?:(%{IP:backendip}:?:%{INT:backendport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} (?:-|%{INT:elb_status_code:int}) (?:-|%{INT:backend_status_code:int}) %{INT:received_bytes:int} %{INT:sent_bytes:int} \"%{ELB_REQUEST_LINE}\" \"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) (?:-|%{NOTSPACE:ssl_protocol})" ]
      #match => ["message", "%{ELB_ACCESS_LOG} \"%{DATA:userAgent}\"( %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol})?"]
    }
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    geoip {
      source => "clientip"
    }
  }
}

output {
  if [type] == "elb" {
    elasticsearch {
      hosts => ["http://node:9200"]
      index => "logstash-%{+YYYY.MM}"
      user => "..."
      password => "..."
    }
  }
}
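Logstash can then be started against this file with bin/logstash -f logstash.conf.

If you would rather keep the Lambda approach, the *.log.gz objects have to be decompressed before the lines can be matched. A minimal sketch of that read step, assuming Python's standard gzip module and objects small enough to decompress in memory (read_log_lines is only an illustrative helper, not part of your original script):

import boto3
import gzip

s3 = boto3.client('s3')

def read_log_lines(bucket, key):
    # Fetch the gzipped ELB log object from S3
    obj = s3.get_object(Bucket=bucket, Key=key)
    # Decompress in memory and decode to text before splitting into lines
    body = gzip.decompress(obj['Body'].read())
    return body.decode('utf-8').splitlines()

Each returned line can then be matched and indexed exactly as in your original handler.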
