1. 程式人生 > 程式設計 >logrus hook輸出日誌到本地磁碟的操作

logrus hook輸出日誌到本地磁碟的操作

logrus是go的一個日誌框架,它最讓人激動的應該是hook機制,可以在初始化時為logrus新增hook,logrus可以實現各種擴充套件功能,可以將日誌輸出到elasticsearch和activemq等中介軟體去,甚至可以輸出到你的email和叮叮中去,不要問為為什麼可以發現可以輸入到叮叮中去,都是淚,手動笑哭!

言歸正傳,這裡就簡單的通過hook機制將檔案輸出到本地磁碟。

首先

go get github.com/sirupsen/logrus

然後

logrus和go lib裡面一樣有6個等級,可以直接呼叫

logrus.Debug("Useful debugging information.")
logrus.Info("Something noteworthy happened!")
logrus.Warn("You should probably take a look at this.")
logrus.Error("Something failed but I'm not quitting.")
logrus.Fatal("Bye.")  //log之後會呼叫os.Exit(1)
logrus.Panic("I'm bailing.")  //log之後會panic()

專案例子結構

logrus hook輸出日誌到本地磁碟的操作

main.go

package main

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "logT/logS"
)
func main() {
  //建立一個hook,將日誌儲存路徑輸入進去
 hook := logS.NewHook("d:/log/golog.log")
 //載入hook之前列印日誌
 logrus.WithField("file","d:/log/golog.log").Info("New logrus hook err.")
 logrus.AddHook(hook)
 //載入hook之後列印日誌
 logrus.WithFields(logrus.Fields{
 "animal": "walrus",}).Info("A walrus appears")
}

hook.go

不要看下面三個go檔案程式碼很長,其實大多數都是固定程式碼,也就NewHook函式自己擴充套件定義就好

package logS

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "os"
 "strings"
)

// Hook 寫檔案的Logrus Hook
type Hook struct {
 W LoggerInterface
}

