支援熱載入的日誌資訊收集器
阿新 • • 發佈:2020-08-12
在分散式系統中,日誌資訊分佈在各個主機上,需要將日誌資訊收集起來便於檢視
主要思路是使用tail庫來讀取日誌檔案,它的優點是可以隨著日誌檔案的更改動態讀取日誌檔案資訊
然後將讀取到的日誌資訊傳送給kafka,以便展示日誌資訊
使用etcd來管理配置資訊,例如需要讀取哪一臺主機的哪個目錄下的日誌檔案。當etcd中的配置資訊發生改變以後,程式可以在不重啟tail和kafka的情況下跟著調整,改為收集其他位置的日誌資訊,這就是熱載入
總體框架圖
然後就是具體關係
main.go
// Command logAgent tails the log files listed under an etcd key and forwards
// every new line to Kafka. Changes to that etcd key are applied at runtime.
package main

import (
	"fmt"
	"logAgent/conf"
	"logAgent/etcd"
	"logAgent/kafka"
	"logAgent/tailf"
	"sync"
	"time"

	"gopkg.in/ini.v1"
)

// logConfKey is the etcd key holding the list of log files to collect. It is
// used both for the initial read and for the watch, so it lives in one place.
const logConfKey = "cyl"

var (
	// myConf receives the values mapped from ./conf/config.ini.
	myConf = new(conf.MyConf)
)

// main wires the agent together: it loads config.ini, connects to Kafka and
// etcd, starts one tail task per configured log file, and then blocks while
// the etcd watcher feeds configuration updates to the tail manager.
func main() {
	// Load the agent's own settings (Kafka and etcd endpoints).
	err := ini.MapTo(myConf, "./conf/config.ini")
	if err != nil {
		fmt.Println("load config file failed,err:", err)
		return
	}
	fmt.Println("load conf file success")

	// Connect the Kafka producer.
	err = kafka.Init([]string{myConf.KafkaConf.Address})
	if err != nil {
		fmt.Println("init kafka producer client failed,err:", err)
		return
	}
	fmt.Println("init kafka success")

	// Connect the etcd client; the configured timeout is expressed in seconds.
	err = etcd.Init(myConf.EtcdConf.Address, time.Duration(myConf.EtcdConf.TimeOut)*time.Second)
	if err != nil {
		fmt.Println("init etcd client failed,err:", err)
		return
	}
	fmt.Println("init etcd success")

	// 1. Fetch the tail configuration from etcd.
	logConfs, err := etcd.GetLogConf(logConfKey)
	if err != nil {
		fmt.Println("get log config fail,err:", err)
		return
	}
	for _, logConf := range logConfs {
		fmt.Printf("%s:%s\n", logConf.Topic, logConf.Path)
	}

	// 2. Start one tail task per configuration entry.
	tailf.Init(logConfs)
	newConfChan := tailf.TailMgrs.GetNewConfChan()

	// 3. Watch etcd for changes. The WaitGroup is released when the watcher
	// returns, so the process exits visibly instead of hanging forever with
	// hot reload silently disabled.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		etcd.WatchLogConf(logConfKey, newConfChan)
	}()
	wg.Wait()
	fmt.Println("etcd watcher stopped, exiting")
}
載入kafka和etcd的配置檔案
config.ini
[kafka]
address=kafka_ip:9092
topic=gyy

[etcd]
address=etcd_ip:2379
timeout=5
config.go
package conf // MyConf ... type MyConf struct { KafkaConf `ini:"kafka"` EtcdConf `ini:"etcd"` } // KafkaConf ... type KafkaConf struct { Address string `ini:"address"` Topic string `ini:"topic"` } // EtcdConf ... type EtcdConf struct { Address string `ini:"address"` TimeOut int `ini:"timeout"` }
etcd.go
// Package etcd wraps the etcd v3 client used to load and watch the
// log-collection configuration.
package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// getTimeout bounds a single read of the configuration key.
const getTimeout = time.Second

// LogConfig describes one log file to collect. The configuration stored in
// etcd is a JSON array of these objects.
type LogConfig struct {
	Topic string `json:"topic"`
	Path  string `json:"path"`
}

var (
	// EtcdCli is the shared etcd client. Init assigns it; GetLogConf and
	// WatchLogConf use it.
	EtcdCli *clientv3.Client
)

// Init creates the shared etcd client for the single endpoint address,
// using timeOut as the dial timeout. It returns the error reported by
// clientv3.New, if any.
func Init(address string, timeOut time.Duration) (err error) {
	EtcdCli, err = clientv3.New(clientv3.Config{
		Endpoints:   []string{address},
		DialTimeout: timeOut,
	})
	return err
}

