golang基礎-beego讀取配置_輸出log日誌、tailf元件讀取log、配置zookeeper_kafka、傳送log至kafka
阿新 • • 發佈:2018-12-22
在前面3篇博文中已經學習了相關的基礎元件用法
今天我們來整合這些demo,寫一個log日誌收集傳送kafka的小專案
專案的流程框架圖
專案的結構圖:
1、載入配置檔案loadConf,封裝結構體
[logs]
log_level=debug
log_path=E:\golang\go_pro\logs\logagent.log
[collect]
log_path=E:\golang\go_pro\logs\logagent.log
topic=nginx_log
chan_size=100
[kafka]
server_addr=192.168.21.8:9092
簡單說下配置資訊的作用
[logs]是log輸出級別,以及log輸出的檔案地址路徑
[collect]是要讀取的log日誌地址,然後利用topic,啟動goroutine傳送給kafka
[kafka]kafka關聯的ip埠
我們將配置資訊封裝成結構體,然後再定義一個全域性變數來進行使用
// appConfig is the process-wide configuration, populated once by loadConf
// and read by the rest of the program.
var (
appConfig *Config
)
// Config mirrors logagent.conf: log output settings, the collection
// targets and the kafka broker address.
type Config struct {
logLevel string // [logs] log_level (debug/warn/info/trace), defaults to "debug"
logPath string // [logs] log_path: file that beego/logs writes to
chanSize int // [collect] chan_size: buffer size of the tailf message channel
kafkaAddr string // [kafka] server_addr, e.g. "192.168.21.8:9092"
collectConf []tailf.CollectConf // one entry per log file + topic to collect
}
在結構體中collectConf 是一個數組,因為我們傳送kafka時候,可能是多個不同路徑+topic(此例我們只用了一個)
// CollectConf describes one log file to tail and the kafka topic
// its lines are sent to.
type CollectConf struct {
LogPath string // path of the log file to follow
Topic string // kafka topic for lines read from LogPath
}
2、初始化beego的log元件
// initLogger configures the beego/logs file adapter from appConfig:
// output file path and numeric log level.
func initLogger() (err error) {
	config := make(map[string]interface{})
	config["filename"] = appConfig.logPath
	config["level"] = convertLogLevel(appConfig.logLevel)
	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("initLogger failed, marshal err:", err)
		return
	}
	// BUG FIX: SetLogger returns an error that was previously ignored;
	// a bad adapter config would have gone unnoticed.
	err = logs.SetLogger(logs.AdapterFile, string(configStr))
	if err != nil {
		fmt.Println("initLogger failed, set logger err:", err)
		return
	}
	// e.g. {"filename":"E:\\golang\\go_pro\\logs\\logagent.log","level":7}
	fmt.Println(string(configStr))
	return
}
3、初始化tailf
在初始化goroutine模組,輸出log日誌,我們需要設計幾個結構體
在初始化配置資訊中提到了結構體Config,裡面的 collectConf []tailf.CollectConf
我們再封裝如下2個結構體
TailObj 結構體是利用tail.Lines讀取CollectConf路徑下的資訊
// TailObj pairs one running tail follower with the config entry
// it was created from.
type TailObj struct {
tail *tail.Tail // follower handle for conf.LogPath
conf CollectConf // path + topic this follower serves
}
TailObjMgr 結構體是tail.Lines讀取CollectConf路徑下的資訊時候, 存放到chan管道中,tailObjs 這可能是多個不同路徑+topic(此例我們只用了一個)
// TailObjMgr owns every TailObj and the shared channel that the
// readFromTail goroutines publish collected lines into.
type TailObjMgr struct {
tailObjs []*TailObj // one follower per CollectConf entry
msgChan chan *TextMsg // buffered (chan_size) queue drained via GetOneLine
}
然後將tailf初始化的操作貼出來
// InitTail creates the global tail manager and starts one goroutine per
// collect entry, each following its log file and feeding msgChan.
// chanSize bounds the number of in-flight lines.
func InitTail(conf []CollectConf, chanSize int) (err error) {
	if len(conf) == 0 {
		err = fmt.Errorf("invalid config for log collect, conf:%v", conf)
		return
	}
	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize),
	}
	for _, v := range conf {
		obj := &TailObj{
			conf: v,
		}
		// route the trace through the logger instead of the leftover
		// fmt.Println debug output
		logs.Debug("starting collector for conf:%v", v)
		tails, errTail := tail.TailFile(v.LogPath, tail.Config{
			ReOpen:    true,  // keep following across truncation/rotation
			Follow:    true,  // tail -f behaviour
			MustExist: false, // tolerate the file not existing yet
			Poll:      true,  // polling works where inotify doesn't (e.g. Windows)
		})
		if errTail != nil {
			err = errTail
			return
		}
		obj.tail = tails
		tailObjMgr.tailObjs = append(tailObjMgr.tailObjs, obj)
		go readFromTail(obj)
	}
	return
}
// readFromTail pumps lines from one tail follower into the shared
// msgChan. It runs as a goroutine for the lifetime of the process.
func readFromTail(tailObj *TailObj) {
	// idiomatic infinite loop (was `for true`, flagged by staticcheck)
	for {
		line, ok := <-tailObj.tail.Lines
		if !ok {
			// channel closed: the tail library reopens the file itself
			// (ReOpen=true); back off briefly and retry.
			logs.Warn("tail file close reopen, filename:%s\n", tailObj.tail.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		textMsg := &TextMsg{
			Msg:   line.Text,
			Topic: tailObj.conf.Topic,
		}
		tailObjMgr.msgChan <- textMsg
	}
}
4、初始化kafka
// InitKafka creates the package-level synchronous kafka producer
// pointed at addr ("host:port").
func InitKafka(addr string) (err error) {
	config := sarama.NewConfig()
	// wait for the full ISR to ack so sends are durable
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// SyncProducer requires Return.Successes = true
	config.Producer.Return.Successes = true
	client, err = sarama.NewSyncProducer([]string{addr}, config)
	if err != nil {
		// use a format verb, consistent with the rest of the file
		logs.Error("init kafka producer failed, err:%v", err)
		return
	}
	logs.Debug("init kafka succ")
	return
}
以前寫過關於kafka的例子,就不再詳細介紹了
5、tailf讀取
從管道中讀取即可
//從chan中取出
msg := tailf.GetOneLine()
fmt.Println(msg)
// GetOneLine blocks until the next collected log line arrives on the
// manager's channel and hands it to the caller.
func GetOneLine() (msg *TextMsg) {
	return <-tailObjMgr.msgChan
}
利用fmt.Println(msg)進行測試,我同時輸出到控制檯上
6、傳送資料kafka
// SendToKafka publishes one line to the given topic through the shared
// synchronous producer and logs the partition/offset on success.
func SendToKafka(data, topic string) (err error) {
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(data),
	}
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		logs.Error("send message failed, err:%v data:%v topic:%v", err, data, topic)
		return
	}
	logs.Debug("send succ, pid:%v offset:%v, topic:%v\n", pid, offset, topic)
	return
}
傳送kafka的操作也是寫過demo例子的,這裡就不再詳細介紹了
7、啟動zookeeper,kafka測試
8、檢視測試效果
終端輸出+log日誌輸出目錄
以下是輸出到kafka的效果圖
程式碼區
main.go
package main
import(
"fmt"
"github.com/astaxie/beego/logs"
"logagent/kafka"
"logagent/tailf"
// "time"
)
// main wires the agent together: load config, set up file logging,
// start the tailf collectors, connect the kafka producer, then pump
// collected lines to kafka forever.
func main() {
	// load logagent.conf (ini format)
	filename := "E:/golang/go_pro/logagent.conf"
	err := loadConf("ini", filename)
	if err != nil {
		fmt.Printf("load conf failed, err:%v\n", err)
		// panic terminates main; the old `return` after it was unreachable
		panic("load conf failed")
	}
	// route beego/logs output to the configured file at the configured level
	err = initLogger()
	if err != nil {
		fmt.Printf("load logger failed, err:%v\n", err)
		panic("load logger failed")
	}
	logs.Debug("load conf succ, config:%v", appConfig)
	// start one tail goroutine per collect entry
	err = tailf.InitTail(appConfig.collectConf, appConfig.chanSize)
	if err != nil {
		logs.Error("init tail failed, err:%v", err)
		return
	}
	logs.Debug("initialize tailf succ")
	// connect the kafka sync producer
	// BUG FIX: message previously said "init tail failed" (copy-paste)
	err = kafka.InitKafka(appConfig.kafkaAddr)
	if err != nil {
		logs.Error("init kafka failed, err:%v", err)
		return
	}
	logs.Debug("initialize all succ")
	// blocks forever moving lines from tailf to kafka
	err = serverRun()
	if err != nil {
		logs.Error("serverRun failed, err:%v", err)
		return
	}
	logs.Info("program exited")
}
config.go
package main
import(
"fmt"
"errors"
"github.com/astaxie/beego/config"
"logagent/tailf"
)
// appConfig is the process-wide configuration, populated once by loadConf
// and read by the rest of the program.
var (
appConfig *Config
)
// Config mirrors logagent.conf: log output settings, the collection
// targets and the kafka broker address.
type Config struct {
logLevel string // [logs] log_level (debug/warn/info/trace), defaults to "debug"
logPath string // [logs] log_path: file that beego/logs writes to
chanSize int // [collect] chan_size: buffer size of the tailf message channel
kafkaAddr string // [kafka] server_addr, e.g. "192.168.21.8:9092"
collectConf []tailf.CollectConf // one entry per log file + topic to collect
}
// loadCollectConf reads the [collect] section (log_path + topic) and
// appends the entry to appConfig.collectConf. Returns an error when
// either key is missing or empty.
func loadCollectConf(conf config.Configer) (err error) {
	var cc tailf.CollectConf
	cc.LogPath = conf.String("collect::log_path")
	if len(cc.LogPath) == 0 {
		err = errors.New("invalid collect::log_path")
		return
	}
	cc.Topic = conf.String("collect::topic")
	// BUG FIX: the original rechecked cc.LogPath here, so an empty
	// topic slipped through validation.
	if len(cc.Topic) == 0 {
		err = errors.New("invalid collect::topic")
		return
	}
	appConfig.collectConf = append(appConfig.collectConf, cc)
	return
}
/*
loadConf parses the ini configuration file and fills the global
appConfig. Missing log settings fall back to defaults; a missing kafka
address or collect section is an error. Expected layout:

	[logs]
	log_level=debug
	log_path=E:\golang\go_pro\logs\logagent.log
	[collect]
	log_path=E:\golang\go_pro\logs\logagent.log
	topic=nginx_log
	chan_size=100
	[kafka]
	server_addr=192.168.21.8:9092
*/
func loadConf(confType, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Println("new config failed, err:", err)
		return
	}
	appConfig = &Config{}
	appConfig.logLevel = conf.String("logs::log_level")
	if len(appConfig.logLevel) == 0 {
		appConfig.logLevel = "debug"
	}
	appConfig.logPath = conf.String("logs::log_path")
	if len(appConfig.logPath) == 0 {
		appConfig.logPath = "E:\\golang\\go_pro\\logs\\logagent.log"
	}
	appConfig.chanSize, err = conf.Int("collect::chan_size")
	if err != nil {
		appConfig.chanSize = 100
		// the parse failure is fully handled by the default; clear the
		// named return so the stale error cannot leak to the caller
		err = nil
	}
	appConfig.kafkaAddr = conf.String("kafka::server_addr")
	if len(appConfig.kafkaAddr) == 0 {
		err = fmt.Errorf("invalid kafka addr")
		return
	}
	err = loadCollectConf(conf)
	if err != nil {
		fmt.Printf("load collect conf failed, err:%v\n", err)
		return
	}
	return
}
log.go
package main
import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)
// convertLogLevel maps a textual level name onto the matching
// beego/logs constant; anything unrecognized falls back to debug.
func convertLogLevel(level string) int {
	switch level {
	case "warn":
		return logs.LevelWarn
	case "info":
		return logs.LevelInfo
	case "trace":
		return logs.LevelTrace
	default:
		// covers "debug" and any unknown value
		return logs.LevelDebug
	}
}
// initLogger configures the beego/logs file adapter from appConfig:
// output file path and numeric log level.
func initLogger() (err error) {
	config := make(map[string]interface{})
	config["filename"] = appConfig.logPath
	config["level"] = convertLogLevel(appConfig.logLevel)
	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("initLogger failed, marshal err:", err)
		return
	}
	// BUG FIX: SetLogger returns an error that was previously ignored;
	// a bad adapter config would have gone unnoticed.
	err = logs.SetLogger(logs.AdapterFile, string(configStr))
	if err != nil {
		fmt.Println("initLogger failed, set logger err:", err)
		return
	}
	// e.g. {"filename":"E:\\golang\\go_pro\\logs\\logagent.log","level":7}
	fmt.Println(string(configStr))
	return
}
tailf.go
package tailf
import (
"github.com/hpcloud/tail"
"github.com/astaxie/beego/logs"
"fmt"
"time"
)
// CollectConf describes one log file to tail and the kafka topic
// its lines are sent to.
type CollectConf struct {
LogPath string // path of the log file to follow
Topic string // kafka topic for lines read from LogPath
}
/* Example entry: {E:\golang\go_pro\logs\logagent.log nginx_log} —
one record per collected file. */
// TailObj pairs one running tail follower with the config entry
// it was created from.
type TailObj struct {
tail *tail.Tail // follower handle for conf.LogPath
conf CollectConf // path + topic this follower serves
}
// TextMsg is one collected log line tagged with its destination topic.
type TextMsg struct {
Msg string // raw line text as read by tail
Topic string // kafka topic the line should be published to
}
// TailObjMgr owns every TailObj and the shared channel that the
// readFromTail goroutines publish collected lines into.
type TailObjMgr struct {
	tailObjs []*TailObj    // one follower per CollectConf entry
	msgChan  chan *TextMsg // buffered (chan_size) queue drained via GetOneLine
}

