
15. Deep Dive into k8s: Event Handling and Source Code Analysis

![74623200_p0_master1200](https://img.luozhiyun.com/20201011223447.jpg)

> Please credit the source when republishing. This article was published on luozhiyun's blog: https://www.luozhiyun.com
>
> The source code analyzed here is version [1.19](https://github.com/kubernetes/kubernetes/tree/release-1.19).

## Overview

A k8s Event is a resource object used to show what is happening inside the cluster: the various components of the k8s system report the events that occur at runtime to the apiserver. You can display events with `kubectl get event` or `kubectl describe pod podName` to see what has happened in the cluster.

The apiserver stores Events in the etcd cluster. To keep them from filling up the disk, a retention policy is enforced: events are deleted one hour after they last occurred.

For example:

```
Events:
  Type    Reason     Age   From                     Message
  ----    ------     ----  ----                     -------
  Normal  Scheduled  19s   default-scheduler        Successfully assigned default/hpatest-bbb44c476-8d45v to 192.168.13.130
  Normal  Pulled     15s   kubelet, 192.168.13.130  Container image "nginx" already present on machine
  Normal  Created    15s   kubelet, 192.168.13.130  Created container hpatest
  Normal  Started    13s   kubelet, 192.168.13.130  Started container hpatest
```

When a node or pod in the cluster misbehaves, most users turn to kubectl to look at the corresponding events. In the code analyzed in earlier chapters we have already seen calls like this:

```go
recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
```

A quick search confirms that essentially every module related to nodes or pods touches events: controller-manager, kube-proxy, kube-scheduler, kubelet, and so on.

The event management mechanism consists of three main parts:

* EventRecorder: the event producer; k8s components call its methods to generate events;
* EventBroadcaster: the event broadcaster, which consumes the events produced by the EventRecorder and distributes them to the broadcasterWatchers;
* broadcasterWatcher: defines how events are handled, e.g. reporting them to the apiserver.

The overall flow of the event management mechanism looks roughly like this:

![image-20201011221745830](https://img.luozhiyun.com/20201011223452.png)

Below we use the Event flow inside the kubelet as the example for the analysis.

## Source Code Analysis

During initialization the kubelet calls makeEventRecorder to set up event handling.

**makeEventRecorder**

File: cmd/kubelet/app/server.go

```go
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	// Initialize the EventBroadcaster
	eventBroadcaster := record.NewBroadcaster()
	// Initialize the EventRecorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// Write events to the log
	eventBroadcaster.StartStructuredLogging(3)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		// Report events to the apiserver, which stores them in the etcd cluster
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}
```

This method creates an EventBroadcaster, the event broadcaster that consumes the events recorded by the EventRecorder and, via StartStructuredLogging and StartRecordingToSink, sends them to the log and to the apiserver respectively. It also creates the EventRecorder, the event recorder that k8s components use to record significant events.

### EventRecorder: recording events

```go
type EventRecorder interface {
	Event(object runtime.Object, eventtype, reason, message string)

	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
```

The EventRecorder interface is very simple, with just three methods. Event records an event that has just occurred; Eventf formats the event message with fmt.Sprintf; AnnotatedEventf behaves like Eventf but additionally attaches annotations.

As mentioned above, an event is typically recorded like this:

```go
recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
```
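Outside of the kubelet, any component that holds a clientset can do the same wiring by hand. The following is a minimal sketch of that setup, assuming a cluster reachable through the kubeconfig pointed to by `$KUBECONFIG` and a Pod named `my-pod` in the `default` namespace; the component name `event-demo` and the pod name are illustrative assumptions, not anything taken from the kubelet code:

```go
package main

import (
	"context"
	"os"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
)

func main() {
	// Build a clientset from the kubeconfig in $KUBECONFIG (assumption for this sketch).
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Same wiring as makeEventRecorder: a broadcaster that logs events and uploads
	// them to the apiserver, plus a recorder bound to a component name.
	broadcaster := record.NewBroadcaster()
	defer broadcaster.Shutdown()
	broadcaster.StartStructuredLogging(3)
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events(""),
	})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "event-demo"})

	// An event is attached to an object; here we use an existing Pod (hypothetical name).
	pod, err := clientset.CoreV1().Pods("default").Get(context.TODO(), "my-pod", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	recorder.Eventf(pod, v1.EventTypeNormal, "Demo", "recorded a demo event for pod %s", pod.Name)

	// Recording is asynchronous; give the pipeline a moment to flush before exiting.
	time.Sleep(2 * time.Second)
}
```

The recorder only needs an object it can resolve to a reference; the broadcaster takes care of fanning the event out to the log and apiserver watchers asynchronously, exactly as described in the rest of this article.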
Eventf eventually calls into recorderImpl, the implementation of EventRecorder, and ends up in the generateEvent method:

**Event**

File: client-go/tools/record/event.go

```go
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
```

**generateEvent**

```go
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	...
	// Instantiate the Event
	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	// Asynchronously call Action to write the event into the incoming channel
	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}
```

generateEvent calls Action asynchronously, which writes the event into the incoming channel:

```go
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}
```

The call sequence looks like this:

![image-20201011170747803](https://img.luozhiyun.com/20201011223458.png)

### EventBroadcaster: broadcasting events

When the EventBroadcaster is initialized, NewBroadcaster is called:

File: client-go/tools/record/event.go

```go
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}
```

This creates an eventBroadcasterImpl instance and sets two fields: Broadcaster and sleepDuration. Broadcaster is the heart of this method, so let's look at it next:

```go
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, incomingQueueLength),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)
	// Start the event loop
	go m.loop()
	return m
}
```

Initializing the Broadcaster sets up the watchers map that will hold the broadcasterWatchers, which define how events are handled (e.g. reporting them to the apiserver), and the incoming channel, which carries events from the EventRecorder to the EventBroadcaster.

**loop**

File: k8s.io/apimachinery/pkg/watch/mux.go

```go
func (m *Broadcaster) loop() {
	// Drain events from the m.incoming channel
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		// Distribute the event to the watchers
		m.distribute(event)
	}
	m.closeAll()
	m.distributing.Done()
}
```

This method keeps waiting in the background for data on the m.incoming channel and then calls distribute to hand each event to the broadcasterWatchers. The data in the incoming channel is what the EventRecorder wrote via the Event method.

**distribute**

```go
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	// Non-blocking distribution is selected with the DropIfChannelFull flag
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
```

In non-blocking mode, indicated by DropIfChannelFull, events are dropped once a watcher's result channel is full. Without the default case, distribution blocks and waits whenever w.result is full.

Dropping events is acceptable here because as a k8s cluster grows, the number of reported events grows with it, and every upload means reads and writes against etcd, which puts pressure on the etcd cluster. Losing events does not affect the normal operation of the cluster, so the non-blocking distribution mode is allowed to drop them.
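To make the two FullChannelBehavior modes concrete, here is a stripped-down, self-contained sketch of the same fan-out pattern; the event and watcher types below are toy stand-ins, not the real apimachinery types:

```go
package main

import "fmt"

// Toy stand-ins for watch.Event and broadcasterWatcher.
type event struct{ msg string }

type watcher struct {
	result  chan event
	stopped chan struct{}
}

// distribute mirrors the two branches of Broadcaster.distribute: with dropIfFull
// the default case means a slow watcher loses the event instead of blocking
// every other watcher; without it, the send waits until there is room.
func distribute(watchers []*watcher, ev event, dropIfFull bool) {
	for _, w := range watchers {
		if dropIfFull {
			select {
			case w.result <- ev:
			case <-w.stopped:
			default: // channel full: drop the event for this watcher
			}
		} else {
			select {
			case w.result <- ev: // block until the watcher has room
			case <-w.stopped:
			}
		}
	}
}

func main() {
	w := &watcher{result: make(chan event, 1), stopped: make(chan struct{})}
	distribute([]*watcher{w}, event{"first"}, true)
	distribute([]*watcher{w}, event{"second"}, true) // dropped: the buffer of 1 is full
	fmt.Println(len(w.result))                       // prints 1
}
```

With a buffer of one, the second distribute call hits the default branch and the event is silently dropped, which is exactly the trade-off DropIfChannelFull makes.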
### recordToSink: processing events

Calling StartRecordingToSink is what uploads the events to the apiserver.

**StartRecordingToSink**

```go
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	eventCorrelator := NewEventCorrelatorWithOptions(e.options)
	return e.StartEventWatcher(
		func(event *v1.Event) {
			recordToSink(sink, event, eventCorrelator, e.sleepDuration)
		})
}

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				continue
			}
			// Call back into the handler that was passed in
			eventHandler(event)
		}
	}()
	return watcher
}
```

StartRecordingToSink calls StartEventWatcher, which asynchronously reads from watcher.ResultChan(), the result channel of a broadcasterWatcher; the data in that channel is what the Broadcaster's distribute method fanned out.

Each event is then handed to the callback that was passed in, in this case recordToSink.
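StartEventWatcher is also the extension point for attaching your own consumer alongside the built-in log and apiserver watchers. A small sketch of that idea follows; attachWarningLogger is a hypothetical helper, not part of client-go:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/record"
)

// attachWarningLogger registers an extra consumer on an existing broadcaster.
// Every event that distribute fans out is handed to the callback; a real
// consumer might export metrics or forward events to a message queue instead.
func attachWarningLogger(broadcaster record.EventBroadcaster) watch.Interface {
	return broadcaster.StartEventWatcher(func(e *v1.Event) {
		if e.Type == v1.EventTypeWarning {
			fmt.Printf("warning event: %s/%s reason=%s message=%s\n",
				e.InvolvedObject.Namespace, e.InvolvedObject.Name, e.Reason, e.Message)
		}
	})
}

func main() {
	broadcaster := record.NewBroadcaster()
	defer broadcaster.Shutdown()

	watcher := attachWarningLogger(broadcaster)
	defer watcher.Stop()

	// ... create a recorder with broadcaster.NewRecorder and emit events as usual ...
}
```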
**recordToSink**

```go
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	eventCopy := *event
	event = &eventCopy
	// Pre-process the event and aggregate similar events
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		// Send the event to the apiserver
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}
```

recordToSink first calls EventCorrelate to pre-process the event, aggregating similar events so that an excessive number of them does not add pressure on etcd and the apiserver; if too many events come in, result.Skip is set to true and the event is simply dropped.

It then calls recordEvent to send the event to the apiserver, retrying up to a limit (12 attempts by default) with a delay between attempts (10 seconds by default, with the first retry randomized).

Let's look at EventCorrelate and recordEvent in turn.

**EventCorrelate**

File: client-go/tools/record/events_cache.go

```go
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
```

EventCorrelate calls EventAggregate and eventObserve to do the aggregation; filterFunc ends up calling spamFilter.Filter to do the filtering.

```go
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	eventKey := getEventKey(newEvent)
	aggregateKey, localKey := e.keyFunc(newEvent)

	e.Lock()
	defer e.Unlock()
	// Check whether the cache already holds a record for this key
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// maxIntervalInSeconds defaults to 600s; if the cached record is older than
	// that, start a new one. If no record was found, lastTimestamp is zero and a
	// new record is created as well.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	// Put the record back into the LRU cache
	e.cache.Add(aggregateKey, record)

	// Below the threshold: no aggregation
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	record.localKeys.PopAny()

	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		// Aggregate the Message
		Message: e.messageFunc(newEvent),
		Type:    newEvent.Type,
		Reason:  newEvent.Reason,
		Source:  newEvent.Source,
	}
	return eventCopy, aggregateKey
}
```

EventAggregate covers quite a few cases. It first looks in the cache for an existing aggregate record (aggregateRecord); if there is none, a new one is created while the time interval is being checked. Because the cache is an LRU cache, the aggregate record is then re-added to move it to the front. Next it checks whether the threshold has been reached: if not, the original event is returned without aggregation; if it has, the incoming Event is copied and messageFunc is called to aggregate the Message.
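Boiled down, the aggregation logic is: group events by a key that ignores the message, and once too many distinct messages arrive for that key within the time window, collapse them into a single combined event. Here is a simplified illustration of that idea; it is not the client-go implementation (which also bounds memory with an LRU cache and is safe for concurrent use), and the key string and thresholds are assumptions for the example:

```go
package main

import (
	"fmt"
	"time"
)

// aggRecord is a toy stand-in for client-go's aggregateRecord.
type aggRecord struct {
	messages map[string]struct{}
	last     time.Time
}

// aggregator collapses events that share a key once maxEvents distinct
// messages have been seen within maxInterval.
type aggregator struct {
	records     map[string]*aggRecord
	maxEvents   int
	maxInterval time.Duration
}

// observe returns the message that should be emitted for this event: the
// original one below the threshold, or a combined message above it.
func (a *aggregator) observe(key, message string, now time.Time) string {
	rec, ok := a.records[key]
	if !ok || now.Sub(rec.last) > a.maxInterval {
		rec = &aggRecord{messages: map[string]struct{}{}}
		a.records[key] = rec
	}
	rec.messages[message] = struct{}{}
	rec.last = now

	if len(rec.messages) < a.maxEvents {
		return message
	}
	return "(combined from similar events): " + message
}

func main() {
	a := &aggregator{records: map[string]*aggRecord{}, maxEvents: 3, maxInterval: 10 * time.Minute}
	now := time.Now()
	fmt.Println(a.observe("kubelet/pod-a/Warning/FailedMount", "msg 1", now))
	fmt.Println(a.observe("kubelet/pod-a/Warning/FailedMount", "msg 2", now))
	fmt.Println(a.observe("kubelet/pod-a/Warning/FailedMount", "msg 3", now)) // combined
}
```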
**eventObserve**

```go
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check whether the event is already in the cache
	lastObservation := e.lastEventObservationFromCache(key)

	// A count greater than zero means the event already exists, so bump Count
	if lastObservation.count > 0 {
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// Finally, refresh the cache entry
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}
```

eventObserve looks up the cached record for the event, increments its count, builds a patch for the existing event, and writes the updated record back into the cache.

**Filter**

File: client-go/tools/record/events_cache.go

```go
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	eventKey := getSpamKey(event)

	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// Use the token bucket to decide whether to filter out the event
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}
```

Filter mainly acts as a rate limiter: it keeps a token bucket per event source/object and uses it to decide whether an event should be filtered out.
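The token bucket used above comes from client-go's flowcontrol package. A quick, self-contained way to see its behaviour; the QPS and burst values roughly mirror the spam filter's defaults but are only for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// A very low refill rate with a small burst: roughly the shape of the
	// event spam filter's defaults (values here are illustrative).
	limiter := flowcontrol.NewTokenBucketRateLimiter(1.0/300, 25)

	accepted, dropped := 0, 0
	for i := 0; i < 100; i++ {
		// Filter returns true (i.e. drop the event) when TryAccept fails,
		// just like EventSourceObjectSpamFilter.Filter above.
		if limiter.TryAccept() {
			accepted++
		} else {
			dropped++
		}
	}
	fmt.Printf("accepted=%d dropped=%d\n", accepted, dropped)
}
```

Only the initial burst of tokens is accepted; everything beyond it within the same instant is dropped, which is exactly how a noisy component gets throttled.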
**recordEvent**

File: client-go/tools/record/event.go

```go
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
	var newEvent *v1.Event
	var err error
	// Patch the event if it already exists
	if updateExistingEvent {
		newEvent, err = sink.Patch(event, patch)
	}
	// Otherwise create a new event
	if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
		event.ResourceVersion = ""
		newEvent, err = sink.Create(event)
	}
	if err == nil {
		eventCorrelator.UpdateState(newEvent)
		return true
	}
	// For known errors, do not retry; otherwise return false so the caller retries
	switch err.(type) {
	case *restclient.RequestConstructionError:
		klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
		return true
	case *errors.StatusError:
		if errors.IsAlreadyExists(err) {
			klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		} else {
			klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		}
		return true
	case *errors.UnexpectedObjectError:
	default:
	}
	klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
	return false
}
```

Based on the result returned by the eventCorrelator, recordEvent decides whether to create a new event or update an existing one, and it uses the outcome of the request to decide whether the caller should retry.

The whole recordToSink call chain is a bit convoluted, so here is a diagram:

![image-20201011222338424](https://img.luozhiyun.com/20201011223510.png)

That wraps up the walkthrough of these methods.

## Summary

Having gone through the whole events pipeline, let's recap the flow:

1. First the EventBroadcaster object is initialized, which also initializes a Broadcaster object and starts a loop that receives all events and broadcasts them;
2. Then the EventRecorder object is initialized via the EventBroadcaster's NewRecorder() method; the EventRecorder generates events and sends them to the Broadcaster's channel queue via the Action() method;
3. The EventBroadcaster calls StartStructuredLogging and StartRecordingToSink, which both build on the shared StartEventWatcher method and plug in their own handling logic;
4. Inside the StartEventWatcher call wrapped by StartRecordingToSink, all events are broadcast to every watcher; the recordToSink handler caches, filters, and aggregates the received events and then sends them to the apiserver, which stores them in etcd.

## Reference

https://www.bluematador.com/blog/kubernetes-events-explained

https://kubernetes.io/docs/tasks/debug-application-cluster/debug-application-intros