2.深入Istio:Pilot服務發現
轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com
本文使用的Istio原始碼是 release 1.5。
介紹
pilot-discovery是在Pilot中的核心服務,在Pilot中名為pilot-discovery,主要功能是從註冊中心(如 kubernetes 或者 consul)獲取資訊並彙集,從 Kubernetes API Server 中獲取流量規則,並將服務資訊和流量規則轉化為資料面可以理解的格式,通過標準的資料面 API 下發到網格中的各個SideCar中。
pilot-discovery包含了服務發現、配置規則發現、xDS配置下發。總體上打算分三篇來進行講解,這一篇主要看看服務發現部分的實現。文章中有涉及xDS協議的一些東西,大家可以看看這篇文章:
Pilot服務發現指通過監聽底層平臺的服務註冊中心來快取Istio服務模型,並且監視服務模型的變化,再服務模型更新時觸發相關事件回撥處理函式的執行。
服務發現工作機制
Pilot初始化
discoveryCmd = &cobra.Command{ Use: "discovery", Short: "Start Istio proxy discovery service.", Args: cobra.ExactArgs(0), RunE: func(c *cobra.Command, args []string) error { ... //日誌配置 if err := log.Configure(loggingOptions); err != nil { return err } ... stop := make(chan struct{}) // 建立xDs伺服器 discoveryServer, err := bootstrap.NewServer(&serverArgs) if err != nil { return fmt.Errorf("failed to create discovery service: %v", err) } // 啟動伺服器 if err := discoveryServer.Start(stop); err != nil { return fmt.Errorf("failed to start discovery service: %v", err) } //等待程序推出 cmd.WaitSignal(stop) discoveryServer.WaitUntilCompletion() return nil }, }
Pilot服務在初始化的時候首先會初始化日誌配置,然後建立xDs伺服器,這裡的xDs指的是x Discovery Service的意思,x代表了一系列的元件如:Cluster、Endpoint、Listener、Route 等。
func NewServer(args *PilotArgs) (*Server, error) { args.Default() e := &model.Environment{ ServiceDiscovery: aggregate.NewController(), PushContext: model.NewPushContext(), } s := &Server{ basePort: args.BasePort, clusterID: getClusterID(args), environment: e, EnvoyXdsServer: envoyv2.NewDiscoveryServer(e, args.Plugins), forceStop: args.ForceStop, mux: http.NewServeMux(), } // 初始化處理Istio Config的控制器 if err := s.initConfigController(args); err != nil { return nil, fmt.Errorf("config controller: %v", err) } // 初始化處理Service Discovery的控制器 if err := s.initServiceControllers(args); err != nil { return nil, fmt.Errorf("service controllers: %v", err) } ... //初始化xDS服務端 if err := s.initDiscoveryService(args); err != nil { return nil, fmt.Errorf("discovery service: %v", err) } ... // Webhook 回撥服務 if err := s.initHTTPSWebhookServer(args); err != nil { return nil, fmt.Errorf("injectionWebhook server: %v", err) } //sidecar注入相關 if err := s.initSidecarInjector(args); err != nil { return nil, fmt.Errorf("sidecar injector: %v", err) } ... return s, nil }
NewServer方法裡面初始化了很多模組,這裡挑相關的看看initConfigController是和配置服務相關的,我們之後再看,這裡我們主要看initServiceControllers。
ServiceControllers
服務發現的主要邏輯在Pilot中由ServiceController(服務控制器)實現,通過監聽底層平臺的服務註冊中心來快取Istio服務模型,並監視服務模型的變化,在服務模型更新時觸發相關事件回撥處理函式的執行。
初始化
Controller的初始化執行流程很簡單,這裡用一張圖來描述,initServiceControllers方法最後會呼叫到NewController方法來進行初始化。
func NewController(client kubernetes.Interface, options Options) *Controller {
log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
options.WatchedNamespace, options.ResyncPeriod)
// The queue requires a time duration for a retry delay after a handler error
// 初始化Controller
c := &Controller{
domainSuffix: options.DomainSuffix,
client: client,
//控制器任務佇列
queue: queue.NewQueue(1 * time.Second),
clusterID: options.ClusterID,
xdsUpdater: options.XDSUpdater,
servicesMap: make(map[host.Name]*model.Service),
externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
networksWatcher: options.NetworksWatcher,
metrics: options.Metrics,
}
//獲取informer
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
//註冊 informer處理器
c.services = sharedInformers.Core().V1().Services().Informer()
//Services Handler
registerHandlers(c.services, c.queue, "Services", c.onServiceEvent)
//endpoints Handler
switch options.EndpointMode {
case EndpointsOnly:
c.endpoints = newEndpointsController(c, sharedInformers)
case EndpointSliceOnly:
c.endpoints = newEndpointSliceController(c, sharedInformers)
}
//Nodes Handler
c.nodes = sharedInformers.Core().V1().Nodes().Informer()
registerHandlers(c.nodes, c.queue, "Nodes", c.onNodeEvent)
podInformer := sharedInformers.Core().V1().Pods().Informer()
c.pods = newPodCache(podInformer, c)
//Pods Handler
registerHandlers(podInformer, c.queue, "Pods", c.pods.onEvent)
return c
}
NewController方法裡面首先是初始化Controller,然後獲取informer後分別註冊Services Handler、endpoints Handler、Nodes Handler、Pods Handler。
核心功能就是監聽k8s相關資源(Service、Endpoint、Pod、Node)的更新事件,執行相應的事件處理回撥函式。
這裡的Controller結構體實現了Controller介面:
type Controller interface {
// AppendServiceHandler notifies about changes to the service catalog.
AppendServiceHandler(f func(*Service, Event)) error
// AppendInstanceHandler notifies about changes to the service instances
// for a service.
AppendInstanceHandler(f func(*ServiceInstance, Event)) error
// Run until a signal is received
Run(stop <-chan struct{})
}
再註冊完畢後會呼叫其Run方法非同步執行。
//非同步呼叫Run方法
go serviceControllers.Run(stop)
//run方法裡面會遍歷GetRegistries列表,並非同步執行其Run方法
func (c *Controller) Run(stop <-chan struct{}) {
for _, r := range c.GetRegistries() {
go r.Run(stop)
}
<-stop
log.Info("Registry Aggregator terminated")
}
到這裡ServiceController為四種資源分別建立了一個監聽器,用於監聽K8s的資源更新,並註冊EventHandler。
Service處理器
func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
if err := c.checkReadyForEvents(); err != nil {
return err
}
svc, ok := curr.(*v1.Service)
if !ok {
tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("Couldn't get object from tombstone %#v", curr)
return nil
}
svc, ok = tombstone.Obj.(*v1.Service)
if !ok {
log.Errorf("Tombstone contained object that is not a service %#v", curr)
return nil
}
}
log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)
//將k8s service 轉換成 istio service
svcConv := kube.ConvertService(*svc, c.domainSuffix, c.clusterID)
//根據事件型別處理事件
switch event {
//刪除事件
case model.EventDelete:
c.Lock()
delete(c.servicesMap, svcConv.Hostname)
delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
c.Unlock()
// EDS needs to just know when service is deleted.
//更新服務快取
c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
default:
// instance conversion is only required when service is added/updated.
instances := kube.ExternalNameServiceInstances(*svc, svcConv)
c.Lock()
c.servicesMap[svcConv.Hostname] = svcConv
if instances == nil {
delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
} else {
c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
}
c.Unlock()
//更新服務快取
c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
}
// Notify service handlers.
// 觸發XDS事件處理器
for _, f := range c.serviceHandlers {
f(svcConv, event)
}
return nil
}
Service事件處理器會將根據事件的型別更新快取,然後呼叫serviceHandlers的事件處理器進行回撥。serviceHandlers事件處理器是在初始化DiscoveryService的時候設定的。
serviceHandler := func(svc *model.Service, _ model.Event) {
pushReq := &model.PushRequest{
Full: true,
NamespacesUpdated: map[string]struct{}{svc.Attributes.Namespace: {}},
ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
Reason: []model.TriggerReason{model.ServiceUpdate},
}
//配置更新
s.EnvoyXdsServer.ConfigUpdate(pushReq)
}
Endpoint處理器
Endpoint處理器會在呼叫newEndpointsController建立endpointsController的時候進行註冊
func newEndpointsController(c *Controller, sharedInformers informers.SharedInformerFactory) *endpointsController {
informer := sharedInformers.Core().V1().Endpoints().Informer()
out := &endpointsController{
kubeEndpoints: kubeEndpoints{
c: c,
informer: informer,
},
}
//註冊處理器
out.registerEndpointsHandler()
return out
}
在回撥的時候會呼叫到endpointsController的onEvent方法:
func (e *endpointsController) onEvent(curr interface{}, event model.Event) error {
...
return e.handleEvent(ep.Name, ep.Namespace, event, curr, func(obj interface{}, event model.Event) {
ep := obj.(*v1.Endpoints)
//EDS更新處理
e.c.updateEDS(ep, event)
})
}
這裡會呼叫updateEDS進行EDS(Endpoint Discovery service)更新處理。
func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event) {
hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)
endpoints := make([]*model.IstioEndpoint, 0)
if event != model.EventDelete {
for _, ss := range ep.Subsets {
for _, ea := range ss.Addresses {
//獲取Endpoint對應的Pod例項
pod := c.pods.getPodByIP(ea.IP)
...
// 將Endpoint轉換成Istio模型IstioEndpoint
for _, port := range ss.Ports {
endpoints = append(endpoints, &model.IstioEndpoint{
Address: ea.IP,
EndpointPort: uint32(port.Port),
ServicePortName: port.Name,
Labels: labelMap,
UID: uid,
ServiceAccount: sa,
Network: c.endpointNetwork(ea.IP),
Locality: locality,
Attributes: model.ServiceAttributes{Name: ep.Name, Namespace: ep.Namespace},
TLSMode: tlsMode,
})
}
}
}
}
//使用xdsUpdater更新EDS
_ = c.xdsUpdater.EDSUpdate(c.clusterID, string(hostname), ep.Namespace, endpoints)
}
在這裡會重新封裝endpoints然後呼叫EDSUpdate進行更新。
func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint) error {
inboundEDSUpdates.Increment()
s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false)
return nil
}
func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint, internal bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
requireFull := false
...
//找到之前快取的服務
if _, f := s.EndpointShardsByService[serviceName]; !f {
s.EndpointShardsByService[serviceName] = map[string]*EndpointShards{}
}
ep, f := s.EndpointShardsByService[serviceName][namespace]
//不存在則初始化
if !f {
ep = &EndpointShards{
Shards: map[string][]*model.IstioEndpoint{},
ServiceAccounts: map[string]bool{},
}
s.EndpointShardsByService[serviceName][namespace] = ep
if !internal {
adsLog.Infof("Full push, new service %s", serviceName)
requireFull = true
}
}
...
ep.mutex.Lock()
ep.Shards[clusterID] = istioEndpoints
ep.ServiceAccounts = serviceAccounts
ep.mutex.Unlock()
if !internal {
var edsUpdates map[string]struct{}
if !requireFull {
edsUpdates = map[string]struct{}{serviceName: {}}
}
//配置更新
s.ConfigUpdate(&model.PushRequest{
Full: requireFull,
NamespacesUpdated: map[string]struct{}{namespace: {}},
ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
EdsUpdates: edsUpdates,
Reason: []model.TriggerReason{model.EndpointUpdate},
})
}
}
edsUpdate方法裡面實際上就是做了兩件事,一是更新快取,二是呼叫ConfigUpdate進行配置更新。
ConfigUpdate資源更新實際上就是通過事件分發執行xDS分發,這塊的細節我們稍後再講。
總結
通過這篇我們掌握了服務發現是通過k8s的Informer來註冊監聽Service、EndPoint、nodes、pods等資源的更新事件,然後通過事件驅動模型執行回撥函式,再呼叫xDS的ConfigUpdate來執行非同步更新配置的操作。
Reference
https://www.servicemesher.com/blog/istio-analysis-4/
https://www.cnblogs.com/163yun/p/8962278.html
https://www.servicemesher.com/blog/envoy-proxy-config-deep-dive/