// tailObjMgr is the package-level manager, created by InitTail.
// (gofmt fix: asterisk belongs with the type, not the name)
var (
	tailObjMgr *TailObjMgr
)
// GetOneLine blocks until the next collected log line arrives on the
// manager's channel and hands it to the caller.
func GetOneLine() (msg *TextMsg) {
	return <-tailObjMgr.msgChan
}
// InitTail creates the global tail manager and starts one goroutine per
// collect entry, each following its log file and feeding msgChan.
// chanSize bounds the number of in-flight lines.
func InitTail(conf []CollectConf, chanSize int) (err error) {
	if len(conf) == 0 {
		err = fmt.Errorf("invalid config for log collect, conf:%v", conf)
		return
	}
	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize),
	}
	for _, v := range conf {
		obj := &TailObj{
			conf: v,
		}
		// route the trace through the logger instead of the leftover
		// fmt.Println debug output
		logs.Debug("starting collector for conf:%v", v)
		tails, errTail := tail.TailFile(v.LogPath, tail.Config{
			ReOpen:    true,  // keep following across truncation/rotation
			Follow:    true,  // tail -f behaviour
			MustExist: false, // tolerate the file not existing yet
			Poll:      true,  // polling works where inotify doesn't (e.g. Windows)
		})
		if errTail != nil {
			err = errTail
			return
		}
		obj.tail = tails
		tailObjMgr.tailObjs = append(tailObjMgr.tailObjs, obj)
		go readFromTail(obj)
	}
	return
}
// readFromTail pumps lines from one tail follower into the shared
// msgChan. It runs as a goroutine for the lifetime of the process.
func readFromTail(tailObj *TailObj) {
	// idiomatic infinite loop (was `for true`, flagged by staticcheck)
	for {
		line, ok := <-tailObj.tail.Lines
		if !ok {
			// channel closed: the tail library reopens the file itself
			// (ReOpen=true); back off briefly and retry.
			logs.Warn("tail file close reopen, filename:%s\n", tailObj.tail.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		textMsg := &TextMsg{
			Msg:   line.Text,
			Topic: tailObj.conf.Topic,
		}
		tailObjMgr.msgChan <- textMsg
	}
}
kafka.go
package kafka
import(
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)
// client is the shared synchronous kafka producer, created by
// InitKafka and used by SendToKafka.
var (
client sarama.SyncProducer
)
// InitKafka creates the package-level synchronous kafka producer
// pointed at addr ("host:port").
func InitKafka(addr string) (err error) {
	config := sarama.NewConfig()
	// wait for the full ISR to ack so sends are durable
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// SyncProducer requires Return.Successes = true
	config.Producer.Return.Successes = true
	client, err = sarama.NewSyncProducer([]string{addr}, config)
	if err != nil {
		// use a format verb, consistent with the rest of the file
		logs.Error("init kafka producer failed, err:%v", err)
		return
	}
	logs.Debug("init kafka succ")
	return
}
// SendToKafka publishes one line to the given topic through the shared
// synchronous producer and logs the partition/offset on success.
func SendToKafka(data, topic string) (err error) {
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(data),
	}
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		logs.Error("send message failed, err:%v data:%v topic:%v", err, data, topic)
		return
	}
	logs.Debug("send succ, pid:%v offset:%v, topic:%v\n", pid, offset, topic)
	return
}
server.go
package main
import(
"logagent/tailf"
"logagent/kafka"
"github.com/astaxie/beego/logs"
"time"
"fmt"
)
// serverRun is the agent's main loop: take each collected line from
// tailf and forward it to kafka, retrying after a second on failure.
// The loop never exits, so the function never returns in practice
// (the unreachable trailing `return` flagged by vet was removed).
func serverRun() (err error) {
	for {
		msg := tailf.GetOneLine()
		// echo to the console as well (kept from the tutorial for testing)
		fmt.Println(msg)
		err = kafka.SendToKafka(msg.Msg, msg.Topic)
		if err != nil {
			logs.Error("send to kafka failed, err:%v", err)
			time.Sleep(time.Second)
			continue
		}
	}
}
logagent.conf
[logs]
log_level=debug
log_path=E:\golang\go_pro\logs\logagent.log
[collect]
log_path=E:\golang\go_pro\logs\logagent.log
topic=nginx_log
chan_size=100
[kafka]
server_addr=192.168.21.8:9092