1. 程式人生 > 實用技巧 >支援熱載入的日誌資訊收集器

支援熱載入的日誌資訊收集器

在分散式系統中,日誌資訊分佈在各個主機上,需要將日誌資訊收集起來便於檢視

主要思路是使用tail庫來讀取日誌檔案,它的優點是可以隨著日誌檔案的更改動態讀取新增的日誌資訊

然後將讀取到的日誌資訊傳送給kafka,以便展示日誌資訊

使用etcd來管理配置資訊,例如需要讀取哪一臺主機的哪個目錄下的日誌檔案。當etcd中的配置資訊發生改變以後,程式可以在不重啟tail和kafka的情況下跟著調整,改為收集其他位置的日誌資訊,這就是熱載入

總體框架圖

然後就是具體關係

main.go

package main

import (
	"fmt"
	"logAgent/conf"
	"logAgent/etcd"
	"logAgent/kafka"
	"logAgent/tailf"
	"sync"
	"time"

	"gopkg.in/ini.v1"
)

var (
	// myConf holds the agent configuration; main fills it from
	// ./conf/config.ini via ini.MapTo.
	myConf = new(conf.MyConf)
)

// main wires the log agent together. It loads ./conf/config.ini, connects to
// kafka and etcd, starts one tail task per log configuration stored in etcd,
// and then watches the same etcd key so later configuration changes are
// forwarded to the tail manager without restarting the process.
func main() {
	// etcdKey is the etcd key that stores the JSON list of log configurations.
	const etcdKey = "cyl"

	// Load the configuration file.
	err := ini.MapTo(myConf, "./conf/config.ini")
	if err != nil {
		fmt.Println("load config file failed,err:", err)
		return
	}
	fmt.Println("load conf file success")

	// Initialise the kafka producer.
	err = kafka.Init([]string{myConf.KafkaConf.Address})
	if err != nil {
		fmt.Println("init kafka producer client failed,err:", err)
		return
	}
	fmt.Println("init kafka success")

	// Initialise the etcd client; the configured timeout is in seconds.
	err = etcd.Init(myConf.EtcdConf.Address, time.Duration(myConf.EtcdConf.TimeOut)*time.Second)
	if err != nil {
		// etcd has no "producer"; the previous message was copied from the
		// kafka branch and pointed at the wrong component.
		fmt.Println("init etcd client failed,err:", err)
		return
	}
	fmt.Println("init etcd success")

	// 1. Fetch the tail configuration from etcd.
	logConfs, err := etcd.GetLogConf(etcdKey)
	if err != nil {
		fmt.Println("get log config fail,err:", err)
		return
	}
	for _, logConf := range logConfs {
		fmt.Printf("%s:%s\n", logConf.Topic, logConf.Path)
	}

	// 2. Create one tail task per configuration entry.
	tailf.Init(logConfs)

	// 3. Forward configuration changes from etcd to the tail manager.
	newConfChan := tailf.TailMgrs.GetNewConfChan()
	go etcd.WatchLogConf(etcdKey, newConfChan)

	// Done is intentionally never called: Wait keeps main (and therefore the
	// tail goroutines) alive for the lifetime of the process.
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Wait()
}

  載入kafka和etcd的配置檔案

config.ini

; Kafka settings; mapped onto the struct field tagged `ini:"kafka"`.
[kafka]
; Broker address in host:port form (kafka_ip is a placeholder).
address=kafka_ip:9092
topic=gyy

; etcd settings; mapped onto the struct field tagged `ini:"etcd"`.
[etcd]
; Endpoint address in host:port form (etcd_ip is a placeholder).
address=etcd_ip:2379
; Dial timeout; main.go multiplies this value by time.Second.
timeout=5

config.go

package conf

// MyConf is the root configuration object. Each embedded struct is mapped
// from the INI section named in its tag.
type MyConf struct {
	// KafkaConf is read from the [kafka] section.
	KafkaConf `ini:"kafka"`
	// EtcdConf is read from the [etcd] section.
	EtcdConf  `ini:"etcd"`
}

