1. 程式人生 > >3.深入Istio:Pilot配置規則ConfigController

3.深入Istio:Pilot配置規則ConfigController

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的Istio原始碼是 release 1.5。 Config Controller用於管理各種配置資料,包括使用者建立的流量管理規則和策略。Istio目前支援三種類型的Config Controller: * MCP:是一種網路配置協議,用於隔離Pilot和底層平臺(檔案系統、K8s),使得Pilot無須感知底層平臺的差異,從而達到解耦的目的。 * File:通過監視器週期性地讀取本地配置檔案,將配置規則快取在記憶體中,並維護配置的增加、更新、刪除事件,當快取由變化的時候,非同步通知執行事件回撥。 * Kubernetes:基於k8s的Config發現利用了k8s Informer的監聽能力。在k8s叢集中,Config以CustomResource的形式存在。通過監聽apiserver配置規則資源,維護所有資源的快取Store,並觸發事件處理回撥函式。 ## ConfigController初始化 ConfigController是在initConfigController中被初始化的,在initConfigController方法中會呼叫makeKubeConfigController進行controller的初始化。 ```go func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) { //建立configClient configClient, err := controller.NewClient(args.Config.KubeConfig, "", collections.Pilot, args.Config.ControllerOptions.DomainSuffix, buildLedger(args.Config), args.Revision) if err != nil { return nil, multierror.Prefix(err, "failed to open a config client.") } //建立controller,併為config資源設定監聽 return controller.NewController(configClient, args.Config.ControllerOptions), nil } func NewController(client *Client, options controller2.Options) model.ConfigStoreCache { log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace) // The queue requires a time duration for a retry delay after a handler error out := &controller{ client: client, queue: queue.NewQueue(1 * time.Second), kinds: make(map[resource.GroupVersionKind]*cacheHandler), } // add stores for CRD kinds //獲取所有的CRD型別 for _, s := range client.Schemas().All() { //為每一種Config資源都建立一個informer,監聽所有的Config資源 out.addInformer(s, options.WatchedNamespace, options.ResyncPeriod) } return out } ``` 初始化完controller之後會獲取所有的CRD型別,為每一種Config資源都建立一個informer,監聽所有的Config資源。 ```go Pilot = collection.NewSchemasBuilder(). //MeshPolicy MustAdd(IstioAuthenticationV1Alpha1Meshpolicies). MustAdd(IstioAuthenticationV1Alpha1Policies). MustAdd(IstioConfigV1Alpha2Httpapispecbindings). MustAdd(IstioConfigV1Alpha2Httpapispecs). MustAdd(IstioMixerV1ConfigClientQuotaspecbindings). MustAdd(IstioMixerV1ConfigClientQuotaspecs). //DestinationRule MustAdd(IstioNetworkingV1Alpha3Destinationrules). //EnvoyFilter MustAdd(IstioNetworkingV1Alpha3Envoyfilters). //Gateway MustAdd(IstioNetworkingV1Alpha3Gateways). //ServiceEntry MustAdd(IstioNetworkingV1Alpha3Serviceentries). //Sidecar MustAdd(IstioNetworkingV1Alpha3Sidecars). //VirtualService MustAdd(IstioNetworkingV1Alpha3Virtualservices). MustAdd(IstioRbacV1Alpha1Clusterrbacconfigs). MustAdd(IstioRbacV1Alpha1Rbacconfigs). MustAdd(IstioRbacV1Alpha1Servicerolebindings). MustAdd(IstioRbacV1Alpha1Serviceroles). MustAdd(IstioSecurityV1Beta1Authorizationpolicies). MustAdd(IstioSecurityV1Beta1Peerauthentications). MustAdd(IstioSecurityV1Beta1Requestauthentications). Build() ``` 這裡定義好了所有要用到的Config資源型別,主要涉及網路配置、認證、鑑權、策略管理等。 ## ConfigController事件處理 下面我們看一下controller定義: ```go type controller struct { client *Client queue queue.Instance kinds map[resource.GroupVersionKind]*cacheHandler } ``` client是呼叫`controller.NewClient`初始化的client;queue會在Informer監聽到資源的變動的時候將資料push到佇列中,controller在呼叫run方法的時候單獨執行一個執行緒執行queue中的函式;kinds在呼叫addInformer方法的時候初始化進去。 queue.Instance的定義如下: ```go type Task func() error type Instance interface { Push(task Task) Run(<-chan struct{}) } type queueImpl struct { delay time.Duration tasks []Task cond *sync.Cond closing bool } ``` queueImpl繼承了Instance介面,在呼叫push方法的時候,會將Task放入到tasks陣列中,並在呼叫Run方法的時候消費陣列中的資料。 controller繼承了ConfigStoreCache介面: ```go type ConfigStoreCache interface { ConfigStore // 註冊規則事件處理函式 RegisterEventHandler(kind resource.GroupVersionKind, handler func(Config, Config, Event)) // 執行 Run(stop <-chan struct{}) // 配置快取是否已同步 HasSynced() bool } ``` ConfigStoreCache通過RegisterEventHandler介面為上面提到的配置資源都註冊事件處理函式,通過Run方法啟動控制器。 ```go func (c *controller) Run(stop <-chan struct{}) { log.Infoa("Starting Pilot K8S CRD controller") go func() { cache.WaitForCacheSync(stop, c.HasSynced) //單獨啟動一個執行緒執行queue裡面的函式 c.queue.Run(stop) }() for _, ctl := range c.kinds { go ctl.informer.Run(stop) } <-stop log.Info("controller terminated") } ``` 在呼叫Run方法的時候會單獨的啟動一個執行緒呼叫queue的Run方法消費佇列中的資料,並遍歷所有的配置資訊,呼叫informer的Run方法開啟監聽。 監聽器的EventHandler通過如下程式碼註冊: ```go func (c *controller) newCacheHandler( schema collection.Schema, o runtime.Object, otype string, resyncPeriod time.Duration, lf cache.ListFunc, wf cache.WatchFunc) *cacheHandler { informer := cache.NewSharedIndexInformer( &cache.ListWatch{ListFunc: lf, WatchFunc: wf}, o, resyncPeriod, cache.Indexers{}) h := &cacheHandler{ c: c, schema: schema, informer: informer, } informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { incrementEvent(otype, "add") //將ADD事件傳送至佇列 c.queue.Push(func() error { return h.onEvent(nil, obj, model.EventAdd) }) }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { incrementEvent(otype, "update") //將Update事件傳送至佇列 c.queue.Push(func() error { return h.onEvent(old, cur, model.EventUpdate) }) } else { incrementEvent(otype, "updatesame") } }, DeleteFunc: func(obj interface{}) { incrementEvent(otype, "delete") //將Delete事件傳送至佇列 c.queue.Push(func() error { return h.onEvent(nil, obj, model.EventDelete) }) }, }) return h } ``` 當Config資源建立、更新、刪除時,EventHandler建立任務物件並將其傳送到任務佇列中,然後由任務處理執行緒處理。當對應的事件被呼叫的時候會觸發onEvent方法,會呼叫到cacheHandler的onEvent方法,最後設定完畢後將cacheHandler返回,controller會將此cacheHandler設定到kinds陣列中存下來。 下面我們看一下cacheHandler的定義: ```go type cacheHandler struct { c *controller schema collection.Schema informer cache.SharedIndexInformer handlers []func(model.Config, model.Config, model.Event) } ``` cacheHandler在上面初始化的時候,會傳入對應的controller、Schema、informer,然後在呼叫configController的RegisterEventHandler方法的時候會初始化對應的configHandler。 configController的RegisterEventHandler方法會在初始化DiscoveryService的時候呼叫initEventHandlers方法進行初始化: ```go func (s *Server) initEventHandlers() error { ... if s.configController != nil { configHandler := func(old, curr model.Config, _ model.Event) { pushReq := &model.PushRequest{ Full: true, ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{curr.GroupVersionKind(): {}}, Reason: []model.TriggerReason{model.ConfigUpdate}, } s.EnvoyXdsServer.ConfigUpdate(pushReq) } //遍歷所有的資源 for _, schema := range collections.Pilot.All() { // This resource type was handled in external/servicediscovery.go, no need to rehandle here. //ServiceEntry 這個資源不在這裡註冊,感興趣的朋友可以自己找一下 if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries. Resource().GroupVersionKind() { continue } //註冊configHandler到configController中 s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler) } } return nil } ``` initEventHandlers會呼叫collections.Pilot.All方法獲取所有的資源配置,然後遍歷呼叫RegisterEventHandler方法將configHandler函式註冊到cacheHandler的handlers中,至於configHandler函式做了什麼,我們到下一篇講XdsServer的時候再講。 這一部分的程式碼是比較繞的,這裡畫個圖理解一下吧。 ![Group2](https://img.luozhiyun.com/20201128180706.png) 整個執行流程為: ![configserver (1)](https://img.luozhiyun.com/20201128180703.png) ## 總結 至此,ConfigController的核心原理及工作流程就介紹完畢了。本篇主要講解了我們常用的Istio的Gateway、DestinationRule及VirtualService等配置是如何被Istio監聽到並作出相應改變的。希望大家能有所收穫。 ## Reference https://ruofeng.me/2018/11/08/how-does-istio-pilot-push-eds-config/ https://zhaohuabing.com/post/2019-10-21-pilot-discovery-code-analysi https://www.servicemesher.com/blog/envoy-proxy-config-deep-dive/ https://www.cnblogs.com/163yun/p/8962