
2. Deep Dive into Istio: Pilot Service Discovery

Please credit the source when reposting. This article was published on luozhiyun's blog: https://www.luozhiyun.com

The Istio source code used in this article is release 1.5.

Introduction

pilot-discovery is the core service of Pilot. Its main job is to fetch and aggregate information from service registries (such as Kubernetes or Consul), obtain traffic rules from the Kubernetes API Server, convert the service information and traffic rules into a format the data plane can understand, and push them to every sidecar in the mesh through the standard data-plane API.

pilot-discovery covers service discovery, configuration rule discovery, and xDS configuration distribution. I plan to split the discussion into three articles; this one looks at the implementation of the service discovery part. The article touches on the xDS protocol in places; for background, see this article:

In-depth Interpretation of the Technical Details Behind Service Mesh

Pilot service discovery means caching the Istio service model by watching the service registry of the underlying platform, monitoring changes to that service model, and triggering the registered event callback handlers whenever the service model is updated.
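
For reference, this service model lives in pilot/pkg/model. The abridged sketch below shows a few representative fields of model.Service; it is simplified from the Istio 1.5 source and is not the complete definition:

// Abridged sketch of Istio's internal service abstraction (simplified, not the full definition).
type Service struct {
	// Hostname is the fully qualified service name, e.g. "reviews.default.svc.cluster.local".
	Hostname host.Name
	// Address is the service VIP (the ClusterIP for a Kubernetes Service).
	Address string
	// Ports lists the service ports.
	Ports PortList
	// Resolution describes how endpoints are resolved (client-side LB, DNS, passthrough).
	Resolution Resolution
	// Attributes carries name, namespace, labels and other registry metadata.
	Attributes ServiceAttributes
}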

How Service Discovery Works

Pilot Initialization

	discoveryCmd = &cobra.Command{
		Use:   "discovery",
		Short: "Start Istio proxy discovery service.",
		Args:  cobra.ExactArgs(0),
		RunE: func(c *cobra.Command, args []string) error {
			...
			// Configure logging
			if err := log.Configure(loggingOptions); err != nil {
				return err
			} 
			... 
			stop := make(chan struct{})
 
			// Create the xDS server
			discoveryServer, err := bootstrap.NewServer(&serverArgs)
			if err != nil {
				return fmt.Errorf("failed to create discovery service: %v", err)
			} 
			// Start the server
			if err := discoveryServer.Start(stop); err != nil {
				return fmt.Errorf("failed to start discovery service: %v", err)
			}
			// Wait for the process to exit
			cmd.WaitSignal(stop) 
			discoveryServer.WaitUntilCompletion()
			return nil
		},
	}

When the Pilot service initializes, it first configures logging and then creates the xDS server. Here xDS stands for "x Discovery Service", where x represents a family of resources such as Cluster, Endpoint, Listener, and Route.
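
For reference, the four most commonly seen members of that family correspond to the following Envoy v2 type URLs (a standalone illustration, not Istio source):

// Standalone illustration: the Envoy v2 type URLs behind CDS/EDS/LDS/RDS.
package main

import "fmt"

const (
	ClusterType  = "type.googleapis.com/envoy.api.v2.Cluster"               // CDS
	EndpointType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" // EDS
	ListenerType = "type.googleapis.com/envoy.api.v2.Listener"              // LDS
	RouteType    = "type.googleapis.com/envoy.api.v2.RouteConfiguration"    // RDS
)

func main() {
	fmt.Println(ClusterType, EndpointType, ListenerType, RouteType)
}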

func NewServer(args *PilotArgs) (*Server, error) {
	 
	args.Default()
	e := &model.Environment{
		ServiceDiscovery: aggregate.NewController(),
		PushContext:      model.NewPushContext(),
	}

	s := &Server{
		basePort:       args.BasePort,
		clusterID:      getClusterID(args),
		environment:    e,
		EnvoyXdsServer: envoyv2.NewDiscoveryServer(e, args.Plugins),
		forceStop:      args.ForceStop,
		mux:            http.NewServeMux(),
	}
 
	// Initialize the controller that handles Istio config
	if err := s.initConfigController(args); err != nil {
		return nil, fmt.Errorf("config controller: %v", err)
	}
	// Initialize the service discovery controllers
	if err := s.initServiceControllers(args); err != nil {
		return nil, fmt.Errorf("service controllers: %v", err)
	} 
	... 
	// Initialize the xDS server side
	if err := s.initDiscoveryService(args); err != nil {
		return nil, fmt.Errorf("discovery service: %v", err)
	}
	... 
	// Webhook callback server
	if err := s.initHTTPSWebhookServer(args); err != nil {
		return nil, fmt.Errorf("injectionWebhook server: %v", err)
	}
	// Sidecar injection related
	if err := s.initSidecarInjector(args); err != nil {
		return nil, fmt.Errorf("sidecar injector: %v", err)
	}

	... 
	return s, nil
}

The NewServer method initializes a lot of modules; here we only pick out the relevant ones. initConfigController deals with configuration and will be covered later; for now we focus on initServiceControllers.

ServiceControllers

The main logic of service discovery in Pilot is implemented by the ServiceController. It caches the Istio service model by watching the service registry of the underlying platform, monitors changes to the service model, and triggers the registered event callback handlers when the service model is updated.

Initialization

The Controller's initialization flow is straightforward: the initServiceControllers method eventually calls NewController to perform the initialization.
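
Worth noting: the controller created here is handed to the aggregate controller that NewServer stored in the Environment, which is why the Run method shown further below iterates over GetRegistries. Roughly (an outline only; the package alias and variable names are illustrative, not verbatim source):

// Rough outline (simplified, not verbatim source): build the Kubernetes registry with the
// same options that NewController receives, then add it to the aggregate controller.
kubeRegistry := kubecontroller.NewController(client, options)
serviceControllers.AddRegistry(kubeRegistry) // the aggregate.Controller created in NewServer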

func NewController(client kubernetes.Interface, options Options) *Controller {
	log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
		options.WatchedNamespace, options.ResyncPeriod)

	// The queue requires a time duration for a retry delay after a handler error
	// Initialize the Controller
	c := &Controller{
		domainSuffix:               options.DomainSuffix,
		client:                     client,
		// Controller task queue
		queue:                      queue.NewQueue(1 * time.Second),
		clusterID:                  options.ClusterID,
		xdsUpdater:                 options.XDSUpdater,
		servicesMap:                make(map[host.Name]*model.Service),
		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
		networksWatcher:            options.NetworksWatcher,
		metrics:                    options.Metrics,
	}
	// Get the shared informer factory
	sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
	// Register the informer handlers
	c.services = sharedInformers.Core().V1().Services().Informer()
	//Services Handler
	registerHandlers(c.services, c.queue, "Services", c.onServiceEvent)
	//endpoints Handler
	switch options.EndpointMode {
	case EndpointsOnly:
		c.endpoints = newEndpointsController(c, sharedInformers)
	case EndpointSliceOnly:
		c.endpoints = newEndpointSliceController(c, sharedInformers)
	}
	//Nodes Handler
	c.nodes = sharedInformers.Core().V1().Nodes().Informer()
	registerHandlers(c.nodes, c.queue, "Nodes", c.onNodeEvent)

	podInformer := sharedInformers.Core().V1().Pods().Informer()
	c.pods = newPodCache(podInformer, c)
	//Pods Handler
	registerHandlers(podInformer, c.queue, "Pods", c.pods.onEvent)

	return c
}

The NewController method first initializes the Controller, then obtains the informers and registers the Services handler, Endpoints handler, Nodes handler, and Pods handler.

The core function is to watch update events of the relevant Kubernetes resources (Service, Endpoints, Pod, Node) and execute the corresponding event callback handlers.
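
registerHandlers follows the standard client-go pattern: an informer event handler turns add/update/delete notifications into tasks on the controller's queue. The standalone example below illustrates that pattern with plain client-go; it is not the Istio source, and the queue here is just a channel of task functions:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// task is one unit of work produced by an informer event and consumed by a single worker.
type task func() error

func main() {
	// Build a client from the local kubeconfig (illustration only).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	stop := make(chan struct{})
	queue := make(chan task, 100)

	// Shared informer factory scoped to one namespace, like Istio's WatchedNamespace option.
	factory := informers.NewSharedInformerFactoryWithOptions(client, 30*time.Second,
		informers.WithNamespace("default"))

	// Register a handler on the Services informer that pushes tasks onto the queue,
	// mirroring what registerHandlers does for Services/Endpoints/Nodes/Pods.
	factory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			svc := obj.(*v1.Service)
			queue <- func() error {
				fmt.Printf("service added: %s/%s\n", svc.Namespace, svc.Name)
				return nil
			}
		},
		DeleteFunc: func(obj interface{}) {
			queue <- func() error { fmt.Println("service deleted"); return nil }
		},
	})

	factory.Start(stop)

	// Single worker draining the queue, analogous to queue.Run(stop).
	for t := range queue {
		if err := t(); err != nil {
			fmt.Println("handler error:", err)
		}
	}
}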

The Controller struct here implements the Controller interface:

type Controller interface {
	// AppendServiceHandler notifies about changes to the service catalog.
	AppendServiceHandler(f func(*Service, Event)) error

	// AppendInstanceHandler notifies about changes to the service instances
	// for a service.
	AppendInstanceHandler(f func(*ServiceInstance, Event)) error

	// Run until a signal is received
	Run(stop <-chan struct{})
}

After registration is complete, its Run method is called and executed asynchronously.

// Call the Run method asynchronously
go serviceControllers.Run(stop)
// The Run method iterates over the GetRegistries list and runs each registry asynchronously
func (c *Controller) Run(stop <-chan struct{}) {

	for _, r := range c.GetRegistries() {
		go r.Run(stop)
	}

	<-stop
	log.Info("Registry Aggregator terminated")
}

At this point the ServiceController has created a watcher for each of the four resource types to watch Kubernetes resource updates and has registered the event handlers.
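
Each registry's own Run is what actually starts the informers and the task queue. For the Kubernetes registry it looks roughly like the following (a rough sketch of the Istio 1.5 behaviour, not a verbatim copy):

// Rough sketch (not verbatim source): the Kubernetes controller's Run starts the task queue
// once the caches have synced, runs each informer, and blocks until the stop channel closes.
func (c *Controller) Run(stop <-chan struct{}) {
	go func() {
		cache.WaitForCacheSync(stop, c.HasSynced) // wait for all informers to sync
		c.queue.Run(stop)                         // then start draining the task queue
	}()

	go c.services.Run(stop)
	go c.pods.informer.Run(stop)
	go c.nodes.Run(stop)
	go c.endpoints.Run(stop)

	<-stop
	log.Infof("Controller terminated")
}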

Service Handler

func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
	if err := c.checkReadyForEvents(); err != nil {
		return err
	}

	svc, ok := curr.(*v1.Service)
	if !ok {
		tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Errorf("Couldn't get object from tombstone %#v", curr)
			return nil
		}
		svc, ok = tombstone.Obj.(*v1.Service)
		if !ok {
			log.Errorf("Tombstone contained object that is not a service %#v", curr)
			return nil
		}
	}

	log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)
	// Convert the Kubernetes Service into an Istio Service
	svcConv := kube.ConvertService(*svc, c.domainSuffix, c.clusterID)
	// Handle the event according to its type
	switch event {
	// Delete event
	case model.EventDelete:
		c.Lock()
		delete(c.servicesMap, svcConv.Hostname)
		delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
		c.Unlock()
		// EDS needs to just know when service is deleted.
		// Update the service cache
		c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
	default:
		// instance conversion is only required when service is added/updated.
		instances := kube.ExternalNameServiceInstances(*svc, svcConv)
		c.Lock()
		c.servicesMap[svcConv.Hostname] = svcConv
		if instances == nil {
			delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
		} else {
			c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
		}
		c.Unlock()
		// Update the service cache
		c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
	}

	// Notify service handlers.
	// Trigger the xDS event handlers
	for _, f := range c.serviceHandlers {
		f(svcConv, event)
	}

	return nil
}

The Service event handler updates the cache according to the event type, and then invokes the serviceHandlers callbacks. These serviceHandlers are registered when the DiscoveryService is initialized.

serviceHandler := func(svc *model.Service, _ model.Event) {
		pushReq := &model.PushRequest{
			Full:               true,
			NamespacesUpdated:  map[string]struct{}{svc.Attributes.Namespace: {}},
			ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
			Reason:             []model.TriggerReason{model.ServiceUpdate},
		}
		// Config update
		s.EnvoyXdsServer.ConfigUpdate(pushReq)
	}
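
In Istio 1.5 this handler is wired in through the AppendServiceHandler method of the Controller interface shown earlier; schematically (a simplified sketch, not verbatim source):

// Simplified sketch: register serviceHandler with the (aggregate) service controller so
// that it fires on every service catalog change and turns it into an xDS push request.
if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {
	return fmt.Errorf("append service handler failed: %v", err)
}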

Endpoint Handler

The Endpoint handler is registered when newEndpointsController is called to create the endpointsController.

func newEndpointsController(c *Controller, sharedInformers informers.SharedInformerFactory) *endpointsController {
	informer := sharedInformers.Core().V1().Endpoints().Informer()
	out := &endpointsController{
		kubeEndpoints: kubeEndpoints{
			c:        c,
			informer: informer,
		},
	}
	// Register the handler
	out.registerEndpointsHandler()
	return out
}
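
registerEndpointsHandler reuses the registerHandlers helper that NewController used for the other resources; roughly (a simplified sketch, not verbatim source):

// Simplified sketch: wire the Endpoints informer into the shared task queue,
// with onEvent as the callback, just like Services/Nodes/Pods above.
func (e *endpointsController) registerEndpointsHandler() {
	registerHandlers(e.informer, e.c.queue, "Endpoints", e.onEvent)
}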

When the callback fires, the endpointsController's onEvent method is invoked:

func (e *endpointsController) onEvent(curr interface{}, event model.Event) error {
	... 
	return e.handleEvent(ep.Name, ep.Namespace, event, curr, func(obj interface{}, event model.Event) {
		ep := obj.(*v1.Endpoints)
		// EDS update handling
		e.c.updateEDS(ep, event)
	})
}

Here updateEDS is called to handle the EDS (Endpoint Discovery Service) update.

func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event) {
	hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)

	endpoints := make([]*model.IstioEndpoint, 0)
	if event != model.EventDelete {
		for _, ss := range ep.Subsets {
			for _, ea := range ss.Addresses {
				// Get the Pod instance corresponding to this endpoint address
				pod := c.pods.getPodByIP(ea.IP)
				...   
				// Convert the Endpoint into the Istio model IstioEndpoint
				for _, port := range ss.Ports {
					endpoints = append(endpoints, &model.IstioEndpoint{
						Address:         ea.IP,
						EndpointPort:    uint32(port.Port),
						ServicePortName: port.Name,
						Labels:          labelMap,
						UID:             uid,
						ServiceAccount:  sa,
						Network:         c.endpointNetwork(ea.IP),
						Locality:        locality,
						Attributes:      model.ServiceAttributes{Name: ep.Name, Namespace: ep.Namespace},
						TLSMode:         tlsMode,
					})
				}
			}
		}
	} 
	// Update EDS through the xdsUpdater
	_ = c.xdsUpdater.EDSUpdate(c.clusterID, string(hostname), ep.Namespace, endpoints)
}

Here the endpoints are re-packaged and then EDSUpdate is called to perform the update.

func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
	istioEndpoints []*model.IstioEndpoint) error {
	inboundEDSUpdates.Increment()
	s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false)
	return nil
}

func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string,
	istioEndpoints []*model.IstioEndpoint, internal bool) { 
	s.mutex.Lock()
	defer s.mutex.Unlock()
	requireFull := false
 
	...
	// Look up the previously cached service
	if _, f := s.EndpointShardsByService[serviceName]; !f { 
		s.EndpointShardsByService[serviceName] = map[string]*EndpointShards{}
	}
	ep, f := s.EndpointShardsByService[serviceName][namespace]
	// Initialize it if it does not exist
	if !f { 
		ep = &EndpointShards{
			Shards:          map[string][]*model.IstioEndpoint{},
			ServiceAccounts: map[string]bool{},
		}
		s.EndpointShardsByService[serviceName][namespace] = ep
		if !internal {
			adsLog.Infof("Full push, new service %s", serviceName)
			requireFull = true
		}
	} 
	... 
	ep.mutex.Lock()
	ep.Shards[clusterID] = istioEndpoints
	ep.ServiceAccounts = serviceAccounts
	ep.mutex.Unlock() 
	
	if !internal {
		var edsUpdates map[string]struct{}
		if !requireFull {
			edsUpdates = map[string]struct{}{serviceName: {}}
		}
		// Config update
		s.ConfigUpdate(&model.PushRequest{
			Full:               requireFull,
			NamespacesUpdated:  map[string]struct{}{namespace: {}},
			ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
			EdsUpdates:         edsUpdates,
			Reason:             []model.TriggerReason{model.EndpointUpdate},
		})
	}
}

The edsUpdate method really only does two things: it updates the cache and it calls ConfigUpdate to trigger a configuration update.

The ConfigUpdate path ultimately distributes xDS configuration through event dispatching; we will look at those details later.
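
For reference, ConfigUpdate itself is tiny in 1.5: it records a metric and queues the PushRequest on the server's push channel, where a separate debounce loop later merges pending requests and kicks off the actual xDS push. A rough sketch (simplified, not verbatim source):

// Rough sketch (simplified): ConfigUpdate only enqueues the request; a debounce
// goroutine reading from pushChannel merges requests and triggers the xDS push.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
	inboundConfigUpdates.Increment()
	s.pushChannel <- req
}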

Summary

In this article we saw that service discovery registers Kubernetes informers to watch update events of Services, Endpoints, Nodes, and Pods, drives the callback handlers through an event-driven model, and then calls the xDS ConfigUpdate to asynchronously update the configuration.

Reference

https://www.servicemesher.com/blog/istio-analysis-4/

https://www.cnblogs.com/163yun/p/8962278.html

https://www.servicemesher.com/blog/envoy-proxy-config-deep-dive/