// KafkaConf holds the values of the [kafka] INI section.
type KafkaConf struct {
	// Address is mapped from the "address" key.
	Address string `ini:"address"`
	// Topic is mapped from the "topic" key.
	Topic   string `ini:"topic"`
}

// EtcdConf holds the values of the [etcd] INI section.
type EtcdConf struct {
	// Address is mapped from the "address" key.
	Address string `ini:"address"`
	// TimeOut is mapped from the "timeout" key.
	// NOTE(review): main.go multiplies it by time.Second, so it is presumably
	// a number of seconds — confirm against other callers.
	TimeOut int    `ini:"timeout"`
}

  etcd.go

package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// LogConfig describes one log file to collect. It is decoded from the JSON
// value stored in etcd.
type LogConfig struct {
	// Topic is decoded from the JSON field "topic".
	Topic string `json:"topic"`
	// Path is decoded from the JSON field "path".
	Path  string `json:"path"`
}

var (
	// EtcdCli is the package-level etcd client. It is assigned by Init and
	// used by GetLogConf and WatchLogConf.
	EtcdCli *clientv3.Client
)

// Init creates the etcd client for the single endpoint address, using timeOut
// as the dial timeout, and stores it in EtcdCli. It returns the error
// reported by clientv3.New, if any.
func Init(address string, timeOut time.Duration) (err error) {
	cfg := clientv3.Config{
		DialTimeout: timeOut,
		Endpoints:   []string{address},
	}
	EtcdCli, err = clientv3.New(cfg)
	return err
}

// GetLogConf reads the log configurations stored under etcdKey.
//
// The value stored at the key must be a JSON array of LogConfig objects. The
// request is bounded by a one-second timeout. On failure the returned slice
// is nil and the error is wrapped with the key that was being read, so the
// caller can report the underlying cause.
func GetLogConf(etcdKey string) (logConfig []*LogConfig, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	// Deferred so the context is released on every return path.
	defer cancel()

	resp, err := EtcdCli.Get(ctx, etcdKey)
	if err != nil {
		return nil, fmt.Errorf("get key %q from etcd: %w", etcdKey, err)
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
		// Decode into a fresh slice so an earlier key's entries are never
		// reused or overwritten by json.Unmarshal.
		var confs []*LogConfig
		if err = json.Unmarshal(ev.Value, &confs); err != nil {
			return nil, fmt.Errorf("unmarshal log configuration stored at %q: %w", ev.Key, err)
		}
		logConfig = append(logConfig, confs...)
	}
	return logConfig, nil
}

// WatchLogConf watches etcdKey and sends every new configuration to
// newLogConf.
//
// For a delete event a nil slice is sent, which tells the receiver that no
// log files should be collected any more. A value that cannot be decoded is
// reported and skipped; the watch keeps running so a later, valid value is
// still applied. The function returns only when the watch channel is closed.
func WatchLogConf(etcdKey string, newLogConf chan<- []*LogConfig) {
	rch := EtcdCli.Watch(context.Background(), etcdKey)
	for wresp := range rch {
		// Surface watch-level failures instead of silently ignoring them.
		if err := wresp.Err(); err != nil {
			fmt.Println("watch log configuration failed,err:", err)
			continue
		}
		for _, ev := range wresp.Events {
			fmt.Printf("Type:%s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			var newConf []*LogConfig
			if ev.Type != clientv3.EventTypeDelete {
				if err := json.Unmarshal(ev.Kv.Value, &newConf); err != nil {
					// Skip only this event: returning here would end hot
					// reload for the rest of the process lifetime.
					fmt.Println("unmarshal new configuration failed,err:", err)
					continue
				}
			}
			newLogConf <- newConf
		}
	}
}

  kafka.go

package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"
)

var (
	// KafkaClient is the package-level kafka synchronous producer. It is
	// assigned by Init and used by SendMsg.
	KafkaClient sarama.SyncProducer
)

