支援熱載入的日誌資訊處理器
阿新 • • 發佈:2020-08-17
前面我們說了,將日誌資訊從結點的日誌檔案中讀取出來,然後儲存到訊息佇列kafka中,今天我們需要實現的是將kafka中的日誌資訊讀取出來,存入elasticsearch中,然後通過kibana便捷的查詢日誌資訊。
通過etcd來獲取日誌檔案的配置資訊,使用etcd的wtach功能支援熱載入功能。
總體框架圖
具體關係流程圖
main.go
package main import ( "fmt" "logConsumer/config" "logConsumer/elasticsearch" "logConsumer/etcd" "logConsumer/kafka" "sync" "time" "gopkg.in/ini.v1" ) var logConf = new(config.LogConsumerConf) func main() { // 讀配置檔案 err := ini.MapTo(logConf, "./config/config.ini") if err != nil { fmt.Printf("fail to load config, err:%v\n", err) return } fmt.Println("load config file success") // 初始化kafka err = kafka.Init(logConf.KafkaConf.Address) if err != nil { fmt.Printf("fail to init kafka,err:%v\n", err) return } fmt.Println("init kafka success") // 初始化etcd err = etcd.Init(logConf.EtcdConf.Address, time.Duration(logConf.EtcdConf.TimeOut)*time.Second) if err != nil { fmt.Printf("fail to init etcd,err:%v\n", err) return } fmt.Println("init etcd success") // 初始化elasticsearch err = elasticsearch.Init(logConf.ElasticSearchConf.Address) if err != nil { fmt.Printf("fail to init elasticsearch,err:%v\n", err) return } fmt.Println("init elasticsearch success") // 從etcd中讀取配置,獲取所有topic logDetailMsgs, err := etcd.GetConf(logConf.EtcdConf.EtcdKey) for _, logDetail := range logDetailMsgs { fmt.Println(logDetail) } if err != nil { fmt.Printf("get topic and path of the log failed,err:%v\n", err) return } // 對每個topic從kafka讀資料存入elasticsearch kafka.InitMgr(logDetailMsgs) var wg sync.WaitGroup wg.Add(1) go etcd.UpdateConf(logConf.EtcdConf.EtcdKey, kafka.MsgTransferMgr.NewLogConfChan) wg.Wait() //err = kafka.GetLogMsg(logDetailMsgs) //if err != nil { // fmt.Printf("fail to condumer log msg from kafka,err:%v\n", err) // return //} // 每個topic在ES中建立對應的索引,topic對應原始結點的日誌檔案的path和內容存入ES }
config.ini
[kafka] address=kafka_ip:9092 [elasticsearch] address=http://elasticsearch_ip:9200 [etcd] address=etcd_ip:2379 timeout=5 etcd_key=cyl
config.go
package config type LogConsumerConf struct { KafkaConf `ini:"kafka"` EtcdConf `ini:"etcd"` ElasticSearchConf `ini:"elasticsearch"` } type KafkaConf struct { Address string `ini:"address"` } type EtcdConf struct { Address string `ini:"address"` TimeOut int `ini:"timeout"` EtcdKey string `ini:"etcd_key"` } type ElasticSearchConf struct { Address string `ini:"address"` }
elasticsearch.go
package elasticsearch import ( "context" "fmt" "github.com/olivere/elastic/v7" ) type ElasticSearchMsg struct { Topic string `json:"topic"` Path string `json:"path"` Log string `json:"log"` } // ElasticSearchClient ... var ElasticSearchClient *elastic.Client func Init(address string) (err error) { ElasticSearchClient, err = elastic.NewClient(elastic.SetURL(address)) return } // SendToElasticSearch send msg to ES func SendToElasticSearch(msg *ElasticSearchMsg) (err error) { put, err := ElasticSearchClient.Index().Index(msg.Topic).BodyJson(msg).Do(context.Background()) if err != nil { fmt.Printf("fail to send msg to slasticsearch,err:%v\n", err) return } fmt.Printf("Indexed user %s to index %s, type %s\n", put.Id, put.Index, put.Type) return }
etcd.go
package etcd import ( "context" "encoding/json" "fmt" "time" "go.etcd.io/etcd/clientv3" ) type LogDetailMsg struct { Topic string `json:"topic"` Path string `json:"path"` } // EtcdClient ... var EtcdClient *clientv3.Client func Init(address string, timeOut time.Duration) (err error) { EtcdClient, err = clientv3.New(clientv3.Config{ Endpoints: []string{address}, DialTimeout: timeOut, }) return } // GetConf return a slice of topic and path func GetConf(etcdKey string) (logDetailMsgs []*LogDetailMsg, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) resp, err := EtcdClient.Get(ctx, etcdKey) if err != nil { return } cancel() for _, ev := range resp.Kvs { err = json.Unmarshal(ev.Value, &logDetailMsgs) if err != nil { return } } return } func UpdateConf(etcdKey string, newConf chan<- []*LogDetailMsg) { rch := EtcdClient.Watch(context.Background(), etcdKey) for wresp := range rch { for _, ev := range wresp.Events { var nConf []*LogDetailMsg if ev.Type != clientv3.EventTypeDelete { err := json.Unmarshal(ev.Kv.Value, &nConf) if err != nil { fmt.Printf("get newer config msg failed,err:%v\n", err) return } } newConf <- nConf } } }
kafkaMgr.go
package kafka import ( "context" "logConsumer/etcd" "time" ) var MsgTransferMgr *KafkaConsumerMgr // KafkaConsumerMgr ... type KafkaConsumerMgr struct { LogConfs []*etcd.LogDetailMsg KafkaConsumerTasks map[string]*ConsumerTask NewLogConfChan chan []*etcd.LogDetailMsg } func InitMgr(logDetailMsgs []*etcd.LogDetailMsg) { MsgTransferMgr = &KafkaConsumerMgr{ LogConfs: logDetailMsgs, KafkaConsumerTasks: make(map[string]*ConsumerTask, 16), NewLogConfChan: make(chan []*etcd.LogDetailMsg), } for _, logDetailMsg := range logDetailMsgs { ctx, cancel := context.WithCancel(context.Background()) MsgTransferMgr.KafkaConsumerTasks[logDetailMsg.Topic] = &ConsumerTask{ Topic: logDetailMsg.Topic, Path: logDetailMsg.Path, Ctx: ctx, ConsumerTaskCancel: cancel, } go MsgTransferMgr.KafkaConsumerTasks[logDetailMsg.Topic].getLogMsgFromOneTopic() } go MsgTransferMgr.updateConfFromEtcd() } func (k *KafkaConsumerMgr) updateConfFromEtcd() { for { select { case logConfs := <-k.NewLogConfChan: for _, logConf := range logConfs { _, ok := k.KafkaConsumerTasks[logConf.Topic] //存在新新增的配置 if !ok { ctx, cancel := context.WithCancel(context.Background()) k.KafkaConsumerTasks[logConf.Topic] = &ConsumerTask{ Topic: logConf.Topic, Path: logConf.Path, Ctx: ctx, ConsumerTaskCancel: cancel, } go k.KafkaConsumerTasks[logConf.Topic].getLogMsgFromOneTopic() } } // 停止刪除的任務 for _, logDetail := range k.LogConfs { flag := false for _, newConf := range logConfs { if logDetail.Topic == newConf.Topic { flag = true } } if !flag { // 刪除 k.KafkaConsumerTasks[logDetail.Topic].ConsumerTaskCancel() delete(k.KafkaConsumerTasks, logDetail.Topic) } } k.LogConfs = logConfs default: time.Sleep(time.Second) } } }
kafka.go
package kafka import ( "context" "fmt" "logConsumer/elasticsearch" "github.com/Shopify/sarama" ) type ConsumerTask struct { Topic string Path string Ctx context.Context ConsumerTaskCancel context.CancelFunc } // KafkaConsumer ... var KafkaConsumer sarama.Consumer func Init(address string) (err error) { KafkaConsumer, err = sarama.NewConsumer([]string{address}, nil) return } //func GetLogMsg(logDetailMsgs []*etcd.LogDetailMsg) (err error) { // for _, logDetailMsg := range logDetailMsgs { // go getLogMsgFromOneTopic(logDetailMsg.Topic) // } // return //} func (c *ConsumerTask) getLogMsgFromOneTopic() { fmt.Printf("getLogMsgFromOneTopic,topic=%v\n", c.Topic) partitionList, err := KafkaConsumer.Partitions(c.Topic) fmt.Println(c.Topic, partitionList) if err != nil { fmt.Printf("get partitions from kafka failed,err:%v\n", err) } for partition := range partitionList { pc, err := KafkaConsumer.ConsumePartition(c.Topic, int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("read partition failed,err:%v\n", err) } defer pc.AsyncClose() go c.getAndSendMsg(pc, c.Ctx) } for { select { case <-c.Ctx.Done(): return default: continue } } } func (c *ConsumerTask) getAndSendMsg(partitionConsumer sarama.PartitionConsumer, ctx context.Context) { tmpMsg := &elasticsearch.ElasticSearchMsg{ Topic: c.Topic, Path: c.Path, } for msg := range partitionConsumer.Messages() { fmt.Println("gyy", tmpMsg) fmt.Println(string(msg.Value)) tmpMsg.Log = string(msg.Value) err := elasticsearch.SendToElasticSearch(tmpMsg) if err != nil { fmt.Printf("fail to send msg to ES,err:%v\n", err) } select { case <-ctx.Done(): return default: continue } } }
go.mod
module logConsumer go 1.13 require ( github.com/Shopify/sarama v1.19.0 github.com/coreos/etcd v3.3.22+incompatible // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/google/uuid v1.1.1 // indirect github.com/olivere/elastic/v7 v7.0.4 go.etcd.io/etcd v3.3.22+incompatible go.uber.org/zap v1.15.0 // indirect gopkg.in/ini.v1 v1.58.0 )