1. 程式人生 > >容器編排Kubernetes之kube

容器編排Kubernetes之kube

花了幾天時間,研究了Kubernetes DNS外掛的原始碼,對其實現有了個簡單的理解。這篇文章我簡單梳理下程式碼流程。

注:閱讀DNS原始碼前,可以閱讀DNS原理入門增加對DNS的認識。

架構圖

20170523213042

這是我簡單畫的架構圖,希望能幫助大家理解。

程式碼結構

| dns

| cmd // 三大元件的入口

| dnsmasq-nanny // DNS快取

| kube-dns // dns主專案

| sidecar // 附加元件

| pkg 元件程式碼庫,主要實現程式碼在該目錄下

| dns // kube-dns程式碼庫, 監聽service、pod等資源,動態更新DNS記錄

| dnsmasq // 內部封裝

dnsmasq程式用於快取,並可從dns伺服器獲取dns監控指標

| sidecar 用於監控和健康檢查

主要的程式碼都集中在上述樹形結構中,下面依次講解。

kube-dns

kube-dns是提供DNS功能的元件,我們重點關注。

首先看main方法:

func main() {
   config := options.NewKubeDNSConfig()
   config.AddFlags(pflag.CommandLine)

   flag.InitFlags()
   // Convinces goflags that we have called Parse() to avoid noisy logs.
// OSS Issue: kubernetes/kubernetes#17162. goflag.CommandLine.Parse([]string{}) // 解析引數 logs.InitLogs() defer logs.FlushLogs() // 初始化日誌 version.PrintAndExitIfRequested() glog.V(0).Infof("version: %+v", version.VERSION) // 例項化KubeDNSServer並執行 server := app.NewKubeDNSServerDefault
(config) server.Run() }

下面我們分析app包裡的server.go檔案

type KubeDNSServer struct {
   // DNS domain name.
   domain         string
   healthzPort    int
   dnsBindAddress string
   dnsPort        int
   nameServers    string
   kd             *dns.KubeDNS
} // KubeDNSServer類裡前幾個變數都是main函式裡傳遞過來的引數直接賦值,沒啥可講的。kd是pkg/dns包裡的KubeDNS類的例項,我們後續再講。
func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {} // 根據引數,填充KubeDNSServer物件
func newKubeClient(dnsConfig *options.KubeDNSConfig) (kubernetes.Interface, error) {} // 根據引數,例項化一個與apiserver通訊的client,有兩種模式的client可以使用。叢集內client(通過serviceAccount認證)以及叢集外client(當前Kubernetes叢集配置的其他認證方式進行認證)
// main函式中的server.Run()呼叫的函式
func (server *KubeDNSServer) Run() {
   pflag.VisitAll(func(flag *pflag.Flag) {
      glog.V(0).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
   })
   setupSignalHandlers() // 監聽系統事件,主要作用是等待日誌處理完成
   server.startSkyDNSServer() // 配置SkyDNS服務並啟動服務,此處使用了SkyDNS的相關程式碼,這篇文章就不贅述了(我沒看程式碼,囧)
   server.kd.Start() // 啟動KubeDNS,後續再深入
   server.setupHandlers() // 新增兩個http方法,/readiness用於健康檢查,/cache返回當前dns中快取的dns記錄JSON

   glog.V(0).Infof("Status HTTP port %v", server.healthzPort)
   if server.nameServers != "" {
      glog.V(0).Infof("Upstream nameservers: %s", server.nameServers)
   }
   glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", server.healthzPort), nil)) // 監聽http服務
}

app/options包中還有個options.go檔案,這裡主要包含app需要的配置資訊,不再贅述。下面我們跳入pkg/dns包中探一探究竟,首先入場(也是唯一入場)的是pkg/dns/dns.go