// Init creates the kafka synchronous producer for the given broker addresses
// and stores it in KafkaClient. The producer waits for all acknowledgements,
// uses the random partitioner and reports successes. It returns the error
// from sarama.NewSyncProducer, if any.
func Init(address []string) (err error) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	cfg.Producer.RequiredAcks = sarama.WaitForAll

	KafkaClient, err = sarama.NewSyncProducer(address, cfg)
	return err
}

// SendMsg publishes message to topic through KafkaClient. On success it
// prints the partition and offset reported by the producer and returns nil;
// otherwise it returns the producer's error.
func SendMsg(topic, message string) (err error) {
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(message),
	}
	partition, offset, sendErr := KafkaClient.SendMessage(msg)
	if sendErr != nil {
		return sendErr
	}
	fmt.Printf("send msg successed,pid:%v,offset:%v\n", partition, offset)
	return nil
}

  tailfMgr.go

package tailf

import (
	"fmt"
	"logAgent/etcd"
	"time"
)

// TailMgrs is the package-level tail task manager instance; it is assigned
// by Init.
var TailMgrs *TailMgr

// TailMgr keeps track of all tail tasks and of the configuration they were
// created from.
type TailMgr struct {
	// LogConfs is the configuration currently applied.
	LogConfs    []*etcd.LogConfig
	// tailTasks maps "<topic>_<path>" to the tail task for that entry.
	tailTasks   map[string]*TailTask
	// newLogConfs carries new configurations to updateLogConf.
	newLogConfs chan []*etcd.LogConfig
}

// Init creates the package-level TailMgrs manager, starts one tail task for
// every entry of logConfs and launches the goroutine that applies later
// configuration updates.
//
// An entry whose tail task cannot be created is reported and skipped; the
// remaining entries are still started and the update goroutine always runs,
// so a single bad entry can no longer leave the configuration channel without
// a receiver. Nil and duplicate entries are ignored. TailMgrs.LogConfs records
// only the entries that actually have a running task.
func Init(logConfs []*etcd.LogConfig) {
	TailMgrs = &TailMgr{
		tailTasks:   make(map[string]*TailTask, 16),
		newLogConfs: make(chan []*etcd.LogConfig),
	}
	started := make([]*etcd.LogConfig, 0, len(logConfs))
	// Create one tail task per configuration entry.
	for _, logConf := range logConfs {
		if logConf == nil {
			continue
		}
		topicAndPath := fmt.Sprintf("%s_%s", logConf.Topic, logConf.Path)
		// A duplicate entry would start a second task for the same file and
		// leak the first one when the map slot is overwritten.
		if _, ok := TailMgrs.tailTasks[topicAndPath]; ok {
			continue
		}
		tailTask, err := NewTailTask(logConf.Topic, logConf.Path)
		if err != nil {
			fmt.Println("create a tail task failed,err:", err)
			continue
		}
		TailMgrs.tailTasks[topicAndPath] = tailTask
		started = append(started, logConf)
	}
	TailMgrs.LogConfs = started
	go TailMgrs.updateLogConf()
}

// GetNewConfChan exposes the manager's configuration channel as a send-only
// channel, so callers can push new log configurations to the manager.
func (t *TailMgr) GetNewConfChan() (newConfChan chan<- []*etcd.LogConfig) {
	newConfChan = t.newLogConfs
	return newConfChan
}

// updateLogConf runs for the lifetime of the manager. It polls the
// configuration channel, sleeping one second whenever no update is pending,
// and applies each new configuration it receives.
func (t *TailMgr) updateLogConf() {
	for {
		select {
		case newConfs := <-t.newLogConfs:
			t.applyNewConfs(newConfs)
		default:
			time.Sleep(time.Second)
		}
	}
}

