首页 > 解决方案 > 如何将iptables日志转成json?

问题描述

我有一些 rsyslog iptables 日志,每一行类似于:

Jul  2 06:24:39 mizar kernel: [1746506.948083] NETFILTER: IN=ens192 OUT= MAC=00:0c:29:7d:e8:00:a4:91:b1:e2:a3:34:08:00 SRC=89.248.165.203 DST=192.168.255.2 LEN=44 TOS=0x00 PREC=0x00 TTL=249 ID=9739 PROTO=TCP SPT=44587 DPT=12005 WINDOW=1024 RES=0x00 SYN URGP=0 

是否有 bash / python / 其他任何方式的解决方案,可以将其转换为扁平的 JSON 文件?每个标签都必须显示,例如:

{ "host": [
 { "name": "mizar",
   "timestamp": "Jul  2 06:24:39",
   "in": "ens192",
   "out": "",
   "src": "89.248.165.203",
   "dst": "192.168.255.2",
   ... (其余字段省略) ...
   }
 ]
}

每个字段都必须转换。

谢谢!

标签: json, iptables

解决方案


以下 Python 脚本可供进一步参考:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jul 18 11:12:29 2021

@author: mf18626
"""
import json
import ipaddress
import urllib.request
import time

def _throttle_geo_queries(ipNations, max_queries=44, window=60):
    """Record one geo-IP query, sleeping first if the recent-query budget is used up.

    ``ipNations["queryCounter"]`` holds the integer epoch seconds of earlier
    queries. Entries older than ``window`` seconds are dropped on every call
    so the list stays bounded. If ``max_queries`` entries remain, the call
    sleeps until the oldest one leaves the window.

    NOTE(review): 44 per 60 s is presumably chosen to stay under the
    ip-api.com per-minute quota -- confirm against the service's terms.
    """
    now = int(time.time())
    recent = [t for t in ipNations.setdefault("queryCounter", []) if now - t < window]
    if len(recent) >= max_queries:
        wait = window - (now - recent[0])
        print("GEO IP API limit reached: throttling %d seconds" % wait)
        time.sleep(wait)
        # Re-evaluate the window against the clock *after* sleeping.
        now = int(time.time())
        recent = [t for t in recent if now - t < window]
    recent.append(now)
    ipNations["queryCounter"] = recent


def _lookup_location(ip_text, ipNations, api_url="http://ip-api.com/json/"):
    """Return a location label for *ip_text*, or ``None`` when it is unknown.

    Private addresses map to ``"IANA PrivNet"``. Public addresses are served
    from the *ipNations* cache when present; otherwise the geo-IP service is
    queried (rate-limited) and a successful answer is cached. ``None`` is
    returned when *ip_text* is not a valid address (e.g. a truncated log
    line) or when the service reply carries no ``country`` entry.

    Network errors raised by ``urlopen`` propagate to the caller.
    """
    try:
        is_private = ipaddress.ip_address(ip_text).is_private
    except ValueError:
        return None
    if is_private:
        return "IANA PrivNet"
    if ip_text in ipNations:
        return ipNations[ip_text]

    _throttle_geo_queries(ipNations)
    print("Requesting %s" % ip_text)
    req = urllib.request.Request(api_url + ip_text)
    with urllib.request.urlopen(req) as reply:
        json_response = json.loads(reply.read().decode('utf-8'))
    try:
        country = json_response['country']
    except (KeyError, TypeError):
        # Failed lookups are not cached, so they can be retried on a later line.
        return None
    ipNations[ip_text] = country
    return country


def parse(logLine, ipNations):
    """Convert one iptables log line into a flat dict of string fields.

    Parameters:
        logLine: a single whitespace-separated log line.
        ipNations: mutable cache shared between calls. It maps an IP string
            to its country and also carries the ``"queryCounter"`` list of
            query timestamps used for rate limiting. It is updated in place.

    Returns:
        A dict with the keys ``timestamp``, ``eventId``, ``inIface``,
        ``outIface``, ``srcIP``, ``dstIP``, ``spt``, ``dpt``, ``proto``,
        ``syn``, ``ack``, ``psh``, ``fin``, ``rst``, ``srcLocation`` and
        ``dstLocation``. Fields absent from the line stay ``""``.

    Keywords are matched case-insensitively as substrings of each token, and
    when several tokens match the same keyword the last one wins.
    """
    # keyword -> output field, scanned in this order (src is resolved before dst).
    value_fields = {
        "in=": "inIface",
        "out=": "outIface",
        "src=": "srcIP",
        "dst=": "dstIP",
        "proto=": "proto",
        "spt=": "spt",
        "dpt=": "dpt",
    }
    location_fields = {"src=": "srcLocation", "dst=": "dstLocation"}
    flag_keywords = ("ack", "syn", "psh", "fin", "rst")

    tokenizedLogLine = logLine.split()

    # Initialise every field up front: they are common to all protocols,
    # some lines populate them and some do not.
    fields = {
        "timestamp": "",
        "eventId": "",
        "inIface": "",
        "outIface": "",
        "srcIP": "",
        "dstIP": "",
        "spt": "",
        "dpt": "",
        "proto": "",
        "syn": "",
        "ack": "",
        "psh": "",
        "fin": "",
        "rst": "",
        "srcLocation": "",
        "dstLocation": "",
    }
    # The timestamp is always the first token; the fourth token is used as
    # the event identifier (for later sorting in Elastic). Blank or
    # truncated lines simply leave these empty instead of raising.
    if tokenizedLogLine:
        fields["timestamp"] = tokenizedLogLine[0]
    if len(tokenizedLogLine) > 3:
        fields["eventId"] = tokenizedLogLine[3]

    lowered = [token.lower() for token in tokenizedLogLine]

    for keyword, field in value_fields.items():
        for token in lowered:
            if keyword not in token:
                continue
            value = token.split("=")[1]
            fields[field] = value
            location_field = location_fields.get(keyword)
            if location_field is not None:
                location = _lookup_location(value, ipNations)
                if location is not None:
                    fields[location_field] = location

    for flag in flag_keywords:
        for token in lowered:
            if flag in token:
                fields[flag] = token

    return fields
def readlines_then_tail(fin):
    """Yield every line already in *fin*, then keep following it for new ones.

    Parameters:
        fin: an open, seekable text file object.

    Yields:
        Each line as returned by ``fin.readline()`` (newline included).

    Once the existing content is exhausted the generator delegates to
    ``tail(fin)``, which polls for appended lines. That generator never
    finishes, so neither does this one.
    """
    line = fin.readline()
    while line:
        yield line
        line = fin.readline()
    # Delegate with ``yield from``: merely calling tail(fin) would create a
    # generator and discard it, leaving this loop spinning at end-of-file
    # without ever sleeping.
    yield from tail(fin)
def tail(fin):
    """Follow *fin* forever, yielding each new line as it becomes available.

    Parameters:
        fin: an open file object supporting ``tell``, ``seek`` and ``readline``.

    Yields:
        Every non-empty result of ``fin.readline()``.

    When ``readline()`` returns nothing, the generator sleeps for one second,
    seeks back to the position recorded before the read, and tries again.
    """
    while True:
        position = fin.tell()
        chunk = fin.readline()
        if chunk:
            yield chunk
            continue
        # Nothing new yet: wait, then rewind to where this read started.
        time.sleep(1)
        fin.seek(position)


# Shared state handed to parse() for every line; it starts with an empty
# "queryCounter" list.
ipNations = dict(queryCounter=[])
# Running index used to give each output file a distinct name.
i = 0
with open("logline.log") as logFile:
    # One JSON file is written per log line produced by the generator.
    for logLine in readlines_then_tail(logFile):
        target = "input/jsonizedlog-" + str(i) + ".json"
        with open(target, "w") as sink:
            record = parse(logLine, ipNations)
            json.dump(record, sink)
            i = i + 1

推荐阅读