logrus hook輸出日誌到本地磁碟的操作
logrus是go的一個日誌框架,它最讓人激動的應該是hook機制,可以在初始化時為logrus新增hook,logrus可以實現各種擴充套件功能,可以將日誌輸出到elasticsearch和activemq等中介軟體去,甚至可以輸出到你的email和叮叮中去,不要問為為什麼可以發現可以輸入到叮叮中去,都是淚,手動笑哭!
言歸正傳,這裡就簡單的通過hook機制將檔案輸出到本地磁碟。
首先
go get github.com/sirupsen/logrus
然後
logrus和go lib裡面一樣有6個等級,可以直接呼叫
logrus.Debug("Useful debugging information.") logrus.Info("Something noteworthy happened!") logrus.Warn("You should probably take a look at this.") logrus.Error("Something failed but I'm not quitting.") logrus.Fatal("Bye.") //log之後會呼叫os.Exit(1) logrus.Panic("I'm bailing.") //log之後會panic()
專案例子結構
main.go
package main import ( "fmt" "github.com/sirupsen/logrus" "logT/logS" ) func main() { //建立一個hook,將日誌儲存路徑輸入進去 hook := logS.NewHook("d:/log/golog.log") //載入hook之前列印日誌 logrus.WithField("file","d:/log/golog.log").Info("New logrus hook err.") logrus.AddHook(hook) //載入hook之後列印日誌 logrus.WithFields(logrus.Fields{ "animal": "walrus",}).Info("A walrus appears") }
hook.go
不要看下面三個go檔案程式碼很長,其實大多數都是固定程式碼,也就NewHook函式自己擴充套件定義就好
package logS
import ( "fmt" "github.com/sirupsen/logrus" "os" "strings" ) // Hook 寫檔案的Logrus Hook type Hook struct { W LoggerInterface } func NewHook(file string) (f *Hook) { w := NewFileWriter() config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`,file) err := w.Init(config) if err != nil { return nil } return &Hook{w} } // Fire 實現Hook的Fire介面 func (hook *Hook) Fire(entry *logrus.Entry) (err error) { message,err := getMessage(entry) if err != nil { fmt.Fprintf(os.Stderr,"Unable to read entry,%v",err) return err } switch entry.Level { case logrus.PanicLevel: fallthrough case logrus.FatalLevel: fallthrough case logrus.ErrorLevel: return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s",message),LevelError) case logrus.WarnLevel: return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s",LevelWarn) case logrus.InfoLevel: return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s",LevelInfo) case logrus.DebugLevel: return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s",LevelDebug) default: return nil } } // Levels 實現Hook的Levels介面 func (hook *Hook) Levels() []logrus.Level { return []logrus.Level{ logrus.PanicLevel,logrus.FatalLevel,logrus.ErrorLevel,logrus.WarnLevel,logrus.InfoLevel,logrus.DebugLevel,} } func getMessage(entry *logrus.Entry) (message string,err error) { message = message + fmt.Sprintf("%s ",entry.Message) file,lineNumber := GetCallerIgnoringLogMulti(2) if file != "" { sep := fmt.Sprintf("%s/src/",os.Getenv("GOPATH")) fileName := strings.Split(file,sep) if len(fileName) >= 2 { file = fileName[1] } } message = fmt.Sprintf("%s:%d ",file,lineNumber) + message for k,v := range entry.Data { message = message + fmt.Sprintf("%v:%v ",k,v) } return }
caller.go
package logS import ( "runtime" "strings" ) func GetCaller(callDepth int,suffixesToIgnore ...string) (file string,line int) { // bump by 1 to ignore the getCaller (this) stackframe callDepth++ outer: for { var ok bool _,line,ok = runtime.Caller(callDepth) if !ok { file = "???" line = 0 break } for _,s := range suffixesToIgnore { if strings.HasSuffix(file,s) { callDepth++ continue outer } } break } return } // GetCallerIgnoringLogMulti TODO func GetCallerIgnoringLogMulti(callDepth int) (string,int) { // the +1 is to ignore this (getCallerIgnoringLogMulti) frame return GetCaller(callDepth+1,"logrus/hooks.go","logrus/entry.go","logrus/logger.go","logrus/exported.go","asm_amd64.s") }
file.go
package logS import ( "encoding/json" "errors" "fmt" "io/ioutil" "log" "os" "path/filepath" "strings" "sync" "time" ) // RFC5424 log message levels. const ( LevelError = iota LevelWarn LevelInfo LevelDebug ) // LoggerInterface Logger介面 type LoggerInterface interface { Init(config string) error WriteMsg(msg string,level int) error Destroy() Flush() } // LogWriter implements LoggerInterface. // It writes messages by lines limit,file size limit,or time frequency. type LogWriter struct { *log.Logger mw *MuxWriter // The opened file Filename string `json:"filename"` Maxlines int `json:"maxlines"` maxlinesCurlines int // Rotate at size Maxsize int `json:"maxsize"` maxsizeCursize int // Rotate daily Daily bool `json:"daily"` Maxdays int64 `json:"maxdays"` dailyOpendate int Rotate bool `json:"rotate"` startLock sync.Mutex // Only one log can write to the file Level int `json:"level"` } // MuxWriter an *os.File writer with locker. type MuxWriter struct { sync.Mutex fd *os.File } // write to os.File. func (l *MuxWriter) Write(b []byte) (int,error) { l.Lock() defer l.Unlock() return l.fd.Write(b) } // SetFd set os.File in writer. func (l *MuxWriter) SetFd(fd *os.File) { if l.fd != nil { _ = l.fd.Close() } l.fd = fd } // NewFileWriter create a FileLogWriter returning as LoggerInterface. func NewFileWriter() LoggerInterface { w := &LogWriter{ Filename: "",Maxlines: 1000000,Maxsize: 1 << 28,//256 MB Daily: true,Maxdays: 7,Rotate: true,Level: LevelDebug,} // use MuxWriter instead direct use os.File for lock write when rotate w.mw = new(MuxWriter) // set MuxWriter as Logger's io.Writer w.Logger = log.New(w.mw,"",log.Ldate|log.Ltime) return w } // Init file logger with json config. // jsonconfig like: // { // "filename":"logs/sample.log",// "maxlines":10000,// "maxsize":1<<30,// "daily":true,// "maxdays":15,// "rotate":true // } func (w *LogWriter) Init(jsonconfig string) error { err := json.Unmarshal([]byte(jsonconfig),w) if err != nil { return err } if len(w.Filename) == 0 { return errors.New("jsonconfig must have filename") } err = w.startLogger() return err } // start file logger. create log file and set to locker-inside file writer. func (w *LogWriter) startLogger() error { fd,err := w.createLogFile() if err != nil { return err } w.mw.SetFd(fd) err = w.initFd() if err != nil { return err } return nil } func (w *LogWriter) docheck(size int) { w.startLock.Lock() defer w.startLock.Unlock() if w.Rotate && ((w.Maxlines > 0 && w.maxlinesCurlines >= w.Maxlines) || (w.Maxsize > 0 && w.maxsizeCursize >= w.Maxsize) || (w.Daily && time.Now().Day() != w.dailyOpendate)) { if err := w.DoRotate(); err != nil { fmt.Fprintf(os.Stderr,"FileLogWriter(%q): %s\n",w.Filename,err) return } } w.maxlinesCurlines++ w.maxsizeCursize += size } // WriteMsg write logger message into file. func (w *LogWriter) WriteMsg(msg string,level int) error { if level > w.Level { return nil } n := 24 + len(msg) // 24 stand for the length "2013/06/23 21:00:22 [T] " w.docheck(n) w.Logger.Print(msg) return nil } func (w *LogWriter) createLogFile() (*os.File,error) { // Open the log file fd,err := os.OpenFile(w.Filename,os.O_WRONLY|os.O_APPEND|os.O_CREATE,0660) return fd,err } func (w *LogWriter) initFd() error { fd := w.mw.fd finfo,err := fd.Stat() if err != nil { return fmt.Errorf("get stat err: %s",err) } w.maxsizeCursize = int(finfo.Size()) w.dailyOpendate = time.Now().Day() if finfo.Size() > 0 { content,err := ioutil.ReadFile(w.Filename) if err != nil { return err } w.maxlinesCurlines = len(strings.Split(string(content),"\n")) } else { w.maxlinesCurlines = 0 } return nil } // DoRotate means it need to write file in new file. // new file name like xx.log.2013-01-01.2 func (w *LogWriter) DoRotate() error { _,err := os.Lstat(w.Filename) if err == nil { // file exists // Find the next available number num := 1 fname := "" for ; err == nil && num <= 999; num++ { fname = w.Filename + fmt.Sprintf(".%s.%03d",time.Now().Format("2006-01-02"),num) _,err = os.Lstat(fname) } // return error if the last file checked still existed if err == nil { return fmt.Errorf("Rotate: Cannot find free log number to rename %s",w.Filename) } // block Logger's io.Writer w.mw.Lock() defer w.mw.Unlock() fd := w.mw.fd _ = fd.Close() // close fd before rename // Rename the file to its newfound home err = os.Rename(w.Filename,fname) if err != nil { return fmt.Errorf("Rotate: %s",err) } // re-start logger err = w.startLogger() if err != nil { return fmt.Errorf("Rotate StartLogger: %s",err) } go w.deleteOldLog() } return nil } func (w *LogWriter) deleteOldLog() { dir := filepath.Dir(w.Filename) _ = filepath.Walk(dir,func(path string,info os.FileInfo,err error) (returnErr error) { defer func() { if r := recover(); r != nil { returnErr = fmt.Errorf("Unable to delete old log '%s',error: %+v",path,r) fmt.Println(returnErr) } }() if !info.IsDir() && info.ModTime().Unix() < (time.Now().Unix()-60*60*24*w.Maxdays) { if strings.HasPrefix(filepath.Base(path),filepath.Base(w.Filename)) { _ = os.Remove(path) } } return }) } // Destroy destroy file logger,close file writer. func (w *LogWriter) Destroy() { _ = w.mw.fd.Close() } // Flush file logger. // there are no buffering messages in file logger in memory. // flush file means sync file from disk. func (w *LogWriter) Flush() { _ = w.mw.fd.Sync() }
補充知識:golang logrus自定義hook:日誌切片hook、郵件警報hook、kafkahook
logrus Hook 分析
logrus hook 介面定義很簡單。如下
package logrus // A hook to be fired when logging on the logging levels returned from // `Levels()` on your implementation of the interface. Note that this is not // fired in a goroutine or a channel with workers,you should handle such // functionality yourself if your call is non-blocking and you don't wish for // the logging calls for levels returned from `Levels()` to block. type Hook interface { Levels() []Level Fire(*Entry) error } // Internal type for storing the hooks on a logger instance. type LevelHooks map[Level][]Hook // Add a hook to an instance of logger. This is called with // `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface. func (hooks LevelHooks) Add(hook Hook) { for _,level := range hook.Levels() { hooks[level] = append(hooks[level],hook) } } // Fire all the hooks for the passed level. Used by `entry.log` to fire // appropriate hooks for a log entry. func (hooks LevelHooks) Fire(level Level,entry *Entry) error { for _,hook := range hooks[level] { if err := hook.Fire(entry); err != nil { return err } } return nil }
只需實現 該結構的介面。
type Hook interface { Levels() []Level Fire(*Entry) error }
就會被logrus框架遍歷呼叫已註冊的 hook 的 Fire 方法
獲取日誌例項
// log_hook.go package logger import ( "fmt" "github.com/sirupsen/logrus" "library/util/constant" "os" ) //自實現 logrus hook func getLogger(module string) *logrus.Logger { //例項化 logger := logrus.New() //設定輸出 logger.Out = os.Stdout //設定日誌級別 logger.SetLevel(logrus.DebugLevel) //設定日誌格式 //自定writer就行, hook 交給 lfshook logger.AddHook(newLogrusHook(constant.GetLogPath(),module)) logger.SetFormatter(&logrus.JSONFormatter{ TimestampFormat:"2006-01-02 15:04:05",}) return logger } //確保每次呼叫使用的檔案都是唯一的。 func GetNewFieldLoggerContext(module,appField string) *logrus.Entry { logger:= getLogger(module) return logger.WithFields(logrus.Fields{ "app": appField,}) } //訂閱 警告日誌 func SubscribeLog(entry *logrus.Entry,subMap SubscribeMap) { logger := entry.Logger logger.AddHook(newSubScribeHook(subMap)) fmt.Println("日誌訂閱成功") }
constant.GetLogPath() 可以替換為自己的日誌檔案輸出目錄地址,比如我的mac上則是:/usr/local/log,直接替換即可。
日誌切片hook
程式碼
// writer.go package logger import ( "fmt" "github.com/pkg/errors" "io" "library/util" "os" "path/filepath" "sync" "time" ) type LogWriter struct { logDir string //日誌根目錄地址。 module string //模組 名 curFileName string //當前被指定的filename curBaseFileName string //在使用中的file turnCateDuration time.Duration mutex sync.RWMutex outFh *os.File } func (w *LogWriter) Write(p []byte) (n int,err error) { w.mutex.Lock() defer w.mutex.Unlock() if out,err:= w.getWriter(); err!=nil { return 0,errors.New("failed to fetch target io.Writer") }else{ return out.Write(p) } } func (w *LogWriter) getFileName() string { base := time.Now().Truncate(w.turnCateDuration) return fmt.Sprintf("%s/%s/%s_%s",w.logDir,base.Format("2006-01-02"),w.module,base.Format("15")) } func (w *LogWriter) getWriter()(io.Writer,error) { fileName := w.curBaseFileName //判斷是否有新的檔名 //會出現新的檔名 baseFileName := w.getFileName() if baseFileName != fileName { fileName = baseFileName } dirname := filepath.Dir(fileName) if err := os.MkdirAll(dirname,0755); err != nil { return nil,errors.Wrapf(err,"failed to create directory %s",dirname) } fileHandler,err := os.OpenFile(fileName,os.O_CREATE|os.O_APPEND|os.O_WRONLY,0644) if err != nil { return nil,errors.Errorf("failed to open file %s",err) } w.outFh.Close() w.outFh = fileHandler w.curBaseFileName = fileName w.curFileName = fileName return fileHandler,nil } func New(logPath,module string,duration time.Duration) *LogWriter { return &LogWriter{ logDir: logPath,module: module,turnCateDuration:duration,curFileName: "",curBaseFileName: "",} }
// hook.go package logger import ( "github.com/rifflock/lfshook" "github.com/sirupsen/logrus" "time" ) func newLogrusHook(logPath,moduel string) logrus.Hook { logrus.SetLevel(logrus.WarnLevel) writer := New(logPath,moduel,time.Hour * 2) lfsHook := lfshook.NewHook(lfshook.WriterMap{ logrus.DebugLevel: writer,logrus.InfoLevel: writer,logrus.WarnLevel: writer,logrus.ErrorLevel: writer,logrus.FatalLevel: writer,logrus.PanicLevel: writer,},&logrus.TextFormatter{DisableColors: true}) // writer 生成新的log檔案型別 writer 在通過new hook函式 消費 fire 函式 // writer 是實現了writer 介面的庫,在日誌呼叫write是做預處理 return lfsHook }
測試程式碼
func TestGetLogger(t *testing.T) { lg := GetNewFieldLoggerContext("test","d") lg.Logger.Info("????") }
解析
logger例項持有了 自定義的 io.writer 結構體,在消費Fire函式時,會呼叫Write方法,此時通過Truncate時間切片函式邏輯判斷需要寫入的檔案。或建立新的檔案。
注: 文章提供的程式碼是按天切分資料夾的,資料夾內模組日誌再按2小時切分。可自行替換成按模組切分。
郵件警報hook
程式碼
// subscribeHook.go package logger import ( "fmt" "github.com/sirupsen/logrus" "library/email" "strings" ) type SubscribeMap map[logrus.Level][]*email.Receiver type SubscribeHook struct { subMap SubscribeMap } //此處可以自實現hook 目前使用三方hook func(h *SubscribeHook)Levels() []logrus.Level{ return logrus.AllLevels } func(h *SubscribeHook)Fire(entry *logrus.Entry) error{ for level,receivers := range h.subMap { //命中 準備消費 if level == entry.Level { if len(receivers) > 0 { email.SendEmail(receivers,fmt.Sprintf("%s:[系統日誌警報]",entry.Level.String()),fmt.Sprintf("錯誤內容: %s",entry.Message)) } } } return nil } func NewSubscribeMap(level logrus.Level,receiverStr string) SubscribeMap{ subMap := SubscribeMap{} addressList := strings.Split(receiverStr,";") var receivers []*email.Receiver for _,address := range addressList { receivers = append(receivers,&email.Receiver{Email: address}) } subMap[level] = receivers return subMap } func newSubScribeHook(subMap SubscribeMap) *SubscribeHook { return &SubscribeHook{subMap}
// email.go package email import ( "fmt" "gopkg.in/gomail.v2" "regexp" "strconv" ) type Sender struct { User string Password string Host string Port int MailTo []string Subject string Content string } type Receiver struct { Email string } func (r *Receiver) Check() bool { pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` //匹配電子郵箱 reg := regexp.MustCompile(pattern) return reg.MatchString(r.Email) } func (s *Sender) clean (){ } //檢查 郵箱正確性 func (s *Sender)NewReceiver(email string) *Receiver { rec := &Receiver{Email:email} if rec.Check() { m.MailTo = []string{email} return rec }else{ fmt.Printf("email check fail 【%s】\n",email) return nil } } func (s *Sender)NewReceivers(receivers []*Receiver) { for _,rec := range receivers { if rec.Check() { m.MailTo = append(m.MailTo,rec.Email) }else{ fmt.Printf("email check fail 【%s】\n",rec.Email) } } } // 163郵箱 password 為開啟smtp後給的祕鑰 var m = Sender{User:"[email protected]",Password:"666666666",Host: "smtp.163.com",Port: 465} func SendEmail(receivers []*Receiver,subject,content string){ m.NewReceivers(receivers) m.Subject = subject m.Content = content e := gomail.NewMessage() e.SetHeader("From",e.FormatAddress(m.User,"hengsheng")) e.SetHeader("To",m.MailTo...) //傳送給多個使用者 e.SetHeader("Subject",m.Subject) //設定郵件主題 e.SetBody("text/html",m.Content) //設定郵件正文 d := gomail.NewDialer(m.Host,m.Port,m.User,m.Password) err := d.DialAndSend(e) if err != nil { fmt.Printf("error 郵件傳送錯誤! %s \n",err.Error()) } }
使用
同理在writer時 如果是錯誤日誌則傳送郵件。
o.logger = logger.GetNewFieldLoggerContext("test","666") if subscribeSocket { logger.SubscribeLog(o.Logger,logger.NewSubscribeMap(logrus.ErrorLevel,"[email protected];[email protected]")) } // o 為實際結構體例項
kafkahook
// kafka hook package logger import ( "github.com/sirupsen/logrus" "library/kafka" "library/util/constant" ) type KafKaHook struct { kafkaProducer *kafka.KafkaProducer } func(h *KafKaHook)Levels() []logrus.Level{ return logrus.AllLevels } func(h *KafKaHook)Fire(entry *logrus.Entry) error{ h.kafkaProducer.SendMsgSync(entry.Message) return nil } func newKafkaHook() *KafKaHook{ producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic,true) return &KafKaHook{kafkaProducer: producer} }
使用時logger.AddHook(newKafkaHook()) 即可
kafka模組
生產者
// kafkaProducer.go package kafka import ( "errors" "fmt" "github.com/Shopify/sarama" "library/util/constant" "log" "time" ) func GetKafkaAddress()[]string{ return "127.0.0.1:9092" } //同步訊息模式 func SyncProducer(topic,message string) error { config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.Timeout = 5 * time.Second p,err := sarama.NewSyncProducer(GetKafkaAddress(),config) if err != nil { return errors.New(fmt.Sprintf("sarama.NewSyncProducer err,message=%s \n",err)) } defer p.Close() msg := &sarama.ProducerMessage{ Topic: topic,Value: sarama.ByteEncoder(message),} part,offset,err := p.SendMessage(msg) if err != nil { return errors.New(fmt.Sprintf("send sdsds err=%s \n",err)) } else { fmt.Printf("傳送成功,partition=%d,offset=%d \n",part,offset) return nil } } //async 非同步生產者 type KafkaProducer struct { topic string asyncProducer *sarama.AsyncProducer syncProducer *sarama.SyncProducer sync bool } func NewKafkaProducer(topic string,sync bool) *KafkaProducer { k := &KafkaProducer{ topic: topic,sync: sync,} if sync { k.initSync() }else{ k.initAsync() } return k } func (k *KafkaProducer) initAsync() bool { if k.sync { fmt.Printf("sync producer cant call async func !\n") return false } config := sarama.NewConfig() //等待伺服器所有副本都儲存成功後的響應 config.Producer.RequiredAcks = sarama.WaitForAll //隨機向partition傳送訊息 config.Producer.Partitioner = sarama.NewRandomPartitioner //是否等待成功和失敗後的響應,只有上面的RequireAcks設定不是NoReponse這裡才有用. config.Producer.Return.Successes = true config.Producer.Return.Errors = true //設定使用的kafka版本,如果低於V0_10_0_0版本,訊息中的timestrap沒有作用.需要消費和生產同時配置 //注意,版本設定不對的話,kafka會返回很奇怪的錯誤,並且無法成功傳送訊息 config.Version = sarama.V0_10_0_1 producer,e := sarama.NewAsyncProducer(GetKafkaAddress(),config) if e != nil { fmt.Println(e) return false } k.asyncProducer = &producer defer producer.AsyncClose() pd := *k.asyncProducer go func() { for{ select { case <-pd.Successes(): //fmt.Println("offset: ",suc.Offset,"timestamp: ",suc.Timestamp.String(),"partitions: ",suc.Partition) case fail := <-pd.Errors(): fmt.Printf("err: %s \n",fail.Err.Error()) } } }() return true } func (k *KafkaProducer) initSync() bool { if !k.sync { fmt.Println("async producer cant call sync func !") return false } config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.Timeout = 5 * time.Second p,config) k.syncProducer = &p if err != nil { log.Printf("sarama.NewSyncProducer err,err) return false } return true } func (k *KafkaProducer) SendMsgAsync(sendStr string) { msg := &sarama.ProducerMessage{ Topic: k.topic,} //將字串轉化為位元組陣列 msg.Value = sarama.ByteEncoder(sendStr) //fmt.Println(value) //使用通道傳送 pd := *k.asyncProducer pd.Input() <- msg } func (k *KafkaProducer) SendMsgSync(sendStr string) bool { msg := &sarama.ProducerMessage{ Topic: k.topic,Value: sarama.ByteEncoder(sendStr),} pd := *k.syncProducer part,err := pd.SendMessage(msg) if err != nil { fmt.Printf("傳送失敗 send message(%s) err=%s \n",sendStr,err) return false } else { fmt.Printf("傳送成功 partition=%d,offset) return true } }
呼叫 SendMsgSync 或 SendMsgAsync 生產訊息,注意初始化時的引數要保證一致!
消費者組
// kafkaConsumerGroup.go package kafka import ( "context" "fmt" "github.com/Shopify/sarama" "log" "sync" ) func NewKafkaConsumerGroup(topics []string,group string,businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup { k := &KafkaConsumerGroup{ brokers: GetKafkaAddress(),topics: topics,group: group,channelBufferSize: 2,ready: make(chan bool),version: "1.1.1",handler: businessCall,} k.Init() return k } // 消費者組(consumer group): 相同的group.id的消費者將視為同一個消費者組,// 每個消費者都需要設定一個組id,每條訊息只能被 consumer group 中的一個 // Consumer 消費,但可以被多個 consumer group 消費 type KafkaConsumerGroup struct { //代理(broker): 一臺kafka伺服器稱之為一個broker brokers []string //主題(topic): 訊息的一種邏輯分組,用於對訊息分門別類,每一類訊息稱之為一個主題,相同主題的訊息放在一個佇列中 topics []string version string ready chan bool group string channelBufferSize int //業務呼叫 handler func(message *sarama.ConsumerMessage) bool } func (k *KafkaConsumerGroup)Init() func() { version,err := sarama.ParseKafkaVersion(k.version) if err!=nil{ fmt.Printf("Error parsing Kafka version: %v",err) } cfg := sarama.NewConfig() cfg.Version = version // 分割槽分配策略 cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange // 未找到組消費位移的時候從哪邊開始消費 cfg.Consumer.Offsets.Initial = -2 // channel長度 cfg.ChannelBufferSize = k.channelBufferSize ctx,cancel := context.WithCancel(context.Background()) client,err := sarama.NewConsumerGroup(k.brokers,k.group,cfg) if err != nil { fmt.Printf("Error creating consumer group client: %v",err) } wg := &sync.WaitGroup{} wg.Add(1) go func() { defer func() { wg.Done() //util.HandlePanic("client.Consume panic",log.StandardLogger()) }() for { if err := client.Consume(ctx,k.topics,k); err != nil { log.Printf("Error from consumer: %v",err) } // check if context was cancelled,signaling that the consumer should stop if ctx.Err() != nil { log.Println(ctx.Err()) return } k.ready = make(chan bool) } }() <-k.ready fmt.Printf("Sarama consumer up and running!... \n") // 保證在系統退出時,通道里面的訊息被消費 return func() { cancel() wg.Wait() if err = client.Close(); err != nil { fmt.Printf("Error closing client: %v \n",err) } } } // Setup is run at the beginning of a new session,before ConsumeClaim func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error { // Mark the consumer as ready close(k.ready) return nil } // Cleanup is run at the end of a session,once all ConsumeClaim goroutines have exited func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error { return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error { // NOTE: // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine,see: // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 // 具體消費訊息 for message := range claim.Messages() { //msg := string(message.Value) //k.logger.Infof("卡夫卡: %s",msg) if ok:= k.handler(message); ok { // 更新位移 session.MarkMessage(message,"") } //run.Run(msg) } return nil }
測試程式碼
func TestKafkaConsumerGroup_Init(t *testing.T) { //pd := NewKafkaProducer("test-fail",true) //pd.InitSync() k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic},"group-2",func(message *sarama.ConsumerMessage) bool { fmt.Println(string(message.Value)) //如果失敗的處理邏輯 //if ok := pd.SendMsgSync("666666"); ok { // return true //} return false }) consumerDone := k.Init() sigterm := make(chan os.Signal,1) signal.Notify(sigterm,syscall.SIGINT,syscall.SIGTERM) select { case <-sigterm: fmt.Println("terminating: via signal") } consumerDone() }
這裡有一些補償邏輯在裡面。
以上就是logrus相關hook。
好了,這篇logrus hook輸出日誌到本地磁碟的操作就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。