kubernetes1.9原始碼閱讀 List
List-Watch是kubernetes的核心機制。元件kubelet、kube-controller-manager、kube-scheduler需要監控各種資源(pod、service等)的變化,當這些物件發生變化時(add、delete、update),kube-apiserver會主動通知這些元件。這個過程類似一個釋出-訂閱系統。本文章將從程式碼角度探究一下list-watch的實現方式。
本次分析是基於kubernetes tag v1.9.0
kube-apiserver對etcd的List-watch機制
構建PodStorage
pkg/registry/core/pod/storage.go NewStorage()
- kube-apiserver針對每一類資源(pod、service、endpoint、replication controller、depolyments),都會建立Storage物件,如:PodStorage。PodStorage.Pod.Store封裝了對etcd的操作;
- RESTOptions包括Decorator物件,它是針對storage的一個裝飾器;
建立StorageDecorator
apiserver/registry/storage_factory.go
- 通過StorageFactory工廠模式建立Cacher物件,返回StorageDecorator,StorageDecorator即為帶有cache的storage;
建立Cacher
apiserver/pkg/storage/cacher.go NewCacherFromConfig()
- Cacher物件,主要的資料成員:storage、watchCache、reflector、watchers及incoming channel;
- watchCache是一個cache,用來儲存apiserver從etcd那裡watch到的物件;
- watchers是一個map,map的值型別為cacheWatcher,當kubelet、kube-scheduler需要watch某類資源時,他們會向kube-apiserver發起watch請求,kube-apiserver就會生成一個cacheWatcher,cacheWatcher負責將watch的資源通過http從apiserver傳遞到kubelet、kube-scheduler;
- Reflector物件,主要資料成員:ListerWatcher,ListerWatcher是介面物件,包括方法List()和Watch();listerWatcher包裝了Storage,主要是將watch到的物件存到watchCache中;
- incoming channel接收watchCacheEvent;
- 註冊cacher.processEvent方法,協程呼叫cacher.dispatchEvents();
- 協程呼叫cacher.startCaching();
StartCaching
client-go/tools/cache/reflector.go ListAndWatch()
- ListAndWatch()方法
- 首先,建立watchCache物件和cacheListerWatcher物件,cacheListWatcher物件是ListerWatcher介面實現,實現了List()和Watch()方法;
- 之後,執行cacheListerWatcher的List()方法和Watch()方法;
- 最後,呼叫reflector的watchHandler()方法;
List/Watch
apiserver/pkg/storage/cacher.go List()和Watch();
- List()方法將呼叫storage.List()方法;
- Watch()方法將呼叫storage.watch()方法;
Storage List/Watch
apiserver/pkg/storage/etcd/etcd_helper.go
- etcdHelper物件是Storage介面物件的實現;
- etcdHelper的List()方法:
- 獲取etcd的物件,包括resourceVersion資訊;
- etcdHelper的WatchList()方法:
- 建立etcdWatcher;
- etcdWatcher物件,實現了Watch介面;
- etcdWatcher物件,主要的資料成員是etcdIncoming channel和outgoing channel;
- 協程執行etcdWatcher.translate()
- 最後,協程執行etcdWatcher.etcdWatch()
etcdWatcher.etcdWatch
apiserver/pkg/storage/etcd/etcd_watcher.go etcdWatch()
- 如果resourceVersion==0, 執行etcdGetInitialWatchState(),獲取所有的pods,並將結果輸入到etcdIncoming channel;
- 之後,不停的呼叫watcher.Next(),並將結果輸入到etcdIncoming channel;
etcdWatcher.translate
apiserver/pkg/storage/etcd/etcd_watcher.go translate()
- 讀取etcdIncoming channel資訊;
- 呼叫etcdWatcher.sendResult()進行轉化;
- 輸入到outgoing channel;
reflector.watchHandler
client-go/tools/cache/reflector.go watchHandler()
- 讀取outgoing channel資訊,更新watchCache;
更新watchCache
apiserver/pkg/storage/watch_cache.go Add()/Delete()/Get()/Update()
處理事件processEvent
apiserver/pkg/storage/watch_cache.go processEvent()
- 建立watchCacheEvent
- 呼叫watchCache.updateCache(),更新cache;
kube-apiserver的watch功能
kube-apiserver的watch功能是作為一個restful api提供給其他元件(kubelet、kube-controller-manager、kube-scheduler、kube-proxy)。watch的處理流程和PUT、DELETE、GET等REST API處理流程類似。
registerResourceHandlers
apiserver/pkg/endpoints/installer.go registerResourceHandlers()
ListResource
apiserver/pkg/endpoints/handlers/rest.go ListResource()
1.呼叫rest.watcher.watch()方法,這裡將會呼叫Store.watch();
2.呼叫serveWatch()方法;
Store.watch
apiserver/pkg/registry/generic/registry/store.go watch()/watchPredicate()
- 會呼叫Storage.Watch()方法,這裡將會呼叫Cacher.watch();
Cacher.watch
apiserver/pkg/storage/cacher.go watch()/watchList()
- 首先,呼叫newCacheWatcher生成一個watcher,並將watcher插入到cacher.watchers中去;
- 協程呼叫cacheWatcher.process()方法,此方法將會操作cacheWatcher的input channel的訊息;
3. 會將watchCacheEvent通過add()新增到cacheWatcher的input channel中;
操作input channel
apiserver/pkg/storage/cacher.go process()
- 讀取input channel的資訊,並呼叫sendWatchCacheEvent()方法;
sendWatchCacheEvent
apiserver/pkg/storage/cacher.go sendWatchCacheEvent()
- kube-apiserver的watch會帶過濾的功能;
- 對watchCacheEvent進行Filter,輸出到cacheWatcher的result channel中;
serveWatch
apiserver/pkg/endpoints/handlers/rest.go serveWatch()
對result Channel資訊進行序列化,併發送給呼叫者;
相關閱讀:
1. http://licyhust.com/