// GetLogConf reads etcdKey and decodes its value as a JSON array of
// LogConfig entries.
//
// It returns a nil slice and a wrapped error when the read fails or the
// stored value is not valid JSON; no partially decoded result is returned.
// A missing key yields a nil slice and a nil error.
func GetLogConf(etcdKey string) (logConfig []*LogConfig, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), getTimeout)
	defer cancel()

	resp, err := EtcdCli.Get(ctx, etcdKey)
	if err != nil {
		return nil, fmt.Errorf("get etcd key %q: %w", etcdKey, err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s:%s\n", kv.Key, kv.Value)
		// Decode into a fresh slice so a failure cannot leak a half-filled
		// result to the caller.
		var parsed []*LogConfig
		if err := json.Unmarshal(kv.Value, &parsed); err != nil {
			return nil, fmt.Errorf("decode log configuration under %q: %w", kv.Key, err)
		}
		logConfig = parsed
	}
	return logConfig, nil
}

// WatchLogConf watches etcdKey and sends every new configuration on
// newLogConf. A delete event is forwarded as a nil slice.
//
// A value that cannot be decoded is reported and skipped, so one bad write
// does not end the watch and the previous configuration stays in effect.
// The function returns only when the watch channel is closed.
func WatchLogConf(etcdKey string, newLogConf chan<- []*LogConfig) {
	rch := EtcdCli.Watch(context.Background(), etcdKey)
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			fmt.Println("watch log configuration failed,err:", err)
			continue
		}
		for _, ev := range wresp.Events {
			fmt.Printf("Type:%s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			var newConf []*LogConfig
			if ev.Type != clientv3.EventTypeDelete {
				if err := json.Unmarshal(ev.Kv.Value, &newConf); err != nil {
					fmt.Println("unmarshal new configuration failed,err:", err)
					continue
				}
			}
			newLogConf <- newConf
		}
	}
}
kafka.go
// Package kafka owns the synchronous Kafka producer used to publish log lines.
package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"
)

var (
	// KafkaClient is the shared synchronous producer; Init assigns it.
	KafkaClient sarama.SyncProducer
)

// Init builds the shared producer for the given broker addresses. The
// producer is configured with RequiredAcks set to WaitForAll, the random
// partitioner, and success reporting enabled. The error from
// sarama.NewSyncProducer is returned unchanged.
func Init(address []string) (err error) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	cfg.Producer.RequiredAcks = sarama.WaitForAll

	producer, err := sarama.NewSyncProducer(address, cfg)
	KafkaClient = producer
	return err
}

// SendMsg publishes message to topic through KafkaClient. On success it
// prints the partition and offset reported by the producer and returns nil;
// otherwise it returns the producer's error.
func SendMsg(topic, message string) (err error) {
	partition, offset, err := KafkaClient.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(message),
	})
	if err != nil {
		return err
	}
	fmt.Printf("send msg successed,pid:%v,offset:%v\n", partition, offset)
	return nil
}
tailfMgr.go
package tailf

import (
	"fmt"
	"logAgent/etcd"
)

// TailMgrs is the process-wide tail task manager; Init assigns it.
var TailMgrs *TailMgr

// TailMgr keeps track of every running tail task and applies configuration
// updates received on its channel.
type TailMgr struct {
	// LogConfs is the configuration most recently applied.
	LogConfs []*etcd.LogConfig
	// tailTasks maps taskKey(topic, path) to the running task.
	tailTasks map[string]*TailTask
	// newLogConfs carries configuration updates to updateLogConf.
	newLogConfs chan []*etcd.LogConfig
}

// taskKey builds the map key that identifies a task by topic and path.
func taskKey(topic, path string) string {
	return fmt.Sprintf("%s_%s", topic, path)
}

// Init creates the global manager, starts one tail task per entry of
// logConfs, and launches the goroutine that applies later updates.
//
// A task that cannot be created is reported and skipped. The update
// goroutine is always started, so senders on the channel returned by
// GetNewConfChan are never left blocked.
func Init(logConfs []*etcd.LogConfig) {
	TailMgrs = &TailMgr{
		LogConfs:    logConfs,
		tailTasks:   make(map[string]*TailTask, 16),
		newLogConfs: make(chan []*etcd.LogConfig),
	}
	TailMgrs.startMissing(logConfs)
	go TailMgrs.updateLogConf()
}

// GetNewConfChan returns the send side of the channel on which new
// configurations are delivered to the manager.
func (t *TailMgr) GetNewConfChan() (newConfChan chan<- []*etcd.LogConfig) {
	return t.newLogConfs
}