// applyNewConfs reconciles the running tail tasks with newConfs: it starts a
// task for every entry that has none, stops every task that is no longer
// listed, and records newConfs as the current configuration.
//
// A task that cannot be created is reported and skipped rather than ending
// the update loop; it is retried on the next update because it never enters
// tailTasks. Nil entries are ignored.
func (t *TailMgr) applyNewConfs(newConfs []*etcd.LogConfig) {
	wanted := make(map[string]struct{}, len(newConfs))
	for _, newConf := range newConfs {
		if newConf == nil {
			continue
		}
		topicAndPath := fmt.Sprintf("%s_%s", newConf.Topic, newConf.Path)
		wanted[topicAndPath] = struct{}{}
		if _, ok := t.tailTasks[topicAndPath]; ok {
			// Already being tailed.
			continue
		}
		newTailTask, err := NewTailTask(newConf.Topic, newConf.Path)
		if err != nil {
			fmt.Println("create new tail task failed,err:", err)
			continue
		}
		t.tailTasks[topicAndPath] = newTailTask
	}

	// Stop the tasks that were removed from the configuration. Walking the
	// task map (not the previous config slice) guarantees that every task
	// found here exists, so the cancel call can never hit a nil entry.
	for topicAndPath, task := range t.tailTasks {
		if _, ok := wanted[topicAndPath]; ok {
			continue
		}
		task.CancelFunc()
		delete(t.tailTasks, topicAndPath)
	}

	t.LogConfs = newConfs
	for _, vv := range newConfs {
		if vv == nil {
			continue
		}
		fmt.Printf("%s:%s\n", vv.Topic, vv.Path)
	}
}

  tailf.go

package tailf

import (
	"context"
	"fmt"
	"logAgent/kafka"
	"time"

	"github.com/hpcloud/tail"
)

// var (
// 	// TailObj 全域性變數,tailf控制代碼
// 	TailObj *tail.Tail
// )

// TailTask is a single tail job: it follows one log file and forwards its
// lines to one kafka topic.
type TailTask struct {
	// Topic is the kafka topic the lines are sent to.
	Topic      string
	// Path is the log file passed to tail.TailFile.
	Path       string
	// TailObj is the tailer returned by tail.TailFile.
	TailObj    *tail.Tail
	// Ctx is the cancellable context created in NewTailTask.
	Ctx        context.Context
	// CancelFunc cancels Ctx, which makes run return.
	CancelFunc context.CancelFunc
}

// NewTailTask opens a tailer on path, wraps it in a TailTask bound to topic
// and starts the goroutine that forwards the file's lines. It returns the
// task, or a nil task together with the error from tail.TailFile.
func NewTailTask(topic, path string) (tailTask *TailTask, err error) {
	tailObj, err := tail.TailFile(path, tail.Config{
		// Reopen the file when it is recreated.
		ReOpen: true,
		// Keep following the file for new lines.
		Follow: true,
		// Where to start reading: offset 0 with whence 2.
		Location: &tail.SeekInfo{Offset: 0, Whence: 2},
		// Do not fail when the file does not exist.
		MustExist: false,
		Poll:      true,
	})
	if err != nil {
		fmt.Printf("create a tail task failed,err:%v\n", err)
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	tailTask = &TailTask{
		Topic:      topic,
		Path:       path,
		TailObj:    tailObj,
		Ctx:        ctx,
		CancelFunc: cancel,
	}
	go tailTask.run(ctx)
	return tailTask, nil
}

// run forwards every line produced by the tailer to kafka until ctx is
// cancelled or the tailer's Lines channel is closed.
//
// When no line is pending it sleeps for one second before polling again. On
// exit the tailer is stopped so it is not left running after the task has
// been cancelled.
func (t *TailTask) run(ctx context.Context) {
	// Stop the tailer on every exit path; the error is irrelevant here
	// because the task is going away regardless.
	defer t.TailObj.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case data, ok := <-t.TailObj.Lines:
			if !ok {
				// A closed channel yields a nil line; dereferencing it would
				// panic, so end the task instead.
				fmt.Printf("tail task stopped,topic:%s,path:%s\n", t.Topic, t.Path)
				return
			}
			if data.Err != nil {
				fmt.Printf("read line failed,path:%s,err:%v\n", t.Path, data.Err)
				continue
			}
			if err := kafka.SendMsg(t.Topic, data.Text); err != nil {
				fmt.Printf("send msg failed,topic:%s,err:%v\n", t.Topic, err)
			}
		default:
			time.Sleep(time.Second)
		}
	}
}