amazon-web-services - AWS 负载均衡器日志从 s3 到弹性搜索
问题描述
我已将我的 ELB 日志启用到 S3 存储桶。我正在尝试使用以下 lambda 脚本将 S3 日志发送到 Elasticsearch。
日志以 *.log.gz 格式存储在 s3 存储桶中。如何以 json 格式将发送 zip 文件发送到 elasticsearch。
让我知道是否有更好的方法来做到这一点。
import boto3
import re
import requests
from requests_aws4auth import AWS4Auth
region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
host = '' # the Amazon ES domain, including https://
index = 'lambda-s3-index'
type = 'lambda-type'
url = host + '/' + index + '/' + type
headers = { "Content-Type": "application/json" }
s3 = boto3.client('s3')
# Regular expressions used to parse some simple log lines
ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)')
time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
message_pattern = re.compile('\"(.+)\"')
# Lambda execution starts here
def handler(event, context):
for record in event['Records']:
# Get the bucket name and key for the new file
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
# Get, read, and split the file into lines
obj = s3.get_object(Bucket=bucket, Key=key)
body = obj['Body'].read()
lines = body.splitlines()
# Match the regular expressions to each line and index the JSON
for line in lines:
ip = ip_pattern.search(line).group(1)
timestamp = time_pattern.search(line).group(1)
message = message_pattern.search(line).group(1)
document = { "ip": ip, "timestamp": timestamp, "message": message }
r = requests.post(url, auth=awsauth, json=document, headers=headers)
解决方案
更好的方法是使用安装了 s3 输入插件的 Logstash 来解析 ELB 访问日志并将其发送到弹性搜索。
logstash.conf:
input {
s3 {
access_key_id => "..."
secret_access_key => "..."
bucket => "..."
region => "eu-central-1"
prefix => "dxlb/AWSLogs/.../elasticloadbalancing/eu-central-1/2019/09/"
type => "elb"
}
}
filter {
if [type] == "elb" {
grok {
match => [ "message", "%{WORD:connection} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:float} (?:(%{IP:backendip}:?:%{INT:backendport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{N
UMBER:response_processing_time:float} (?:-|%{INT:elb_status_code:int}) (?:-|%{INT:backend_status_code:int}) %{INT:received_bytes:int} %{INT:sent_bytes:int} \"%{ELB_REQUEST_LINE}\" \"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) (?:-|%{NOTSPACE:ssl_protocol})
" ]
#match => ["message", "%{ELB_ACCESS_LOG} \"%{DATA:userAgent}\"( %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol})?"]
}
date {
match => [ "timestamp", "ISO8601" ]
}
geoip {
source => "clientip"
}
}
}
output {
if [type] == "elb" {
elasticsearch {
hosts => ["http://node:9200"]
index => "logstash-%{+YYYY.MM}"
user => "..."
password => "..."
}
}
}
推荐阅读
- datastax - 将 LocalDateTime 转换为 Cassandra TIMESTAMP 时的编解码器问题
- python - Python程序打印出提供的子字符串的模式在提供的字符串中匹配的次数
- node.js - Mongodb _id 在同一个集合中有两种不同的数据类型
- powershell - 如何找到有权访问共享邮箱的特定用户?
- python - PySpark df.write 正在将不同数量的记录加载到不同的表中
- r - 识别在数据框中复制的行
- android - NavigationUI.navigateUp 什么都不做
- c# - 安装驱动程序后检测新的 USB 电缆/设备
- angular - Angular 虚拟滚动和 Browseranimation Renderbug
- javascript - 为什么我无法访问此 DataTable 中隐藏列的文本?