func NewHook(file string) (f *Hook) {
 w := NewFileWriter()
 config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`,file)
 err := w.Init(config)
 if err != nil {
 return nil
 }

 return &Hook{w}
}

// Fire 實現Hook的Fire介面
func (hook *Hook) Fire(entry *logrus.Entry) (err error) {
 message,err := getMessage(entry)
 if err != nil {
 fmt.Fprintf(os.Stderr,"Unable to read entry,%v",err)
 return err
 }
 switch entry.Level {
 case logrus.PanicLevel:
 fallthrough
 case logrus.FatalLevel:
 fallthrough
 case logrus.ErrorLevel:
 return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s",message),LevelError)
 case logrus.WarnLevel:
 return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s",LevelWarn)
 case logrus.InfoLevel:
 return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s",LevelInfo)
 case logrus.DebugLevel:
 return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s",LevelDebug)
 default:
 return nil
 }
}

// Levels 實現Hook的Levels介面
func (hook *Hook) Levels() []logrus.Level {
 return []logrus.Level{
 logrus.PanicLevel,logrus.FatalLevel,logrus.ErrorLevel,logrus.WarnLevel,logrus.InfoLevel,logrus.DebugLevel,}
}

func getMessage(entry *logrus.Entry) (message string,err error) {
 message = message + fmt.Sprintf("%s ",entry.Message)
 file,lineNumber := GetCallerIgnoringLogMulti(2)
 if file != "" {
 sep := fmt.Sprintf("%s/src/",os.Getenv("GOPATH"))
 fileName := strings.Split(file,sep)
 if len(fileName) >= 2 {
  file = fileName[1]
 }
 }
 message = fmt.Sprintf("%s:%d ",file,lineNumber) + message

 for k,v := range entry.Data {
 message = message + fmt.Sprintf("%v:%v ",k,v)
 }
 return
}

caller.go

package logS

import (
 "runtime"
 "strings"
)

func GetCaller(callDepth int,suffixesToIgnore ...string) (file string,line int) {
 // bump by 1 to ignore the getCaller (this) stackframe
 callDepth++
outer:
 for {
 var ok bool
 _,line,ok = runtime.Caller(callDepth)
 if !ok {
  file = "???"
  line = 0
  break
 }

 for _,s := range suffixesToIgnore {
  if strings.HasSuffix(file,s) {
  callDepth++
  continue outer
  }
 }
 break
 }
 return
}

// GetCallerIgnoringLogMulti TODO
func GetCallerIgnoringLogMulti(callDepth int) (string,int) {
 // the +1 is to ignore this (getCallerIgnoringLogMulti) frame
 return GetCaller(callDepth+1,"logrus/hooks.go","logrus/entry.go","logrus/logger.go","logrus/exported.go","asm_amd64.s")
}

file.go

package logS

import (
 "encoding/json"
 "errors"
 "fmt"
 "io/ioutil"
 "log"
 "os"
 "path/filepath"
 "strings"
 "sync"
 "time"
)

// RFC5424 log message levels.
const (
 LevelError = iota
 LevelWarn
 LevelInfo
 LevelDebug
)

// LoggerInterface Logger介面
type LoggerInterface interface {
 Init(config string) error
 WriteMsg(msg string,level int) error
 Destroy()
 Flush()
}

// LogWriter implements LoggerInterface.
// It writes messages by lines limit,file size limit,or time frequency.
type LogWriter struct {
 *log.Logger
 mw *MuxWriter
 // The opened file
 Filename string `json:"filename"`

 Maxlines     int `json:"maxlines"`
 maxlinesCurlines int

 // Rotate at size
 Maxsize    int `json:"maxsize"`
 maxsizeCursize int

 // Rotate daily
 Daily     bool `json:"daily"`
 Maxdays    int64 `json:"maxdays"`
 dailyOpendate int

 Rotate bool `json:"rotate"`

 startLock sync.Mutex // Only one log can write to the file

 Level int `json:"level"`
}

// MuxWriter an *os.File writer with locker.
type MuxWriter struct {
 sync.Mutex
 fd *os.File
}

// write to os.File.
func (l *MuxWriter) Write(b []byte) (int,error) {
 l.Lock()
 defer l.Unlock()
 return l.fd.Write(b)
}

// SetFd set os.File in writer.
func (l *MuxWriter) SetFd(fd *os.File) {
 if l.fd != nil {
 _ = l.fd.Close()
 }
 l.fd = fd
}

// NewFileWriter create a FileLogWriter returning as LoggerInterface.
func NewFileWriter() LoggerInterface {
 w := &LogWriter{
 Filename: "",Maxlines: 1000000,Maxsize: 1 << 28,//256 MB
 Daily:  true,Maxdays: 7,Rotate:  true,Level:  LevelDebug,}
 // use MuxWriter instead direct use os.File for lock write when rotate
 w.mw = new(MuxWriter)
 // set MuxWriter as Logger's io.Writer
 w.Logger = log.New(w.mw,"",log.Ldate|log.Ltime)
 return w
}

// Init file logger with json config.
// jsonconfig like:
// {
// "filename":"logs/sample.log",// "maxlines":10000,// "maxsize":1<<30,// "daily":true,// "maxdays":15,// "rotate":true
// }
func (w *LogWriter) Init(jsonconfig string) error {
 err := json.Unmarshal([]byte(jsonconfig),w)
 if err != nil {
 return err
 }
 if len(w.Filename) == 0 {
 return errors.New("jsonconfig must have filename")
 }
 err = w.startLogger()
 return err
}

// start file logger. create log file and set to locker-inside file writer.
func (w *LogWriter) startLogger() error {
 fd,err := w.createLogFile()
 if err != nil {
 return err
 }
 w.mw.SetFd(fd)
 err = w.initFd()
 if err != nil {
 return err
 }
 return nil
}

func (w *LogWriter) docheck(size int) {
 w.startLock.Lock()
 defer w.startLock.Unlock()
 if w.Rotate && ((w.Maxlines > 0 && w.maxlinesCurlines >= w.Maxlines) ||
 (w.Maxsize > 0 && w.maxsizeCursize >= w.Maxsize) ||
 (w.Daily && time.Now().Day() != w.dailyOpendate)) {
 if err := w.DoRotate(); err != nil {
  fmt.Fprintf(os.Stderr,"FileLogWriter(%q): %s\n",w.Filename,err)
  return
 }
 }
 w.maxlinesCurlines++
 w.maxsizeCursize += size
}

// WriteMsg write logger message into file.
func (w *LogWriter) WriteMsg(msg string,level int) error {
 if level > w.Level {
 return nil
 }
 n := 24 + len(msg) // 24 stand for the length "2013/06/23 21:00:22 [T] "
 w.docheck(n)
 w.Logger.Print(msg)
 return nil
}

func (w *LogWriter) createLogFile() (*os.File,error) {
 // Open the log file
 fd,err := os.OpenFile(w.Filename,os.O_WRONLY|os.O_APPEND|os.O_CREATE,0660)
 return fd,err
}

func (w *LogWriter) initFd() error {
 fd := w.mw.fd
 finfo,err := fd.Stat()
 if err != nil {
 return fmt.Errorf("get stat err: %s",err)
 }
 w.maxsizeCursize = int(finfo.Size())
 w.dailyOpendate = time.Now().Day()
 if finfo.Size() > 0 {
 content,err := ioutil.ReadFile(w.Filename)
 if err != nil {
  return err
 }
 w.maxlinesCurlines = len(strings.Split(string(content),"\n"))
 } else {
 w.maxlinesCurlines = 0
 }
 return nil
}

// DoRotate means it need to write file in new file.
// new file name like xx.log.2013-01-01.2
func (w *LogWriter) DoRotate() error {
 _,err := os.Lstat(w.Filename)
 if err == nil { // file exists
 // Find the next available number
 num := 1
 fname := ""
 for ; err == nil && num <= 999; num++ {
  fname = w.Filename + fmt.Sprintf(".%s.%03d",time.Now().Format("2006-01-02"),num)
  _,err = os.Lstat(fname)
 }
 // return error if the last file checked still existed
 if err == nil {
  return fmt.Errorf("Rotate: Cannot find free log number to rename %s",w.Filename)
 }

 // block Logger's io.Writer
 w.mw.Lock()
 defer w.mw.Unlock()

 fd := w.mw.fd
 _ = fd.Close()

 // close fd before rename
 // Rename the file to its newfound home
 err = os.Rename(w.Filename,fname)
 if err != nil {
  return fmt.Errorf("Rotate: %s",err)
 }

 // re-start logger
 err = w.startLogger()
 if err != nil {
  return fmt.Errorf("Rotate StartLogger: %s",err)
 }

 go w.deleteOldLog()
 }

 return nil
}

func (w *LogWriter) deleteOldLog() {
 dir := filepath.Dir(w.Filename)
 _ = filepath.Walk(dir,func(path string,info os.FileInfo,err error) (returnErr error) {
 defer func() {
  if r := recover(); r != nil {
  returnErr = fmt.Errorf("Unable to delete old log '%s',error: %+v",path,r)
  fmt.Println(returnErr)
  }
 }()

 if !info.IsDir() && info.ModTime().Unix() < (time.Now().Unix()-60*60*24*w.Maxdays) {
  if strings.HasPrefix(filepath.Base(path),filepath.Base(w.Filename)) {
  _ = os.Remove(path)
  }
 }
 return
 })
}

// Destroy destroy file logger,close file writer.
func (w *LogWriter) Destroy() {
 _ = w.mw.fd.Close()
}

// Flush file logger.
// there are no buffering messages in file logger in memory.
// flush file means sync file from disk.
func (w *LogWriter) Flush() {
 _ = w.mw.fd.Sync()
}

補充知識:golang logrus自定義hook:日誌切片hook、郵件警報hook、kafkahook

logrus Hook 分析

logrus hook 介面定義很簡單。如下

package logrus

// A hook to be fired when logging on the logging levels returned from
// `Levels()` on your implementation of the interface. Note that this is not
// fired in a goroutine or a channel with workers,you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `Levels()` to block.
type Hook interface {
 Levels() []Level
 Fire(*Entry) error
}

// Internal type for storing the hooks on a logger instance.
type LevelHooks map[Level][]Hook

// Add a hook to an instance of logger. This is called with
// `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
 for _,level := range hook.Levels() {
 hooks[level] = append(hooks[level],hook)
 }
}

