4.深入Istio原始碼:Pilot的Discovery Server如何執行xDS非同步分發
阿新 • • 發佈:2020-12-05
> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com
>
> 本文使用的Istio原始碼是 release 1.5。
## 介紹
Discovery Service主要為資料面(執行在 sidecar 中的 Envoy 等 proxy 元件)提供控制資訊服務。Discovery Service為資料面提供的資訊叫做xds ,這裡的 `x` 是一個代詞,在 Istio 中,xds 包括 `cds`(cluster discovery service)、`lds`(listener discovery service)、`rds`(route discovery service)、`eds`(endpoint discovery service),而 `ads`(aggregated discovery service) 是對這些服務的一個統一封裝。
Discovery Service中主要包含下述邏輯:
* 啟動GRPC Server並接收來自Envoy端的連線請求;
* 接收Envoy端的xDS請求,從Config Controller和Service Controller中獲取配置和服務資訊,生成響應訊息傳送給Envoy;
* 監聽來自Config Controller的配置變化和來自Service Controller的服務變化訊息,並將配置和服務變化內容通過xDS介面推送到Envoy。
## Discovery Service初始化
![Blank diagram](https://img.luozhiyun.com/20201205120241.png)
從上面的流程圖可以知道,在呼叫NewServer建立XdsServer的時候會做很多初始化的工作。如初始化Pilot Server、網格初始化、初始化Istio Config的控制器、初始化Service Discovery的控制器等。我們下面列出和Discovery Service初始化相關的程式碼:
```go
func NewServer(args *PilotArgs) (*Server, error) {
//建立Pilot Server
s := &Server{
basePort: args.BasePort,
clusterID: getClusterID(args),
environment: e,
//初始化XdsServer
EnvoyXdsServer: envoyv2.NewDiscoveryServer(e, args.Plugins),
forceStop: args.ForceStop,
mux: http.NewServeMux(),
}
...
//初始化xDS服務端
if err := s.initDiscoveryService(args); err != nil {
return nil, fmt.Errorf("discovery service: %v", err)
}
...
}
```
從上面的程式碼可以看出XdsServer是通過呼叫NewDiscoveryServer方法來進行初始化的,返回的是一個DiscoveryServer例項,具體欄位的使用會在後面說到。
```go
type DiscoveryServer struct {
...
// Endpoint 的快取,以服務名和 namespace 作為索引,主要用於 EDS 更新
EndpointShardsByService map[string]map[string]*EndpointShards
//統一接收其他元件發來的 PushRequest 的 channel
pushChannel chan *model.PushRequest
updateMutex sync.RWMutex
//pushQueue 主要是在真正 xDS 推送前做防抖快取
pushQueue *PushQueue
}
```
建立完Server後會呼叫initDiscoveryService方法:
```go
func (s *Server) initDiscoveryService(args *PilotArgs) error {
...
//初始化Service Controller和Config Controller的Handler,用於informer回撥
if err := s.initEventHandlers(); err != nil {
return err
}
...
// 會在初始化完畢之後呼叫Start方法,啟動XdsServer
s.addStartFunc(func(stop <-chan struct{}) error {
s.EnvoyXdsServer.Start(stop)
return nil
})
//初始化Grpc Server服務,並註冊到XdsServer中
s.initGrpcServer(args.KeepaliveOptions)
s.httpServer = &http.Server{
Addr: args.DiscoveryOptions.HTTPAddr,
Handler: s.mux,
}
...
}
```
這個方法主要做了這麼幾件事:
1. 初始化各種回撥處理器;
2. 將XdsServer啟動函式新增到Server的startFuncs佇列中,會在初始化完畢之後呼叫;
3. 呼叫initGrpcServer方法初始化Grpc Server服務,並註冊到XdsServer中。
在初始化 grpcServer 的時候,呼叫了 `DiscoveryServer.Register()` 方法,向 grpcServer 註冊了以下幾個服務:
```go
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
//註冊的時候傳入grpc server 和 DiscoveryServer
ads.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}
```
DiscoveryServer實際上實現了AggregatedDiscoveryServiceServer介面:
```go
type AggregatedDiscoveryServiceServer interface {
// 全量 ADS Stream 介面
StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
// 增量 ADS Stream 介面
DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
}
}
```
StreamAggregatedResources 接收DiscoveryRequest ,返回 DiscoveryResponse 流,包含全量的 xDS 資料,DeltaAggregatedResources方法目前沒有具體實現。
大致呼叫流程如下:
![Group 4](https://img.luozhiyun.com/20201205120247.png)
## Discovery Service啟動
![dsServerStart](https://img.luozhiyun.com/20201205120250.png)
discoveryServer.Start方法還是在pilot discovery的main方法中被呼叫。main方法會在呼叫完bootstrap.NewServer方法後,進行Start方法的呼叫:
```go
discoveryCmd = &cobra.Command{
...
RunE: func(c *cobra.Command, args []string) error {
...
stop := make(chan struct{})
// 建立xDs伺服器
discoveryServer, err := bootstrap.NewServer(&serverArgs)
if err != nil {
return fmt.Errorf("failed to create discovery service: %v", err)
}
// 啟動伺服器
if err := discoveryServer.Start(stop); err != nil {
return fmt.Errorf("failed to start discovery service: %v", err)
}
...
return nil
},
}
```
在呼叫Start方法的時候會獲取到Server的startFuncs集合,然後依次執行裡面設定的函式:
```go
func (s *Server) Start(stop <-chan struct{}) error {
// Now start all of the components.
for _, fn := range s.startFuncs {
if err := fn(stop); err != nil {
return err
}
}
...
}
```
遍歷呼叫完畢後會分別啟動server Controller和config Controller的run函式,以及呼叫xdsServer的Start方法,Start方法主要分別啟動了三個執行緒:
```go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
adsLog.Infof("Starting ADS server")
go s.handleUpdates(stopCh)
go s.periodicRefreshMetrics(stopCh)
go s.sendPushes(stopCh)
}
```
比較重要的是handleUpdates方法和sendPushes方法。
handleUpdates方法主要是處理 pushChannel 中收到的推送請求,最後會呼叫startPush將資料推入到DiscoveryServer的pushQueue管道中;sendPushes方法則是獲取pushQueue管道中的資料,封裝成XdsEvent推入到XdsConnection的pushChannel進行非同步處理。
### handleUpdates
```go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
debounce(s.pushChannel, stopCh, s.Push)
}
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
var timeChan <-chan time.Time
var startDebounce time.Time
var lastConfigUpdateTime time.Time
pushCounter := 0
debouncedEvents := 0
var req *model.PushRequest
free := true
freeCh := make(chan struct{}, 1)
push := func(req *model.PushRequest) {
pushFn(req)
freeCh <- struct{}{}
}
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// debounceMax為10s ,debounceAfter為100毫秒
//延遲時間大於等於最大延遲時間 或者 靜默時間大於等於最小靜默時間
if eventDelay > = debounceMax || quietTime >= debounceAfter {
if req != nil {
pushCounter++
adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents,
quietTime, eventDelay, req.Full)
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(debounceAfter - quietTime)
}
}
for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
// If reason is not set, record it as an unknown reason
if len(r.Reason) == 0 {
r.Reason = []model.TriggerReason{model.UnknownTrigger}
}
if !enableEDSDebounce && !r.Full {
// trigger push now, just for EDS
go pushFn(r)
continue
}
lastConfigUpdateTime = time.Now()
//首次進入會呼叫延時器 timeChan 先延遲一個最小靜默時間(100 毫秒)
if debouncedEvents == 0 {
timeChan = time.After(debounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
//合併請求
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}
```
handleUpdates是直接呼叫了debounce方法,並將pushChannel以及DiscoveryServer的Push函式傳入內。
debounce這個方法裡面的處理非常的有意思,我們下面來講一下它的一個執行流程:
1. 進入到這個方法的時候,pushWorker函式以及push函式都不會被立即呼叫,而是會走到一個for迴圈中,裡面有select執行語句,這個for迴圈會一直等待,直到ch有資料`case r := <-ch`被執行;
2. 首次進入到`case r := <-ch`程式碼塊的時候,debouncedEvents是等於0的,那麼會直接呼叫time.After等待debounceAfter設定的時間,也就是100毫秒,被喚醒之後會將timeChan設值,並執行合併請求;
3. 第二次迴圈的時候會執行到`case <-timeChan`這塊邏輯中,執行pushWorker函式,在函式裡面會判斷是否等待超過了最大延遲時間debounceMax(10s)或 靜默時間超過了debounceAfter(100ms),如果是的話,那麼執行push函式,呼叫pushFn進行推送,並將freeCh設定一個空的結構體;
4. 下次迴圈的時候會執行到`case <-freeCh:`這塊邏輯中,再執行下次的pushWorker操作;
push方法會一直往下呼叫,直到把資料推入到DiscoveryServer的pushQueue管道中:
![startPush](https://img.luozhiyun.com/20201205120257.png)
### send Pushes
```go
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}
```
sendPushes會呼叫doSendPushes方法傳入PushQueue,以及concurrentPushLimit,它是由環境變數 `PILOT_PUSH_THROTTLE` 控制的,預設為 100 。
```go
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
// 這裡semaphore容量只有100,用來控制速率
semaphore <- struct{}{}
// Get the next proxy to push. This will block if there are no updates required.
client, info := queue.Dequeue()
recordPushTriggers(info.Reason...)
// Signals that a push is done by reading from the semaphore, allowing another send on it.
doneFunc := func() {
queue.MarkDone(client)
<-semaphore
}
proxiesQueueTime.Record(time.Since(info.Start).Seconds())
go func() {
edsUpdates := info.EdsUpdates
if info.Full {
// Setting this to nil will trigger a full push
edsUpdates = nil
}
select {
case client.pushChannel <- &XdsEvent{
push: info.Push,
edsUpdatedServices: edsUpdates,
done: doneFunc,
start: info.Start,
namespacesUpdated: info.NamespacesUpdated,
configTypesUpdated: info.ConfigTypesUpdated,
noncePrefix: info.Push.Version,
}:
return
case <-client.stream.Context().Done(): // grpc stream was closed
doneFunc()
adsLog.Infof("Client closed connection %v", client.ConID)
}
}()
}
}
}
```
在doSendPushes方法內啟動了一個無限迴圈,在default程式碼塊中實現了主要的功能邏輯。semaphore引數可以看出是用來控制速率用的,當semaphore滿了之後會阻塞。然後會啟動一個執行緒將XdsEvent初始化後放入到pushChannel中。
總體來說流程如下:
1. 從pushQueue出隊一個xdsConnection;
2. 然後初始化一個XdsEvent入隊到xdsConnection的pushChannel管道中;
![Group 3](https://img.luozhiyun.com/20201205120301.png)
這裡放入到pushChannel管道中的訊息會在StreamAggregatedResources方法中被處理:
```go
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
...
con := newXdsConnection(peerAddr, stream)
var receiveError error
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
//從XdsConnection中接收來自Envoy的DiscoveryRequest
go receiveThread(con, reqChannel, &receiveError)
for {
select {
//reqChannel處理部分
case discReq, ok := <-reqChannel:
...
//pushChannel處理部分
case pushEv := <-con.pushChannel:
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return nil
}
}
}
}
```
這裡總體來說分為兩部分,一個是 reqChannel的資料處理這部分稍放到Client Request中說,另一部分是pushChannel的資料處理。
在獲取到pushChannel管道的資料後會呼叫pushConnection進行處理。
```go
func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
//處理增量推送 EDS 的情況
if pushEv.edsUpdatedServices != nil {
if !ProxyNeedsPush(con.node, pushEv) {
adsLog.Debugf("Skipping EDS push to %v, no updates required", con.ConID)
return nil
}
if len(con.Clusters) > 0 {
if err := s.pushEds(pushEv.push, con, versionInfo(), pushEv.edsUpdatedServices); err != nil {
return err
}
}
return nil
}
...
currentVersion := versionInfo()
pushTypes := PushTypeFor(con.node, pushEv)
// 根據型別判斷推送型別
if con.CDSWatch && pushTypes[CDS] {
err := s.pushCds(con, pushEv.push, currentVersion)
if err != nil {
return err
}
}
if len(con.Clusters) > 0 && pushTypes[EDS] {
err := s.pushEds(pushEv.push, con, currentVersion, nil)
if err != nil {
return err
}
}
if con.LDSWatch && pushTypes[LDS] {
err := s.pushLds(con, pushEv.push, currentVersion)
if err != nil {
return err
}
}
if len(con.Routes) > 0 && pushTypes[RDS] {
err := s.pushRoute(con, pushEv.push, currentVersion)
if err != nil {
return err
}
}
proxiesConvergeDelay.Record(time.Since(pushEv.start).Seconds())
return nil
}
```
這裡會根據pushEv的型別來判斷,需要推送什麼型別的配置資訊,下面以EDS為例看一下pushEds裡面做了什麼:
```go
func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection, version string, edsUpdatedServices map[string]struct{}) error {
pushStart := time.Now()
loadAssignments := make([]*xdsapi.ClusterLoadAssignment, 0)
endpoints := 0
empty := 0
for _, clusterName := range con.Clusters {
// 構建生成器生成 EDS
l := s.generateEndpoints(clusterName, con.node, push, edsUpdatedServices)
if l == nil {
continue
}
for _, e := range l.Endpoints {
endpoints += len(e.LbEndpoints)
}
if len(l.Endpoints) == 0 {
empty++
}
loadAssignments = append(loadAssignments, l)
}
//構建DiscoveryResponse
response := endpointDiscoveryResponse(loadAssignments, version, push.Version)
//傳送響應
err := con.send(response)
edsPushTime.Record(time.Since(pushStart).Seconds())
...
return nil
}
```
pushEds裡面主要就是構建DiscoveryResponse,然後呼叫send方法傳送響應。
![Group 5](https://img.luozhiyun.com/20201205120307.png)
### Client Request
這部分的程式碼和上面的其實差不多,主要是資料的獲取是從reqChannel管道中獲取。
```go
//從XdsConnection中接收來自Envoy的DiscoveryRequest
go receiveThread(con, reqChannel, &receiveError)
for {
select {
case discReq, ok := <-reqChannel:
if !ok {
// Remote side closed connection.
return receiveError
}
// This should be only set for the first request. Guard with ID check regardless.
if discReq.Node != nil && discReq.Node.Id != "" {
if cancel, err := s.initConnection(discReq.Node, con); err != nil {
return err
} else if cancel != nil {
defer cancel()
}
}
switch discReq.TypeUrl {
case ClusterType:
...
err := s.pushCds(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
case ListenerType:
...
err := s.pushLds(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
case RouteType:
...
con.Routes = routes
adsLog.Debugf("ADS:RDS: REQ %s %s routes:%d", peerAddr, con.ConID, len(con.Routes))
err := s.pushRoute(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
case EndpointType:
...
err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
if err != nil {
return err
}
default:
adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
}
case pushEv := <-con.pushChannel:
...
}
```
這部分會非同步掉啟動一個執行緒用來迴圈的接受grpc的請求,然後將資料存放到reqChannel管道中,然後在for迴圈中消費管道中的資料。
![Group 6](https://img.luozhiyun.com/20201205120311.png)
## 總結
到這裡Pilot部分的原始碼解析就差不多結束了,回顧一下前兩篇的內容,第一篇主要是講通過service controller來監聽Service、EndPoint、nodes、pods等資源的更新事件;第二篇主要是講通過config controller來監聽Istio的Gateway、DestinationRule及VirtualService等配置變動情況;這篇文章主要講解了xDS協議管理伺服器端是如何做的,通過接受service controller以及config controller中的訊息,從中獲取各種各樣的資源變動情況,然後建立RPC連線Envoy端的,並告知配置變動。
## Reference
https://www.servicemesher.com/blog/istio-analysis-4/
https://zhaohuabing.com/post/2019-10-21-pilot-discovery-code-analysis/
https://jimmysong.io/blog/service-mesh-the-microservices-in-post-kubernetes-era/
https://blog.gmem.cc/interaction-between-istio-pilot-and-envoy
https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol
https://cloudnative.to/blog/istio-