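5. Diving into the Istio Source Code: The Role of Pilot-agent and a Walkthrough of Its Source
阿新 • Published: 2020-12-12
> Please credit the source when reposting. This article was published on luozhiyun's blog: https://www.luozhiyun.com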
>
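> This article uses the Istio release 1.5 source code.
## Introduction
Sidecar injection adds two containers to the pod: istio-init and istio-proxy. Pilot-agent is the entry point that starts istio-proxy. With kubectl we can see the commands running inside the container: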
```sh
[root@localhost ~]# kubectl exec -it details-v1-6c9f8bcbcb-shltm -c istio-proxy -- ps -efww
UID PID PPID C STIME TTY TIME CMD
istio-p+ 1 0 0 08:52 ? 00:00:13 /usr/local/bin/pilot-agent proxy sidecar --domain default.svc.cluster.local --configPath /etc/istio/proxy --binaryPath /usr/local/bin/envoy --serviceCluster details.default --drainDuration 45s --parentShutdownDuration 1m0s --discoveryAddress istiod.istio-system.svc:15012 --zipkinAddress zipkin.istio-system:9411 --proxyLogLevel=warning --proxyComponentLogLevel=misc:error --connectTimeout 10s --proxyAdminPort 15000 --concurrency 2 --controlPlaneAuthPolicy NONE --dnsRefreshRate 300s --statusPort 15020 --trust-domain=cluster.local --controlPlaneBootstrap=false
istio-p+ 18 1 0 08:52 ? 00:01:11 /usr/local/bin/envoy -c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60 --service-cluster details.default --service-node sidecar~172.20.0.14~details-v1-6c9f8bcbcb-shltm.default~default.svc.cluster.local --max-obj-name-len 189 --local-address-ip-version v4 --log-format [Envoy (Epoch 0)] [%Y-%m-%d %T.%e][%t][%l][%n] %v -l warning --component-log-level misc:error --concurrency 2
```
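Besides starting istio-proxy, Pilot-agent also:
* generates Envoy's bootstrap configuration file;
* performs health checks;
* watches certificates for changes and tells the Envoy process to hot-restart, so that new certificates are loaded without downtime;
* supervises Envoy, restarting it when it exits abnormally;
* tells Envoy to exit gracefully.
## Code execution flow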
```go
proxyCmd = &cobra.Command{
	Use:   "proxy",
	Short: "Envoy proxy agent",
	FParseErrWhitelist: cobra.FParseErrWhitelist{
		UnknownFlags: true,
	},
	RunE: func(c *cobra.Command, args []string) error {
		...
		// Start from the default proxy configuration
		proxyConfig := mesh.DefaultProxyConfig()
		// set all flags
		proxyConfig.CustomConfigFile = customConfigFile
		proxyConfig.ProxyBootstrapTemplatePath = templateFile
		proxyConfig.ConfigPath = configPath
		proxyConfig.BinaryPath = binaryPath
		proxyConfig.ServiceCluster = serviceCluster
		proxyConfig.DrainDuration = types.DurationProto(drainDuration)
		proxyConfig.ParentShutdownDuration = types.DurationProto(parentShutdownDuration)
		proxyConfig.DiscoveryAddress = discoveryAddress
		proxyConfig.ConnectTimeout = types.DurationProto(connectTimeout)
		proxyConfig.StatsdUdpAddress = statsdUDPAddress
		...
		ctx, cancel := context.WithCancel(context.Background())
		// Start the status server
		if statusPort > 0 {
			localHostAddr := localHostIPv4
			if proxyIPv6 {
				localHostAddr = localHostIPv6
			}
			prober := kubeAppProberNameVar.Get()
			// Health probing
			statusServer, err := status.NewServer(status.Config{
				LocalHostAddr: localHostAddr,
				AdminPort:     proxyAdminPort,
				// Set through the flag --statusPort 15020
				StatusPort:     statusPort,
				KubeAppProbers: prober,
				NodeType:       role.Type,
			})
			if err != nil {
				cancel()
				return err
			}
			go waitForCompletion(ctx, statusServer.Run)
		}
		...
		// Construct the Proxy instance, including its configuration and startup parameters
		envoyProxy := envoy.NewProxy(envoy.ProxyConfig{
			Config:              proxyConfig,
			Node:                role.ServiceNode(),
			LogLevel:            proxyLogLevel,
			ComponentLogLevel:   proxyComponentLogLevel,
			PilotSubjectAltName: pilotSAN,
			MixerSubjectAltName: mixerSAN,
			NodeIPs:             role.IPAddresses,
			DNSRefreshRate:      dnsRefreshRate,
			PodName:             podName,
			PodNamespace:        podNamespace,
			PodIP:               podIP,
			SDSUDSPath:          sdsUDSPath,
			SDSTokenPath:        sdsTokenPath,
			STSPort:             stsPort,
			ControlPlaneAuth:    controlPlaneAuthEnabled,
			DisableReportCalls:  disableInternalTelemetry,
			OutlierLogPath:      outlierLogPath,
			PilotCertProvider:   pilotCertProvider,
		})
		// Construct the agent instance, which implements the Agent interface
		agent := envoy.NewAgent(envoyProxy, features.TerminationDrainDuration())
		if nodeAgentSDSEnabled {
			tlsCertsToWatch = []string{}
		}
		// Construct the watcher instance
		watcher := envoy.NewWatcher(tlsCertsToWatch, agent.Restart)
		// Start the watcher
		go watcher.Run(ctx)
		// Graceful shutdown
		go cmd.WaitSignalFunc(cancel)
		// Start the agent
		return agent.Run(ctx)
	},
}
```
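The flow breaks down into roughly these steps:
1. Build the proxy configuration, starting from the defaults and overriding them with the command-line flags;
2. Start the status server for health checking;
3. Construct the Proxy instance (configuration, startup parameters, and so on) and then the agent instance;
4. Construct the watcher instance and start it;
5. Start a goroutine that listens for signals so that the process can exit gracefully;
6. Start the agent.
### Default configuration parameters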
```sh
kubectl exec -it details-v1-6c9f8bcbcb-shltm -c istio-proxy -- /usr/local/bin/pilot-agent proxy --help
Envoy proxy agent
Usage:
pilot-agent proxy [flags]
Flags:
--binaryPath string Path to the proxy binary (default "/usr/local/bin/envoy")
--concurrency int number of worker threads to run
--configPath string Path to the generated configuration file directory (default "/etc/istio/proxy")
--connectTimeout duration Connection timeout used by Envoy for supporting services (default 1s)
--controlPlaneAuthPolicy string Control Plane Authentication Policy (default "NONE")
--controlPlaneBootstrap Process bootstrap provided via templateFile to be used by control plane components. (default true)
--customConfigFile string Path to the custom configuration file
--datadogAgentAddress string Address of the Datadog Agent
--disableInternalTelemetry Disable internal telemetry
--discoveryAddress string Address of the discovery service exposing xDS (e.g. istio-pilot:8080) (default "istio-pilot:15010")
--dnsRefreshRate string The dns_refresh_rate for bootstrap STRICT_DNS clusters (default "300s")
--domain string DNS domain suffix. If not provided uses ${POD_NAMESPACE}.svc.cluster.local
--drainDuration duration The time in seconds that Envoy will drain connections during a hot restart (default 45s)
--envoyAccessLogService string Settings of an Envoy gRPC Access Log Service API implementation
--envoyMetricsService string Settings of an Envoy gRPC Metrics Service API implementation
-h, --help help for proxy
--id string Proxy unique ID. If not provided uses ${POD_NAME}.${POD_NAMESPACE} from environment variables
--ip string Proxy IP address. If not provided uses ${INSTANCE_IP} environment variable.
--lightstepAccessToken string Access Token for LightStep Satellite pool
--lightstepAddress string Address of the LightStep Satellite pool
--lightstepCacertPath string Path to the trusted cacert used to authenticate the pool
--lightstepSecure Should connection to the LightStep Satellite pool be secure
--mixerIdentity string The identity used as the suffix for mixer's spiffe SAN. This would only be used by pilot all other proxy would get this value from pilot
--outlierLogPath string The log path for outlier detection
--parentShutdownDuration duration The time in seconds that Envoy will wait before shutting down the parent process during a hot restart (default 1m0s)
--pilotIdentity string The identity used as the suffix for pilot's spiffe SAN
--proxyAdminPort uint16 Port on which Envoy should listen for administrative commands (default 15000)
--proxyComponentLogLevel string The component log level used to start the Envoy proxy (default "misc:error")
--proxyLogLevel string The log level used to start the Envoy proxy (choose from {trace, debug, info, warning, error, critical, off}) (default "warning")
--serviceCluster string Service cluster (default "istio-proxy")
--serviceregistry string Select the platform for service registry, options are {Kubernetes, Consul, Mock} (default "Kubernetes")
--statsdUdpAddress string IP Address and Port of a statsd UDP listener (e.g. 10.75.241.127:9125)
--statusPort uint16 HTTP Port on which to serve pilot agent status. If zero, agent status will not be provided.
--stsPort int HTTP Port on which to serve Security Token Service (STS). If zero, STS service will not be provided.
--templateFile string Go template bootstrap config
--tokenManagerPlugin string Token provider specific plugin name. (default "GoogleTokenExchange")
--trust-domain string The domain to use for identities
--zipkinAddress string Address of the Zipkin service (e.g. zipkin:9411)
```
The output above shows what each flag of the proxy command means and what its default value is.
```go
func DefaultProxyConfig() meshconfig.ProxyConfig {
	return meshconfig.ProxyConfig{
		ConfigPath:             constants.ConfigPathDir,
		BinaryPath:             constants.BinaryPathFilename,
		ServiceCluster:         constants.ServiceClusterName,
		DrainDuration:          types.DurationProto(45 * time.Second),
		ParentShutdownDuration: types.DurationProto(60 * time.Second),
		DiscoveryAddress:       constants.DiscoveryPlainAddress,
		ConnectTimeout:         types.DurationProto(1 * time.Second),
		StatsdUdpAddress:       "",
		EnvoyMetricsService:    &meshconfig.RemoteService{Address: ""},
		EnvoyAccessLogService:  &meshconfig.RemoteService{Address: ""},
		ProxyAdminPort:         15000,
		ControlPlaneAuthPolicy: meshconfig.AuthenticationPolicy_NONE,
		CustomConfigFile:       "",
		Concurrency:            0,
		StatNameLength:         189,
		Tracing:                nil,
	}
}
```
The default startup parameters are all set in the DefaultProxyConfig function. The defaults are:
- ConfigPath: /etc/istio/proxy
- BinaryPath: /usr/local/bin/envoy
- ServiceCluster: istio-proxy
- DrainDuration: 45s
- ParentShutdownDuration: 60s
- DiscoveryAddress: istio-pilot:15010
- ConnectTimeout: 1s
- StatsdUdpAddress: ""
- EnvoyMetricsService: a meshconfig.RemoteService with an empty address
- EnvoyAccessLogService: a meshconfig.RemoteService with an empty address
- ProxyAdminPort: 15000
- ControlPlaneAuthPolicy: NONE (enum value 0)
- CustomConfigFile: ""
- Concurrency: 0
- StatNameLength: 189
- Tracing: nil
### Status server health checks
Initializing the status server:
```go
func NewServer(config Config) (*Server, error) {
	s := &Server{
		statusPort: config.StatusPort,
		ready: &ready.Probe{
			LocalHostAddr: config.LocalHostAddr,
			AdminPort:     config.AdminPort,
			NodeType:      config.NodeType,
		},
	}
	...
	return s, nil
}
```
Once initialization is done, a goroutine is started to call the status server's Run method:
```go
// Call site in the proxy command:
go waitForCompletion(ctx, statusServer.Run)

func (s *Server) Run(ctx context.Context) {
	log.Infof("Opening status port %d\n", s.statusPort)
	mux := http.NewServeMux()
	// Add the handler for ready probes.
	// Register the callback handlers for the probes
	// /healthz/ready
	mux.HandleFunc(readyPath, s.handleReadyProbe)
	mux.HandleFunc(quitPath, s.handleQuit)
	// Application port health checks
	mux.HandleFunc("/app-health/", s.handleAppProbe)
	// The port is set through the flag --statusPort 15020
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
	if err != nil {
		log.Errorf("Error listening on status port: %v", err.Error())
		return
	}
	...
	defer l.Close()
	// Start serving
	go func() {
		if err := http.Serve(l, mux); err != nil {
			log.Errora(err)
			notifyExit()
		}
	}()
	<-ctx.Done()
	log.Info("Status server has successfully terminated")
}
```
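Run listens on port 15020 and serves the readiness path /healthz/ready. Its handler, handleReadyProbe, queries Envoy on admin port 15000 to decide whether Envoy is ready to accept traffic. The call flow is shown below: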
![Group 7](https://img.luozhiyun.com/20201212154401.png)
### Watcher monitoring
Before the watcher is set up, NewAgent creates the agent instance:
```go
func NewAgent(proxy Proxy, terminationDrainDuration time.Duration) Agent {
	return &agent{
		proxy: proxy,
		// Channel that carries the exit status of started Envoy processes; used to monitor Envoy
		statusCh: make(chan exitStatus),
		// Set of active epochs
		activeEpochs: map[int]chan error{},
		// 5s by default
		terminationDrainDuration: terminationDrainDuration,
		// Current epoch
		currentEpoch: -1,
	}
}
```
Then the watcher instance is built:
```go
// Construct the watcher instance (call site in the proxy command)
watcher := envoy.NewWatcher(tlsCertsToWatch, agent.Restart)

type watcher struct {
	// List of certificate files to watch
	certs []string
	// Function that restarts Envoy
	updates func(interface{})
}

func NewWatcher(certs []string, updates func(interface{})) Watcher {
	return &watcher{
		certs:   certs,
		updates: updates,
	}
}
```
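The watcher has just two fields: certs is the list of certificate files to watch, and updates is the function that restarts Envoy. When a certificate file changes, updates is called to reload Envoy.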
Starting the watcher:
```go
// Call site in the proxy command:
go watcher.Run(ctx)

func (w *watcher) Run(ctx context.Context) {
	// Start Envoy
	w.SendConfig()
	// Watch the certificates for changes
	go watchCerts(ctx, w.certs, watchFileEvents, defaultMinDelay, w.SendConfig)
	<-ctx.Done()
	log.Info("Watcher has successfully terminated")
}
```
The watcher's Run method first calls SendConfig to start Envoy and then starts a goroutine that watches the certificates for changes.
```go
func (w *watcher) SendConfig() {
	h := sha256.New()
	generateCertHash(h, w.certs)
	w.updates(h.Sum(nil))
}
```
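SendConfig computes a hash over the current set of certificates and passes it to updates. updates is the function handed to NewWatcher at construction time, which here is the agent's Restart method: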
```go
func (a *agent) Restart(config interface{}) {
	a.restartMutex.Lock()
	defer a.restartMutex.Unlock()
	a.mutex.Lock()
	// Check whether the incoming configuration has changed
	if reflect.DeepEqual(a.currentConfig, config) {
		// Same configuration - nothing to do.
		a.mutex.Unlock()
		return
	}
	// Is there an active epoch?
	hasActiveEpoch := len(a.activeEpochs) > 0
	// Remember the current epoch
	activeEpoch := a.currentEpoch
	// The configuration changed, so increment the epoch
	epoch := a.currentEpoch + 1
	log.Infof("Received new config, creating new Envoy epoch %d", epoch)
	// Update the current configuration and epoch
	a.currentEpoch = epoch
	a.currentConfig = config
	// Used to abort the epoch on request
	abortCh := make(chan error, 1)
	// Register the abort channel of the now-active epoch; used for graceful shutdown
	a.activeEpochs[a.currentEpoch] = abortCh
	a.mutex.Unlock()
	if hasActiveEpoch {
		a.waitUntilLive(activeEpoch)
	}
	// Start Envoy; its result is placed on the statusCh channel
	go a.runWait(config, epoch, abortCh)
}
```
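Restart checks whether the incoming configuration matches the current one. If it differs, Restart records the new configuration and calls runWait to start Envoy; once that Envoy process returns, the result is placed on the statusCh channel: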
```go
func (a *agent) runWait(config interface{}, epoch int, abortCh <-chan error) {
	log.Infof("Epoch %d starting", epoch)
	// Start Envoy
	err := a.proxy.Run(config, epoch, abortCh)
	// Remove the configuration file that belongs to this epoch
	a.proxy.Cleanup(epoch)
	a.statusCh <- exitStatus{epoch: epoch, err: err}
}
```
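The bookkeeping in Restart (skip identical configurations, otherwise bump the epoch) can be isolated into a small model. `epochTracker` and its methods are hypothetical names used only for this sketch; starting the process, waiting for the previous epoch and abort channels are deliberately left out.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// epochTracker is a hypothetical, simplified model of the agent's restart
// bookkeeping: it remembers the last configuration and the current epoch.
type epochTracker struct {
	mu            sync.Mutex
	currentConfig interface{}
	currentEpoch  int
}

// newEpochTracker starts at epoch -1, so the first real config becomes epoch 0.
func newEpochTracker() *epochTracker {
	return &epochTracker{currentEpoch: -1}
}

// restart returns the new epoch and true when config differs from the last
// one seen, or the unchanged current epoch and false when it is identical.
func (t *epochTracker) restart(config interface{}) (int, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if reflect.DeepEqual(t.currentConfig, config) {
		return t.currentEpoch, false
	}
	t.currentEpoch++
	t.currentConfig = config
	return t.currentEpoch, true
}

func main() {
	tracker := newEpochTracker()
	// The byte slices stand in for certificate hashes sent by the watcher.
	for _, cfg := range [][]byte{{1}, {1}, {2}} {
		epoch, started := tracker.restart(cfg)
		fmt.Printf("config=%v epoch=%d started=%v\n", cfg, epoch, started)
	}
}
```

Because the watcher passes a hash of the certificates as the configuration, an unchanged certificate set produces an identical value and the restart is skipped.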
### Envoy startup flow
![Group 8](https://img.luozhiyun.com/20201212154410.png)
As described above, Envoy is started from runWait. The proxy's Run method renders the template file into the configuration file /etc/istio/proxy/envoy-rev0.json and then launches Envoy directly through the exec package.
```go
func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
	var fname string
	// If a custom configuration file is specified, use it as-is;
	// otherwise generate the bootstrap configuration from the template
	if len(e.Config.CustomConfigFile) > 0 {
		fname = e.Config.CustomConfigFile
	} else {
		out, err := bootstrap.New(bootstrap.Config{
			Node:                e.Node,
			DNSRefreshRate:      e.DNSRefreshRate,
			Proxy:               &e.Config,
			PilotSubjectAltName: e.PilotSubjectAltName,
			MixerSubjectAltName: e.MixerSubjectAltName,
			LocalEnv:            os.Environ(),
			NodeIPs:             e.NodeIPs,
			PodName:             e.PodName,
			PodNamespace:        e.PodNamespace,
			PodIP:               e.PodIP,
			SDSUDSPath:          e.SDSUDSPath,
			SDSTokenPath:        e.SDSTokenPath,
			STSPort:             e.STSPort,
			ControlPlaneAuth:    e.ControlPlaneAuth,
			DisableReportCalls:  e.DisableReportCalls,
			OutlierLogPath:      e.OutlierLogPath,
			PilotCertProvider:   e.PilotCertProvider,
		}).CreateFileForEpoch(epoch)
		if err != nil {
			log.Errora("Failed to generate bootstrap config: ", err)
			os.Exit(1) // Prevent infinite loop attempting to write the file, let k8s/systemd report
		}
		fname = out
	}
	// Build the startup arguments
	args := e.args(fname, epoch, istioBootstrapOverrideVar.Get())
	log.Infof("Envoy command: %v", args)
	// Launch Envoy directly through the exec package
	cmd := exec.Command(e.Config.BinaryPath, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()
	// Wait on the abort channel and on done, to stop Envoy and to report its status correctly
	select {
	// Used for graceful shutdown, covered later
	case err := <-abort:
		log.Warnf("Aborting epoch %d", epoch)
		if errKill := cmd.Process.Kill(); errKill != nil {
			log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
		}
		return err
	case err := <-done:
		return err
	}
}
```
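Run calls CreateFileForEpoch, which reads the template file /var/lib/istio/envoy/envoy_bootstrap_tmpl.json, generates /etc/istio/proxy/envoy-rev0.json, and returns its path. It then calls args to assemble Envoy's startup arguments and launches Envoy with exec.Command; BinaryPath is /usr/local/bin/envoy.
Finally, the result of the command is collected asynchronously into the done channel and returned as the method's return value. runWait receives that value and places it on the statusCh channel.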
The agent's Run method listens for data on the statusCh channel:
```go
// Call site in the proxy command:
agent.Run(ctx)

func (a *agent) Run(ctx context.Context) error {
	log.Info("Starting proxy agent")
	for {
		select {
		// The state of the proxy (Envoy) has changed
		case status := <-a.statusCh:
			a.mutex.Lock()
			if status.err != nil {
				if status.err.Error() == errOutOfMemory {
					log.Warnf("Envoy may have been out of memory killed. Check memory usage and limits.")
				}
				log.Errorf("Epoch %d exited with error: %v", status.epoch, status.err)
			} else {
				// Normal exit
				log.Infof("Epoch %d exited normally", status.epoch)
			}
			// Remove the exited epoch from the set of active epochs
			delete(a.activeEpochs, status.epoch)
			active := len(a.activeEpochs)
			a.mutex.Unlock()
			if active == 0 {
				log.Infof("No more active epochs, terminating")
				return nil
			}
			...
		}
	}
}
```
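### Graceful shutdown
pilot-agent starts a goroutine that calls WaitSignalFunc to listen for the syscall.SIGINT and syscall.SIGTERM signals. When either signal arrives, it calls the context's cancel function, which triggers the graceful shutdown: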
```go
func WaitSignalFunc(cancel func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	cancel()
	_ = log.Sync()
}
```
Once cancel is called, the ctx.Done() channel that the select in the agent's Run method is waiting on becomes readable, and Run calls terminate:
```go
func (a *agent) Run(ctx context.Context) error {
	for {
		select {
		// The state of the proxy (Envoy) has changed
		case status := <-a.statusCh:
			...
		case <-ctx.Done():
			a.terminate()
			log.Info("Agent has successfully terminated")
			return nil
		}
	}
}

func (a *agent) terminate() {
	log.Infof("Agent draining Proxy")
	e := a.proxy.Drain()
	if e != nil {
		log.Warnf("Error in invoking drain listeners endpoint %v", e)
	}
	log.Infof("Graceful termination period is %v, starting...", a.terminationDrainDuration)
	// Sleep for the termination drain duration (5s by default)
	time.Sleep(a.terminationDrainDuration)
	log.Infof("Graceful termination period complete, terminating remaining proxies.")
	a.abortAll()
}
```
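terminate first asks the proxy to drain, then sleeps for terminationDrainDuration (5s by default), and finally calls abortAll to tell every active epoch to shut down.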
```go
var errAbort = errors.New("epoch aborted")

func (a *agent) abortAll() {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	for epoch, abortCh := range a.activeEpochs {
		log.Warnf("Aborting epoch %d...", epoch)
		abortCh <- errAbort
	}
	log.Warnf("Aborted all epochs")
}
```
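abortAll sends a value on the abortCh channel of every active epoch. Any epoch whose Run is still waiting for its command to finish receives that value and kills the Envoy process directly by calling Kill: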
```go
func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
	...
	// Wait on the abort channel and on done, to stop Envoy and to report its status correctly
	select {
	// Graceful shutdown: abortAll delivers the error here
	case err := <-abort:
		log.Warnf("Aborting epoch %d", epoch)
		if errKill := cmd.Process.Kill(); errKill != nil {
			log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
		}
		return err
	case err := <-done:
		return err
	}
}
```
## Summary
This article covered what pilot-agent does and the role it plays within Istio, and how Envoy is monitored and restarted.
## Reference
https://blog.csdn.net/zhonglinzhang/article/details/86551795
https://www.do1618.com/archives/1561/pilot-agent-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%EF%BC%88%E5%8E%9F%E5%88%9B%EF%BC%89/
https://www.servicemesher.com/blog/istio-service-mesh-source-code-pilot-agent