// Fire all the hooks for the passed level. Used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level,entry *Entry) error {
 for _,hook := range hooks[level] {
 if err := hook.Fire(entry); err != nil {
  return err
 }
 }
 return nil
}

只需實現 該結構的介面。

type Hook interface {
 Levels() []Level
 Fire(*Entry) error
}

就會被logrus框架遍歷呼叫已註冊的 hook 的 Fire 方法

獲取日誌例項

// log_hook.go
package logger

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "library/util/constant"
 "os"
)

//自實現 logrus hook
func getLogger(module string) *logrus.Logger {
 //例項化
 logger := logrus.New()
 //設定輸出
 logger.Out = os.Stdout
 //設定日誌級別
 logger.SetLevel(logrus.DebugLevel)
 //設定日誌格式
 //自定writer就行, hook 交給 lfshook
 logger.AddHook(newLogrusHook(constant.GetLogPath(),module))
 
 logger.SetFormatter(&logrus.JSONFormatter{
 TimestampFormat:"2006-01-02 15:04:05",})
 return logger
}

//確保每次呼叫使用的檔案都是唯一的。
func GetNewFieldLoggerContext(module,appField string) *logrus.Entry {
 logger:= getLogger(module)
 return logger.WithFields(logrus.Fields{
 "app": appField,})
}

