1. 程式人生 > >docker原始碼分析之容器日誌處理與log-driver實現

docker原始碼分析之容器日誌處理與log-driver實現

  • 子程序:

    由一個程序(父程序)建立的程序,整合父程序大部分屬性,同時可以被父程序守護和管理。

    (2) 你需要知道關於程序產生日誌的形式

    程序產生日誌有兩類輸出方式,一類是寫入到檔案中。另一類是直接寫到stdout或者stderr,例如php的echo python的print golang的fmt.Println("")等等。

    (3) 是否知道docker-daemon與執行中container的關係?

    一個container就是一個特殊的程序,它是由docker daemon建立並啟動,因此container是docker daemon的子程序。由docker daemon守護和管理。因此container的stdout能夠被docker daemon獲取到。基於此理論,我們來分析docker daemon相關程式碼。docker-daemon關於日誌原始碼分析container例項原始碼# /container/container.go:62 type CommonContainer struct{ StreamConfig *stream.Config ... } # /container/stream/streams.go:26 type Config struct { sync.WaitGroup stdout *broadcaster.Unbuffered stderr *broadcaster.Unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser }找到如上所示對應的程式碼,顯示了每一個container例項都有幾個屬性stdout,stderr,stdin,以及管道stdinPipe。這裡說下stdinPipe,當容器使用-i引數啟動時標準輸入將被執行,daemon將能夠使用此管道向容器內寫入標準輸入。

    我們試想以上圖例,如果是你,你怎麼實現日誌收集轉發?# /container/container.go:312func(container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) { c, err := logger.GetLogDriver(cfg.Type)iferr != nil{returnnil, fmt.Errorf("Failed to get logging factory: %v", err) } ctx := logger.Context{ Config: cfg.Config, ContainerID: container.ID, ContainerName: container.Name, ContainerEntrypoint: container.Path, ContainerArgs: container.Args, ContainerImageID: container.ImageID.String(), ContainerImageName: container.Config.Image, ContainerCreated: container.Created, ContainerEnv: container.Config.Env, ContainerLabels: container.Config.Labels, DaemonName: "docker", }// Set logging file for "json-logger"ifcfg.Type == jsonfilelog.Name { ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))iferr != nil{returnnil, err } }returnc(ctx) } #/container/container.go:978func(container *Container) startLogging() error {ifcontainer.HostConfig.LogConfig.Type == "none"{returnnil// do not start logging routines} l, err := container.StartLogger(container.HostConfig.LogConfig)iferr != nil{returnfmt.Errorf("Failed to initialize logging driver: %v", err) } copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) container.LogCopier = copier copier.Run() container.LogDriver = l// set LogPath field only for json-file logdriverifjl, ok := l.(*jsonfilelog.JSONFileLogger); ok { container.LogPath = jl.LogPath() }returnnil}第一個方法是為container查詢log-driver。首先根據容器配置的log-driver類別呼叫:logger.GetLogDriver(cfg.Type)返回一個方法型別:/daemon/logger/factory.go:9 type Creator func(Context) (Logger, error)實質就是從工廠類註冊的logdriver外掛去查詢,具體原始碼下文分析。獲取到c方法後構建呼叫引數具體就是容器的一些資訊。然後使用呼叫c方法返回driver。driver是個介面型別,我們看看有哪些方法:# /daemon/logger/logger.go:61typeLogger interface{ Log(*Message) error Name() stringClose() error }很簡單的三個方法,也很容易理解,Log()傳送日誌訊息到driver,Close()進行關閉操作(根據不同實現)。

    也就是說我們自己實現一個logdriver,只需要實現如上三個方法,然後註冊到logger工廠類中即可。下面我們來看/daemon/logger/factory.go

    第二個方法就是處理日誌了,獲取到日誌driver,在建立一個Copier,顧名思義就是複製日誌,分別從stdout 和stderr複製到logger driver。下面看看具體關鍵實現:#/daemon/logger/copir.go:41func(c *Copier) copySrc(name string, src io.Reader) {deferc.copyJobs.Done() reader := bufio.NewReader(src)for{select{case<-c.closed:returndefault: line, err := reader.ReadBytes('\n') line = bytes.TrimSuffix(line, []byte{'\n'})// ReadBytes can return full or partial output even when it failed.// e.g. it can return a full entry and EOF.iferr == nil|| len(line) >0{iflogErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil{ logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) } }iferr != nil{iferr != io.EOF { logrus.Errorf("Error scanning log stream: %s", err) }return} } } }每讀取一行資料,構建一個訊息,呼叫logdriver的log方法傳送到driver處理。日誌driver註冊器位於/daemon/logger/factory.go的原始碼實現即時日誌driver的註冊器,其中幾個重要的方法(上文已經提到一個):# /daemon/logger/factory.go:21func(lf *logdriverFactory) register(name string, c Creator) error {iflf.driverRegistered(name) {returnfmt.Errorf("logger: log driver named '%s' is already registered", name) } lf.m.Lock() lf.registry[name] = c lf.m.Unlock()returnnil} # /daemon/logger/factory.go:39func(lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error { lf.m.Lock()deferlf.m.Unlock()if_, ok := lf.optValidator[name]; ok {returnfmt.Errorf("logger: log validator named '%s' is already registered", name) } lf.optValidator[name] = lreturnnil}看起來很簡單,就是將一個Creator方法型別新增到一個map結構中,將LogOptValidator新增到另一個map這裡注意加鎖的操作。#/daemon/logger/factory.go:13 type LogOptValidator func(cfg map[string]string) error這個主要是驗證driver的引數 ,dockerd和docker啟動引數中有:--log-opt好雨雲幫自己實現一個基於zmq的log-driver上文已經完整分析了dockerdaemon管理logdriver和處理日誌的整個流程。相信你已經比較明白了。下面我們以zmq-driver為例講講我們怎麼實現自己的driver。直接接收容器的日誌。

    上文我們已經談了一個log-driver需要實現的幾個方法。

    我們可以看看位於/daemon/logger目錄下的已有的driver的實現,例如fluentd,awslogs等。

    下面我們來分析zmq-driver具體的程式碼://定義一個struct,這裡包含一個zmq套接字typeZmqLogger struct{ writer *zmq.Socket containerId stringtenantId stringserviceId stringfelock sync.Mutex }//定義init方法呼叫logger註冊器的方法註冊當前driver//和引數驗證方法。funcinit() {iferr := logger.RegisterLogDriver(name, New); err != nil{ logrus.Fatal(err) }iferr := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil{ logrus.Fatal(err) } }//實現一個上文提到的Creator方法註冊logdriver.//這裡新建一個zmq套接字構建一個例項funcNew(ctx logger.Context) (logger.Logger, error) { zmqaddress := ctx.Config[zmqAddress] puber, err := zmq.NewSocket(zmq.PUB)iferr != nil{returnnil, err }var( env = make(map[string]string) tenantId stringserviceId string)for_, pair := rangectx.ContainerEnv { p := strings.SplitN(pair, "=",2)//logrus.Errorf("ContainerEnv pair: %s", pair)iflen(p) ==2{ key := p[0] value := p[1] env[key] = value } } tenantId = env["TENANT_ID"] serviceId = env["SERVICE_ID"]iftenantId == ""{ tenantId = "default"}ifserviceId == ""{ serviceId = "default"} puber.Connect(zmqaddress)return&ZmqLogger{ writer: puber, containerId: ctx.ID(), tenantId: tenantId, serviceId: serviceId, felock: sync.Mutex{}, }, nil}//實現Log方法,這裡使用zmq socket傳送日誌訊息//這裡必須注意,zmq socket是執行緒不安全的,我們知道//本方法可能被兩個執行緒(複製stdout和膚質stderr)呼叫//必須使用鎖保證執行緒安全。否則會發生錯誤。func(s *ZmqLogger) Log(msg *logger.Message) error { s.felock.Lock()defers.felock.Unlock() s.writer.Send(s.tenantId, zmq.SNDMORE) s.writer.Send(s.serviceId, zmq.SNDMORE)ifmsg.Source == "stderr"{ s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT) } else{ s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT) }returnnil}//實現Close方法,這裡用來關閉zmq socket。//同樣注意執行緒安全,呼叫此方法的是容器關閉協程。func(s *ZmqLogger) Close() error { s.felock.Lock()defers.felock.Unlock()ifs.writer != nil{returns.writer.Close() }returnnil}func(s *ZmqLogger) Name() string{returnname }//驗證引數的方法,我們使用引數傳入zmq pub的地址。funcValidateLogOpt(cfg map[string]string) error {forkey := rangecfg {switchkey {casezmqAddress:default:returnfmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } }ifcfg[zmqAddress] == ""{returnfmt.Errorf("must specify a value for log opt '%s'", zmqAddress) }returnnil}總結多研究原始碼可以方便我們理解docker的工作原理。今天我們分析了日誌部分。希望讀者對這部分功能能夠理解得更清晰。