type KubeDNS struct {
   // 與apiserver通訊的client
   kubeClient clientset.Interface

   // 域名,預設為cluster.local.
   domain string
   // configMap的名稱,預設為空,使用命令列引數
   configMap string

   // 儲存叢集中所有的endpoints
   endpointsStore kcache.Store
   // 儲存叢集中所有的services
   servicesStore kcache.Store
   // 儲存叢集中所有的nodes
   nodesStore kcache.Store

   // dns快取
   cache treecache.TreeCache
   // PTR記錄 ip --> skymsg.Service
   reverseRecordMap map[string]*skymsg.Service
   // 叢集服務列表 ip --> v1.Service
   clusterIPServiceMap map[string]*v1.Service
   // 快取鎖,更新上述三者的資料時,需加鎖
   cacheLock sync.RWMutex

   // 域名路徑,是域名反向分割後的列表,比如預設的為[]string{"local", "cluster"}
   domainPath []string

   // endpointsController 管理endpoints的變更
   endpointsController *kcache.Controller
   // serviceController 管理services的變更
   serviceController *kcache.Controller

   // 配置物件
   config *config.Config
   // 配置更新鎖
   configLock sync.RWMutex
   // 配置更新管理物件
   configSync config.Sync

   // 初始化同步endpoints和services的過期時間
   initialSyncTimeout time.Duration
}
func NewKubeDNS(client clientset.Interface, clusterDomain string, timeout time.Duration, configSync config.Sync) *KubeDNS {} // 根據引數配置KubeDNS物件,設定endpoint和service相關store和controller。
func (kd *KubeDNS) Start() {} // 啟動KudeDNS,其實也就是啟動在NewKubeDNS中設定的endpointsController和serviceController,並且監聽配置檔案的變化
func (kd *KubeDNS) GetCacheAsJSON() (string, error) {} // 獲取dns快取物件JSON,在cmd/app/server.go中被http方法/cache使用
func (kd *KubeDNS) setServicesStore() {} // 在NewKubeDNS中呼叫。監聽service的變化,針對不同的操作(新增,刪除,更新)執行不同的callback
func (kd *KubeDNS) setEndpointsStore() {} // 在NewKubeDNS中呼叫。監聽endpoint的變化,針對不同的操作(新增,刪除,更新)執行不同的callback
func (kd *KubeDNS) newService(obj interface{}) {} // 根據service的型別,生成不同型別的dns記錄。簡單的說,ExternalName型別的service是CNAME,只在dns快取(KubeDNS.cache)中儲存該記錄;Headless(無ClusterIp)型別的service不在KubeDNS.clusterIPServiceMap中記錄;而其他型別的service則在cache,reverseRecordMap,clusterIPServiceMap中一併儲存
func (kd *KubeDNS) removeService(obj interface{}) {} // 刪除service,也即刪除在cache,reverseRecordMap,clusterIPServiceMap中的相關記錄
func (kd *KubeDNS) updateService(oldObj, newObj interface{}) {} // 更新=刪除+新增
func (kd *KubeDNS) handleEndpointAdd(obj interface{}) {} // 新增endpoints
func (kd *KubeDNS) handleEndpointUpdate(oldObj, newObj interface{}) {} // 更新endpoints,刪除新endpoints子網裡跟老endpoints子網裡一樣的PTR記錄(KubeDNS.reverseRecordMap),即刪除相同ip的endpoint,然後呼叫handleEndpointAdd新增endpoints
func (kd *KubeDNS) handleEndpointDelete(obj interface{}) {} // 刪除相關的PTR記錄(KubeDNS.reverseRecordMap)即可
func (kd *KubeDNS) addDNSUsingEndpoints(e *v1.Endpoints) error {} // 新增endpoints,如果endpoints對應的service是Headless service,則生成相關記錄。如果不是,什麼也不做。
func (kd *KubeDNS) getServiceFromEndpoints(e *v1.Endpoints) (*v1.Service, error) {} // 根據endpoints返回service
func (kd *KubeDNS) fqdn(service *v1.Service, subpaths ...string) string {} // 生成一個完整網域名稱(Fully qualified domain name)
func (kd *KubeDNS) newPortalService(service *v1.Service) {} // 生成portalService,我的理解是一般型別的service,同時在cache,reverseRecordMap,clusterIPServiceMap中儲存該service的記錄
func (kd *KubeDNS) generateRecordsForHeadlessService(e *v1.Endpoints, svc *v1.Service) error {} // 生成headlessService,同時在cache,reverseRecordMap中儲存該service的記錄
func (kd *KubeDNS) newExternalNameService(service *v1.Service) {} // 生成ExternalNameService,只在cache中記錄該條資訊
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {} // 查詢DNS記錄,引數中的exact標識是否精確匹配。其中federation相關的東西我還不是太明白
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {} // 查詢PTR記錄