//訂閱 警告日誌
func SubscribeLog(entry *logrus.Entry,subMap SubscribeMap) {
 logger := entry.Logger
 logger.AddHook(newSubScribeHook(subMap))
 fmt.Println("日誌訂閱成功")
}

constant.GetLogPath() 可以替換為自己的日誌檔案輸出目錄地址,比如我的mac上則是:/usr/local/log,直接替換即可。

日誌切片hook

程式碼

// writer.go
package logger

import (
 "fmt"
 "github.com/pkg/errors"
 "io"
 "library/util"
 "os"
 "path/filepath"
 "sync"
 "time"
)

type LogWriter struct {
 logDir       string //日誌根目錄地址。
 module       string //模組 名
  curFileName   string //當前被指定的filename
 curBaseFileName   string //在使用中的file
 turnCateDuration  time.Duration
 mutex      sync.RWMutex
 outFh      *os.File
}

func (w *LogWriter) Write(p []byte) (n int,err error) {
 w.mutex.Lock()
 defer w.mutex.Unlock()
 if out,err:= w.getWriter(); err!=nil {
 return 0,errors.New("failed to fetch target io.Writer")
 }else{
 return out.Write(p)
 }
}

func (w *LogWriter) getFileName() string {
 base := time.Now().Truncate(w.turnCateDuration)
 return fmt.Sprintf("%s/%s/%s_%s",w.logDir,base.Format("2006-01-02"),w.module,base.Format("15"))
}

func (w *LogWriter) getWriter()(io.Writer,error) {
 fileName := w.curBaseFileName
 //判斷是否有新的檔名
 //會出現新的檔名
 baseFileName := w.getFileName()
 if baseFileName != fileName {
 fileName = baseFileName
 }

 dirname := filepath.Dir(fileName)
 if err := os.MkdirAll(dirname,0755); err != nil {
 return nil,errors.Wrapf(err,"failed to create directory %s",dirname)
 }

 fileHandler,err := os.OpenFile(fileName,os.O_CREATE|os.O_APPEND|os.O_WRONLY,0644)
 if err != nil {
 return nil,errors.Errorf("failed to open file %s",err)
 }
 w.outFh.Close()
 w.outFh = fileHandler
 w.curBaseFileName = fileName
 w.curFileName = fileName

 return fileHandler,nil
}

func New(logPath,module string,duration time.Duration) *LogWriter {
 return &LogWriter{
 logDir: logPath,module: module,turnCateDuration:duration,curFileName: "",curBaseFileName: "",}
}
// hook.go
package logger

import (
 "github.com/rifflock/lfshook"
 "github.com/sirupsen/logrus"
 "time"
)
func newLogrusHook(logPath,moduel string) logrus.Hook {
 logrus.SetLevel(logrus.WarnLevel)

 writer := New(logPath,moduel,time.Hour * 2)

 lfsHook := lfshook.NewHook(lfshook.WriterMap{
 logrus.DebugLevel: writer,logrus.InfoLevel: writer,logrus.WarnLevel: writer,logrus.ErrorLevel: writer,logrus.FatalLevel: writer,logrus.PanicLevel: writer,},&logrus.TextFormatter{DisableColors: true})

 // writer 生成新的log檔案型別 writer 在通過new hook函式 消費 fire 函式
 // writer 是實現了writer 介面的庫,在日誌呼叫write是做預處理
 return lfsHook
}

測試程式碼

func TestGetLogger(t *testing.T) {
 lg := GetNewFieldLoggerContext("test","d")
 lg.Logger.Info("????")
}

解析

logger例項持有了 自定義的 io.writer 結構體,在消費Fire函式時,會呼叫Write方法,此時通過Truncate時間切片函式邏輯判斷需要寫入的檔案。或建立新的檔案。