// updateLogConf applies each configuration received on newLogConfs: it
// starts tasks for new entries, stops tasks whose entries disappeared, and
// records the configuration in LogConfs. It blocks on the channel between
// updates and returns if the channel is closed.
func (t *TailMgr) updateLogConf() {
	for newConfs := range t.newLogConfs {
		t.startMissing(newConfs)
		t.stopRemoved(newConfs)
		t.LogConfs = newConfs
		for _, conf := range newConfs {
			if conf != nil {
				fmt.Printf("%s:%s\n", conf.Topic, conf.Path)
			}
		}
	}
}

// startMissing starts a tail task for every entry of confs that has no
// running task yet. Failures are reported and do not affect other entries.
func (t *TailMgr) startMissing(confs []*etcd.LogConfig) {
	for _, conf := range confs {
		// A JSON null inside the array decodes to a nil pointer.
		if conf == nil {
			continue
		}
		key := taskKey(conf.Topic, conf.Path)
		if _, running := t.tailTasks[key]; running {
			continue
		}
		task, err := NewTailTask(conf.Topic, conf.Path)
		if err != nil {
			fmt.Println("create new tail task failed,err:", err)
			continue
		}
		t.tailTasks[key] = task
	}
}

// stopRemoved cancels and forgets every running task that is not listed in
// confs. It walks the task map itself, so it only touches tasks that exist.
func (t *TailMgr) stopRemoved(confs []*etcd.LogConfig) {
	wanted := make(map[string]struct{}, len(confs))
	for _, conf := range confs {
		if conf != nil {
			wanted[taskKey(conf.Topic, conf.Path)] = struct{}{}
		}
	}
	for key, task := range t.tailTasks {
		if _, keep := wanted[key]; keep {
			continue
		}
		task.CancelFunc()
		delete(t.tailTasks, key)
	}
}
tailf.go
// Package tailf follows log files and forwards their lines to Kafka.
package tailf

import (
	"context"
	"fmt"
	"io"
	"logAgent/kafka"

	"github.com/hpcloud/tail"
)

// TailTask is one running tail job: it follows the file at Path and
// publishes each line to the Kafka topic Topic.
type TailTask struct {
	Topic string
	Path  string
	// TailObj is the underlying tailer whose Lines channel is consumed.
	TailObj *tail.Tail
	// Ctx is cancelled by CancelFunc to end the task.
	Ctx context.Context
	// CancelFunc stops the task's goroutine.
	CancelFunc context.CancelFunc
}

// NewTailTask starts following path and launches a goroutine that forwards
// every new line to topic. Reading starts at the end of the file, so only
// lines appended afterwards are collected.
//
// It returns the running task, or a nil task and a wrapped error when the
// file cannot be tailed. Call the task's CancelFunc to stop it.
func NewTailTask(topic, path string) (tailTask *TailTask, err error) {
	config := tail.Config{
		ReOpen:    true,                                           // reopen the file when it is recreated
		Follow:    true,                                           // keep following the file for new lines
		Location:  &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}, // where in the file to start reading
		MustExist: false,                                          // a missing file is not an error
		Poll:      true,                                           // poll for changes (per the tail package docs)
	}
	tailObj, err := tail.TailFile(path, config)
	if err != nil {
		return nil, fmt.Errorf("tail %q: %w", path, err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	tailTask = &TailTask{
		Topic:      topic,
		Path:       path,
		TailObj:    tailObj,
		Ctx:        ctx,
		CancelFunc: cancel,
	}
	go tailTask.run(ctx)
	return tailTask, nil
}

// run forwards lines from the tailer to Kafka until ctx is cancelled or the
// Lines channel is closed. It blocks while idle instead of polling, so both
// new lines and cancellation are handled without delay.
func (t *TailTask) run(ctx context.Context) {
	// Stop the tailer on exit; otherwise it keeps running after the task
	// has been cancelled.
	defer func() {
		if err := t.TailObj.Stop(); err != nil {
			fmt.Printf("stop tailing %s failed,err:%v\n", t.Path, err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case line, ok := <-t.TailObj.Lines:
			// A closed channel yields a nil line; stop rather than
			// dereference it.
			if !ok || line == nil {
				fmt.Printf("tail of %s stopped\n", t.Path)
				return
			}
			if line.Err != nil {
				fmt.Printf("read line from %s failed,err:%v\n", t.Path, line.Err)
				continue
			}
			if err := kafka.SendMsg(t.Topic, line.Text); err != nil {
				fmt.Printf("send msg to topic %s failed,err:%v\n", t.Topic, err)
			}
		}
	}
}