Records 和 ReverseRecord 兩個方法是實現了skydns的Backend介面,這樣的話,KubeDNS就可以作為skydns的後端儲存提供dns查詢服務了,二者怎麼關聯起來的呢?回看cmd/kube-dns/app/server.go檔案裡的startSkyDNSServer方法,你會找到匯合點的,試試看吧。

至此,我們對kubeDNS元件有了初步大概的認識。接下來,我們再接著看dnsmasq元件。

dnsmasq

cmd/dnsmasq-nanny沒有什麼可以講的,也就是解析命令列引數然後呼叫pkg/dnsmasq/nanny.go的RunNanny方法。或者可以這麼說,整個dnsmasq就沒什麼事,就是在其中內嵌了dnsmasql應用程式,通過程式碼啟動並管理該程式的生命程序

// RunNanny runs the nanny and handles configuration updates.
func RunNanny(sync config.Sync, opts RunNannyOpts) {
   defer glog.Flush()

   currentConfig, err := sync.Once()
   if err != nil {
      glog.Errorf("Error getting initial config, using default: %v", err)
      currentConfig = config.NewDefaultConfig()
   } // 解析配置

   nanny := &Nanny{Exec: opts.DnsmasqExec}
   nanny.Configure(opts.DnsmasqArgs, currentConfig)
   if err := nanny.Start(); err != nil {
      glog.Fatalf("Could not start dnsmasq with initial configuration: %v", err)
   } // 啟動dnsmasq應用程式

   configChan := sync.Periodic()

   for {
      select {
      case status := <-nanny.ExitChannel:
         glog.Flush()
         glog.Fatalf("dnsmasq exited: %v", status)
         break
      case currentConfig = <-configChan:
         if opts.RestartOnChange {
            glog.V(0).Infof("Restarting dnsmasq with new configuration")
            nanny.Kill()
            nanny = &Nanny{Exec: opts.DnsmasqExec}
            nanny.Configure(opts.DnsmasqArgs, currentConfig)
            nanny.Start()
         } else {
            glog.V(2).Infof("Not restarting dnsmasq (--restartDnsmasq=false)")
         }
         break
      }
   } // 持續監聽退出狀態和配置更新
}
 
func (n *Nanny) Kill() error {} // 殺掉執行中的dnsmasq程序
func (n *Nanny) Start() error {} // 啟動dnsmasq程序,並將日誌資訊輸出到glog
func (n *Nanny) Configure(args []string, config *config.Config) {} // 解析配置,必須在Start方法前呼叫

dnsmasq的另一部分的作用是從dnsmasq應用程式讀取監控資訊,包括快取命中數量,快取未命中數量,快取刪除數量,快取插入數量,快取大小等資訊。

sidecar

sidecar元件的主要作用是提供kube-dns和dnsmasq的健康檢查和dns的監控。我們略過cmd/sidecar/main.go,他的主要作用也無非是解析引數。我們重點關注pkg/sidecar/server.go

func (s *server) Run(options *Options) {
   s.options = options
   glog.Infof("Starting server (options %+v)", *s.options)

   // 之前說sidecar監控kube-dns和dnsmasq,其實是通過引數傳遞進來的,具體的引數解析可以參考cmd/sidecar/main.go
   for _, probeOption := range options.Probes {
      probe := &dnsProbe{DNSProbeOption: probeOption}
      s.probes = append(s.probes, probe)
      probe.Start(options) // 啟動元件健康檢查
   }

   s.runMetrics(options) // dns監控資訊
}