注: 文章提供的程式碼是按天切分資料夾的,資料夾內模組日誌再按2小時切分。可自行替換成按模組切分。

郵件警報hook

程式碼

// subscribeHook.go
package logger

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "library/email"
 "strings"
)

type SubscribeMap map[logrus.Level][]*email.Receiver
type SubscribeHook struct {
 subMap SubscribeMap
}
//此處可以自實現hook 目前使用三方hook
func(h *SubscribeHook)Levels() []logrus.Level{
 return logrus.AllLevels
}

func(h *SubscribeHook)Fire(entry *logrus.Entry) error{
 for level,receivers := range h.subMap {
 //命中 準備消費
 if level == entry.Level {
  if len(receivers) > 0 {
  email.SendEmail(receivers,fmt.Sprintf("%s:[系統日誌警報]",entry.Level.String()),fmt.Sprintf("錯誤內容: %s",entry.Message))
  }
 }
 }
 return nil
}
func NewSubscribeMap(level logrus.Level,receiverStr string) SubscribeMap{
 subMap := SubscribeMap{}
 addressList := strings.Split(receiverStr,";")
 var receivers []*email.Receiver
 for _,address := range addressList {
 receivers = append(receivers,&email.Receiver{Email: address})
 }
 subMap[level] = receivers
 return subMap
}
func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {
 return &SubscribeHook{subMap}
// email.go
package email

import (
 "fmt"
 "gopkg.in/gomail.v2"
 "regexp"
 "strconv"
)

type Sender struct {
 User   string
 Password string
 Host   string
 Port   int
 MailTo  []string
 Subject  string
 Content  string
}

type Receiver struct {
 Email  string
}

func (r *Receiver) Check() bool {
 pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` //匹配電子郵箱
 reg := regexp.MustCompile(pattern)
 return reg.MatchString(r.Email)
}

func (s *Sender) clean (){

}

//檢查 郵箱正確性
func (s *Sender)NewReceiver(email string) *Receiver {
 rec := &Receiver{Email:email}
 if rec.Check() {
 m.MailTo = []string{email}
 return rec
 }else{
 fmt.Printf("email check fail 【%s】\n",email)
 return nil
 }
}
func (s *Sender)NewReceivers(receivers []*Receiver) {
 for _,rec := range receivers {
 if rec.Check() {
  m.MailTo = append(m.MailTo,rec.Email)
 }else{
  fmt.Printf("email check fail 【%s】\n",rec.Email)
 }
 }
}
// 163郵箱 password 為開啟smtp後給的祕鑰
var m = Sender{User:"[email protected]",Password:"666666666",Host: "smtp.163.com",Port: 465}

func SendEmail(receivers []*Receiver,subject,content string){
 m.NewReceivers(receivers)
 m.Subject = subject
 m.Content = content

 e := gomail.NewMessage()
 e.SetHeader("From",e.FormatAddress(m.User,"hengsheng"))
 e.SetHeader("To",m.MailTo...)  //傳送給多個使用者
 e.SetHeader("Subject",m.Subject) //設定郵件主題
 e.SetBody("text/html",m.Content)  //設定郵件正文
 d := gomail.NewDialer(m.Host,m.Port,m.User,m.Password)
 err := d.DialAndSend(e)
 if err != nil {
 fmt.Printf("error 郵件傳送錯誤! %s \n",err.Error())
 }
}

使用

同理在writer時 如果是錯誤日誌則傳送郵件。

o.logger = logger.GetNewFieldLoggerContext("test","666")
if subscribeSocket {
 logger.SubscribeLog(o.Logger,logger.NewSubscribeMap(logrus.ErrorLevel,"[email protected];[email protected]"))
 }
 // o 為實際結構體例項

kafkahook

// kafka hook
package logger

import (
 "github.com/sirupsen/logrus"
 "library/kafka"
 "library/util/constant"
)

type KafKaHook struct {
 kafkaProducer  *kafka.KafkaProducer
}


func(h *KafKaHook)Levels() []logrus.Level{
 return logrus.AllLevels
}

func(h *KafKaHook)Fire(entry *logrus.Entry) error{
 h.kafkaProducer.SendMsgSync(entry.Message)
 return nil
}

func newKafkaHook() *KafKaHook{
 producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic,true)
 return &KafKaHook{kafkaProducer: producer}
}

使用時logger.AddHook(newKafkaHook()) 即可

kafka模組

生產者

// kafkaProducer.go
package kafka

import (
 "errors"
 "fmt"
 "github.com/Shopify/sarama"
 "library/util/constant"
 "log"
 "time"
)

func GetKafkaAddress()[]string{
 return "127.0.0.1:9092"
}

//同步訊息模式
func SyncProducer(topic,message string) error {
 config := sarama.NewConfig()
 config.Producer.Return.Successes = true
 config.Producer.Timeout = 5 * time.Second
 p,err := sarama.NewSyncProducer(GetKafkaAddress(),config)
 if err != nil {
 return errors.New(fmt.Sprintf("sarama.NewSyncProducer err,message=%s \n",err))
 }
 defer p.Close()
 msg := &sarama.ProducerMessage{
 Topic: topic,Value: sarama.ByteEncoder(message),}
 part,offset,err := p.SendMessage(msg)
 if err != nil {
 return errors.New(fmt.Sprintf("send sdsds err=%s \n",err))
 } else {
 fmt.Printf("傳送成功,partition=%d,offset=%d \n",part,offset)
 return nil
 }
}

//async 非同步生產者
type KafkaProducer struct {
 topic    string
 asyncProducer  *sarama.AsyncProducer
 syncProducer  *sarama.SyncProducer
 sync    bool
}

func NewKafkaProducer(topic string,sync bool) *KafkaProducer {
 k := &KafkaProducer{
 topic:   topic,sync:   sync,}
 if sync {
 k.initSync()
 }else{
 k.initAsync()
 }
 return k
}

func (k *KafkaProducer) initAsync() bool {
 if k.sync {
 fmt.Printf("sync producer cant call async func !\n")
 return false
 }
 config := sarama.NewConfig()
 //等待伺服器所有副本都儲存成功後的響應
 config.Producer.RequiredAcks = sarama.WaitForAll
 //隨機向partition傳送訊息
 config.Producer.Partitioner = sarama.NewRandomPartitioner
 //是否等待成功和失敗後的響應,只有上面的RequireAcks設定不是NoReponse這裡才有用.
 config.Producer.Return.Successes = true
 config.Producer.Return.Errors = true
 //設定使用的kafka版本,如果低於V0_10_0_0版本,訊息中的timestrap沒有作用.需要消費和生產同時配置
 //注意,版本設定不對的話,kafka會返回很奇怪的錯誤,並且無法成功傳送訊息
 config.Version = sarama.V0_10_0_1

 producer,e := sarama.NewAsyncProducer(GetKafkaAddress(),config)
 if e != nil {
 fmt.Println(e)
 return false
 }
 k.asyncProducer = &producer
 defer producer.AsyncClose()
 pd := *k.asyncProducer
 go func() {
 for{
  select {
  case <-pd.Successes():
  //fmt.Println("offset: ",suc.Offset,"timestamp: ",suc.Timestamp.String(),"partitions: ",suc.Partition)
  case fail := <-pd.Errors():
  fmt.Printf("err: %s \n",fail.Err.Error())
  }
 }
 }()

 return true
}

func (k *KafkaProducer) initSync() bool {
 if !k.sync {
 fmt.Println("async producer cant call sync func !")
 return false
 }

 config := sarama.NewConfig()
 config.Producer.Return.Successes = true
 config.Producer.Timeout = 5 * time.Second
 p,config)
 k.syncProducer = &p
 if err != nil {
 log.Printf("sarama.NewSyncProducer err,err)
 return false
 }
 return true
}

func (k *KafkaProducer) SendMsgAsync(sendStr string) {

 msg := &sarama.ProducerMessage{
 Topic: k.topic,}

 //將字串轉化為位元組陣列
 msg.Value = sarama.ByteEncoder(sendStr)
 //fmt.Println(value)

 //使用通道傳送
 pd := *k.asyncProducer
 pd.Input() <- msg
}

func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
 msg := &sarama.ProducerMessage{
 Topic: k.topic,Value: sarama.ByteEncoder(sendStr),}
 pd := *k.syncProducer
 part,err := pd.SendMessage(msg)
 if err != nil {
 fmt.Printf("傳送失敗 send message(%s) err=%s \n",sendStr,err)
 return false
 } else {
 fmt.Printf("傳送成功 partition=%d,offset)
 return true
 }
}

呼叫 SendMsgSync 或 SendMsgAsync 生產訊息,注意初始化時的引數要保證一致!

消費者組

// kafkaConsumerGroup.go

package kafka

import (
 "context"
 "fmt"
 "github.com/Shopify/sarama"
 "log"
 "sync"
)

func NewKafkaConsumerGroup(topics []string,group string,businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
 k := &KafkaConsumerGroup{
 brokers:  GetKafkaAddress(),topics:  topics,group:       group,channelBufferSize: 2,ready:       make(chan bool),version:  "1.1.1",handler:  businessCall,}
 k.Init()
 return k
}

// 消費者組(consumer group): 相同的group.id的消費者將視為同一個消費者組,// 每個消費者都需要設定一個組id,每條訊息只能被 consumer group 中的一個
// Consumer 消費,但可以被多個 consumer group 消費
type KafkaConsumerGroup struct {
 //代理(broker): 一臺kafka伺服器稱之為一個broker
 brokers   []string
 //主題(topic): 訊息的一種邏輯分組,用於對訊息分門別類,每一類訊息稱之為一個主題,相同主題的訊息放在一個佇列中
 topics    []string
 version   string
 ready       chan bool
 group       string
 channelBufferSize  int
 //業務呼叫
 handler     func(message *sarama.ConsumerMessage) bool
}

func (k *KafkaConsumerGroup)Init() func() {

 version,err := sarama.ParseKafkaVersion(k.version)
 if err!=nil{
 fmt.Printf("Error parsing Kafka version: %v",err)
 }
 cfg := sarama.NewConfig()
 cfg.Version = version
 // 分割槽分配策略
 cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
 // 未找到組消費位移的時候從哪邊開始消費
 cfg.Consumer.Offsets.Initial = -2
 // channel長度
 cfg.ChannelBufferSize = k.channelBufferSize
 ctx,cancel := context.WithCancel(context.Background())
 client,err := sarama.NewConsumerGroup(k.brokers,k.group,cfg)
 if err != nil {
  fmt.Printf("Error creating consumer group client: %v",err)
 }

 wg := &sync.WaitGroup{}
 wg.Add(1)
 go func() {
  defer func() {
  wg.Done()
  //util.HandlePanic("client.Consume panic",log.StandardLogger())
  }()
  for {
  if err := client.Consume(ctx,k.topics,k); err != nil {
   log.Printf("Error from consumer: %v",err)
  }
  // check if context was cancelled,signaling that the consumer should stop
  if ctx.Err() != nil {
   log.Println(ctx.Err())
   return
  }
  k.ready = make(chan bool)
  }
 }()

 <-k.ready
 fmt.Printf("Sarama consumer up and running!... \n")
 // 保證在系統退出時,通道里面的訊息被消費
 return func() {
  cancel()
  wg.Wait()
  if err = client.Close(); err != nil {
  fmt.Printf("Error closing client: %v \n",err)
  }
 }

}

// Setup is run at the beginning of a new session,before ConsumeClaim
func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
 // Mark the consumer as ready
 close(k.ready)
 return nil
}

// Cleanup is run at the end of a session,once all ConsumeClaim goroutines have exited
func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
 return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {

 // NOTE:
 // Do not move the code below to a goroutine.
 // The `ConsumeClaim` itself is called within a goroutine,see:
 // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
 // 具體消費訊息
 for message := range claim.Messages() {
 //msg := string(message.Value)
 //k.logger.Infof("卡夫卡: %s",msg)

 if ok:= k.handler(message); ok {
  // 更新位移
  session.MarkMessage(message,"")
 }
 //run.Run(msg)
 }
 return nil
}

測試程式碼

func TestKafkaConsumerGroup_Init(t *testing.T) {
 //pd := NewKafkaProducer("test-fail",true)
 //pd.InitSync()
 k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic},"group-2",func(message *sarama.ConsumerMessage) bool {
 fmt.Println(string(message.Value))
 //如果失敗的處理邏輯
 //if ok := pd.SendMsgSync("666666"); ok {
 // return true
 //}
 return false

 })
 consumerDone := k.Init()

 sigterm := make(chan os.Signal,1)
 signal.Notify(sigterm,syscall.SIGINT,syscall.SIGTERM)
 select {
 case <-sigterm:
 fmt.Println("terminating: via signal")
 }
 consumerDone()
}

這裡有一些補償邏輯在裡面。

以上就是logrus相關hook。

好了,這篇logrus hook輸出日誌到本地磁碟的操作就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。