在分布式系统中，日志分散在各台主机上，需要把它们统一收集起来，以便集中查看。
主要思路是使用 tail 库读取日志文件。它的优点是能够跟随日志文件的变化，持续读取新写入的内容。
然后将读取到的日志发送到 Kafka，以便后续展示日志信息。
使用 etcd 管理配置信息，例如要收集哪台主机上哪个目录下的日志文件。配置以 JSON 数组的形式保存在 etcd 的某个键下（代码中为 cyl），例如 [{"topic":"web_log","path":"/var/log/nginx/access.log"}]。当 etcd 中的配置发生变化时，程序无需重启 tail 和 Kafka 客户端，就能随之切换到新的日志文件进行收集，这就是热加载。
总体框架图
然后就是具体关系
main.go
package main import ( "fmt" "logAgent/conf" "logAgent/etcd" "logAgent/kafka" "logAgent/tailf" "sync" "time" "gopkg.in/ini.v1" ) var ( myConf = new(conf.MyConf) ) func main() { // 加载配置文件 err := ini.MapTo(myConf, "./conf/config.ini") if err != nil { fmt.Println("load config file failed,err:", err) return } fmt.Println("load conf file success") // init kafka err = kafka.Init([]string{myConf.KafkaConf.Address}) if err != nil { fmt.Println("init kafka producer client failed,err:", err) return } fmt.Println("init kafka success") // init etcd err = etcd.Init(myConf.EtcdConf.Address, time.Duration(myConf.EtcdConf.TimeOut)*time.Second) if err != nil { fmt.Println("init etcd producer client failed,err:", err) return } fmt.Println("init etcd success") // 1.从etcd中获取tailf的配置 logConfs, err := etcd.GetLogConf("cyl") if err != nil { fmt.Println("get log config fail,err:", err) return } for _, logConf := range logConfs { fmt.Printf("%s:%s\n", logConf.Topic, logConf.Path) } // 2.为每一个配置创建一个tail任务 tailf.Init(logConfs) newConfChan := tailf.TailMgrs.GetNewConfChan() var wg sync.WaitGroup wg.Add(1) go etcd.WatchLogConf("cyl", newConfChan) wg.Wait() // time.Sleep(time.Hour) }
下面是 Kafka 和 etcd 的连接配置文件，main.go 启动时会加载它：
config.ini
[kafka]
address=kafka_ip:9092
topic=gyy

[etcd]
address=etcd_ip:2379
timeout=5
config.go
package conf

type (
	// KafkaConf holds the [kafka] section of the ini configuration file.
	KafkaConf struct {
		Address string `ini:"address"` // broker address, e.g. kafka_ip:9092
		Topic   string `ini:"topic"`   // topic value from the ini file
	}

	// EtcdConf holds the [etcd] section of the ini configuration file.
	EtcdConf struct {
		Address string `ini:"address"` // endpoint address, e.g. etcd_ip:2379
		TimeOut int    `ini:"timeout"` // dial timeout; main multiplies it by time.Second
	}

	// MyConf is the whole configuration file. Each embedded struct is mapped
	// from the ini section named in its tag.
	MyConf struct {
		KafkaConf `ini:"kafka"`
		EtcdConf  `ini:"etcd"`
	}
)
etcd.go
package etcd import ( "context" "encoding/json" "fmt" "time" "go.etcd.io/etcd/clientv3" ) // LogConfig 日志文件配置信息 type LogConfig struct { Topic string `json:"topic"` Path string `json:"path"` } var ( // EtcdCli etcd 的客户端 EtcdCli *clientv3.Client ) // Init 初始化etcd func Init(address string, timeOut time.Duration) (err error) { EtcdCli, err = clientv3.New(clientv3.Config{ Endpoints: []string{address}, DialTimeout: timeOut, }) return } // GetLogConf get the configration which log should be collected func GetLogConf(etcdKey string) (logConfig []*LogConfig, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) resp, err := EtcdCli.Get(ctx, etcdKey) cancel() if err != nil { fmt.Println("get error") return } for _, ev := range resp.Kvs { fmt.Printf("%s:%s\n", ev.Key, ev.Value) err = json.Unmarshal(ev.Value, &logConfig) if err != nil { fmt.Println("get log configuration") return } } return } // WatchLogConf detect if the log configuration has any change func WatchLogConf(etcdKey string, newLogConf chan<- []*LogConfig) { rch := EtcdCli.Watch(context.Background(), etcdKey) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("Type:%s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value) var newConf []*LogConfig if ev.Type != clientv3.EventTypeDelete { err := json.Unmarshal(ev.Kv.Value, &newConf) if err != nil { fmt.Println("unmaeshal new configuration failed,err:", err) return } } newLogConf <- newConf } } }
kafka.go
package kafka import ( "fmt" "github.com/Shopify/sarama" ) var ( // KafkaClient kafka的生产者客户端 KafkaClient sarama.SyncProducer ) // Init 初始化kafka生产者客户端 func Init(address []string) (err error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true KafkaClient, err = sarama.NewSyncProducer(address, config) return } // SendMsg send the message to kafka func SendMsg(topic, message string) (err error) { msg := &sarama.ProducerMessage{} msg.Topic = topic msg.Value = sarama.StringEncoder(message) pid, offset, err := KafkaClient.SendMessage(msg) if err == nil { fmt.Printf("send msg successed,pid:%v,offset:%v\n", pid, offset) } return }
tailfMgr.go
package tailf import ( "fmt" "logAgent/etcd" "time" ) // TailMgrs 实例化的tail任务管理器 var TailMgrs *TailMgr // TailMgr 保存了所有的tailfObj type TailMgr struct { LogConfs []*etcd.LogConfig tailTasks map[string]*TailTask newLogConfs chan []*etcd.LogConfig } // Init 初始化日志管理 func Init(logConfs []*etcd.LogConfig) { TailMgrs = &TailMgr{ LogConfs: logConfs, tailTasks: make(map[string]*TailTask, 16), newLogConfs: make(chan []*etcd.LogConfig), } // 为每一个tail任务创建一个tailObj for _, logConf := range logConfs { tailTask, err := NewTailTask(logConf.Topic, logConf.Path) if err != nil { fmt.Println("create a tail task failed,err:", err) return } topicAndPath := fmt.Sprintf("%s_%s", logConf.Topic, logConf.Path) TailMgrs.tailTasks[topicAndPath] = tailTask } go TailMgrs.updateLogConf() } // GetNewConfChan return the channel which contain new configuration of log topic and path func (t *TailMgr) GetNewConfChan() (newConfChan chan<- []*etcd.LogConfig) { return t.newLogConfs } func (t *TailMgr) updateLogConf() { for { select { case newConfs := <-t.newLogConfs: for _, newConf := range newConfs { topicAndPath := fmt.Sprintf("%s_%s", newConf.Topic, newConf.Path) _, ok := t.tailTasks[topicAndPath] // 新增 if !ok { newTailTask, err := NewTailTask(newConf.Topic, newConf.Path) if err != nil { fmt.Println("create new tail task failed,err:", err) return } t.tailTasks[topicAndPath] = newTailTask } } // 找出删除的tail任务 for _, logConf := range t.LogConfs { flag := false for _, newConf := range newConfs { if logConf.Topic == newConf.Topic && logConf.Path == newConf.Path { flag = true continue } } // 需要关闭这个tail任务 if !flag { topicAndPath := fmt.Sprintf("%s_%s", logConf.Topic, logConf.Path) t.tailTasks[topicAndPath].CancelFunc() delete(t.tailTasks, topicAndPath) } } t.LogConfs = newConfs for _, vv := range newConfs { fmt.Printf("%s:%s\n", vv.Topic, vv.Path) } default: time.Sleep(time.Second) } } }
tailf.go
package tailf import ( "context" "fmt" "logAgent/kafka" "time" "github.com/hpcloud/tail" ) // var ( // // TailObj 全局变量,tailf句柄 // TailObj *tail.Tail // ) // TailTask 是一个tail任务 type TailTask struct { Topic string Path string TailObj *tail.Tail Ctx context.Context CancelFunc context.CancelFunc } // NewTailTask 新创建一个tail任务 func NewTailTask(topic, path string) (tailTask *TailTask, err error) { config := tail.Config{ ReOpen: true, //重新打开 Follow: true, //是否跟随 Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件什么地方开始读 MustExist: false, //文件不存在不报错 Poll: true, } tailObj, err := tail.TailFile(path, config) if err != nil { fmt.Printf("create a tail task failed,err:%v\n", err) return } ctx, cancel := context.WithCancel(context.Background()) tailTask = &TailTask{ Topic: topic, Path: path, TailObj: tailObj, Ctx: ctx, CancelFunc: cancel, } go tailTask.run(ctx) return } func (t *TailTask) run(ctx context.Context) { for { select { case <-ctx.Done(): return case data := <-t.TailObj.Lines: kafka.SendMsg(t.Topic, data.Text) default: time.Sleep(time.Second) } } }
go.mod
module logAgent

go 1.13

require (
	github.com/Shopify/sarama v1.27.0
	github.com/coreos/etcd v3.3.22+incompatible // indirect
	github.com/coreos/go-semver v0.3.0 // indirect
	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
	github.com/gogo/protobuf v1.3.1 // indirect
	github.com/google/uuid v1.1.1 // indirect
	github.com/hpcloud/tail v1.0.0
	go.etcd.io/etcd v3.3.22+incompatible
	go.uber.org/zap v1.15.0 // indirect
	google.golang.org/grpc v1.26.0 // indirect
	gopkg.in/fsnotify.v1 v1.4.7 // indirect
	gopkg.in/ini.v1 v1.58.0
	gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)