Ingress nginx Controller原始碼分析
主要結構圖
入口函式
cmd/nginx/main.go
func main() { klog.InitFlags(nil) rand.Seed(time.Now().UnixNano()) fmt.Println(version.String()) showVersion, conf, err := parseFlags() if showVersion { os.Exit(0) } if err != nil { klog.Fatal(err) } err = file.CreateRequiredDirectories() if err != nil { klog.Fatal(err) } kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile) if err != nil { handleFatalInitError(err) } if len(conf.DefaultService) > 0 { err := checkService(conf.DefaultService, kubeClient) if err != nil { klog.Fatal(err) } klog.InfoS("Valid default backend", "service", conf.DefaultService) } if len(conf.PublishService) > 0 { err := checkService(conf.PublishService, kubeClient) if err != nil { klog.Fatal(err) } } if conf.Namespace != "" { _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{}) if err != nil { klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err) } } conf.FakeCertificate = ssl.GetFakeSSLCert() klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName) if !k8s.NetworkingIngressAvailable(kubeClient) { klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher") } _, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{}) if err != nil { if !errors.IsNotFound(err) { if errors.IsForbidden(err) { klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err) conf.IngressClassConfiguration.IgnoreIngressClass = true } } } conf.Client = kubeClient err = k8s.GetIngressPod(kubeClient) if err != nil { klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err) } reg := prometheus.NewRegistry() reg.MustRegister(prometheus.NewGoCollector()) reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{ PidFn: func() (int, error) { return os.Getpid(), nil }, ReportErrors: true, })) mc := metric.NewDummyCollector() if conf.EnableMetrics { mc, err = metric.NewCollector(conf.MetricsPerHost, reg, conf.IngressClassConfiguration.Controller) if err != nil { klog.Fatalf("Error creating prometheus collector: %v", err) } } // Pass the ValidationWebhook status to determine if we need to start the collector // for the admissionWebhook mc.Start(conf.ValidationWebhook) if conf.EnableProfiling { go registerProfiler() } ngx := controller.NewNGINXController(conf, mc) mux := http.NewServeMux() registerHealthz(nginx.HealthPath, ngx, mux) registerMetrics(reg, mux) go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux) go ngx.Start() handleSigterm(ngx, func(code int) { os.Exit(code) }) }
主要邏輯
-
Step1 初始化配置,獲取kubeClient
kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
-
Step2 檢查DefaultService、PublicService、Namespace等存在
-
step3 檢查IngressClasses許可權
-
step4 檢查ingress controller pod存在
-
Step5 建立NGINXController, 開啟健康檢查,metrics
ngx := controller.NewNGINXController(conf, mc)
方法NewNGINXController
//internal/ingress/controller/nginx.go // NewNGINXController creates a new NGINX Ingress controller. func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ Interface: config.Client.CoreV1().Events(config.Namespace), }) h, err := dns.GetSystemNameServers() if err != nil { klog.Warningf("Error reading system nameservers: %v", err) } n := &NGINXController{ isIPV6Enabled: ing_net.IsIPv6Enabled(), resolver: h, cfg: config, syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ Component: "nginx-ingress-controller", }), stopCh: make(chan struct{}), updateCh: channels.NewRingChannel(1024), ngxErrCh: make(chan error), stopLock: &sync.Mutex{}, runningConfig: new(ingress.Configuration), Proxy: &TCPProxy{}, metricCollector: mc, command: NewNginxCommand(), } if n.cfg.ValidationWebhook != "" { n.validationWebhookServer = &http.Server{ Addr: config.ValidationWebhook, Handler: adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}), TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(), // disable http/2 // https://github.com/kubernetes/kubernetes/issues/80313 // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159 TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), } } n.store = store.New( config.Namespace, config.WatchNamespaceSelector, config.ConfigMapName, config.TCPConfigMapName, config.UDPConfigMapName, config.DefaultSSLCertificate, config.ResyncPeriod, config.Client, n.updateCh, config.DisableCatchAll, config.IngressClassConfiguration) n.syncQueue = task.NewTaskQueue(n.syncIngress) if config.UpdateStatus { n.syncStatus = status.NewStatusSyncer(status.Config{ Client: config.Client, PublishService: config.PublishService, PublishStatusAddress: config.PublishStatusAddress, IngressLister: n.store, UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, UseNodeInternalIP: config.UseNodeInternalIP, }) } else { klog.Warning("Update of Ingress status is disabled (flag --update-status)") } onTemplateChange := func() { template, err := ngx_template.NewTemplate(nginx.TemplatePath) if err != nil { // this error is different from the rest because it must be clear why nginx is not working klog.ErrorS(err, "Error loading new template") return } n.t = template klog.InfoS("New NGINX configuration template loaded") n.syncQueue.EnqueueTask(task.GetDummyObject("template-change")) } ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath) if err != nil { klog.Fatalf("Invalid NGINX configuration template: %v", err) } n.t = ngxTpl _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange) if err != nil { klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err) } filesToWatch := []string{} err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } filesToWatch = append(filesToWatch, path) return nil }) if err != nil { klog.Fatalf("Error creating file watchers: %v", err) } for _, f := range filesToWatch { _, err = watch.NewFileWatcher(f, func() { klog.InfoS("File changed detected. Reloading NGINX", "path", f) n.syncQueue.EnqueueTask(task.GetDummyObject("file-change")) }) if err != nil { klog.Fatalf("Error creating file watcher for %v: %v", f, err) } } return n }
主要邏輯:
-
例項化NGINXController
-
建立validationWebhookServer
-
建立store快取
n.store = store.New
-
監聽模板檔案以及/etc/nginx/geoip下的檔案變化,如有變化,傳送對應的檔案變化事件
-
當ingress有變化時,執行syncIngress方法回撥
n.syncQueue = task.NewTaskQueue(n.syncIngress)
萬能的死迴圈
Start方法,開啟萬能的死迴圈,處理各種Event
//internal/ingress/controller/nginx.go
for {
select {
case err := <-n.ngxErrCh:
if n.isShuttingDown {
return
}
// if the nginx master process dies, the workers continue to process requests
// until the failure of the configured livenessProbe and restart of the pod.
if process.IsRespawnIfRequired(err) {
return
}
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
if evt.Type == store.ConfigurationEvent {
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
klog.Warningf("Unexpected event type received %T", event)
}
case <-n.stopCh:
return
}
}
當updateCh中有資料時,放到syncQueue中
//internal/task/queue.go
// worker processes work in the queue through sync.
func (t *Queue) worker() {
for {
key, quit := t.queue.Get()
if quit {
if !isClosed(t.workerDone) {
close(t.workerDone)
}
return
}
ts := time.Now().UnixNano()
item := key.(Element)
if item.Timestamp != 0 && t.lastSync > item.Timestamp {
klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
t.queue.Forget(key)
t.queue.Done(key)
continue
}
klog.V(3).InfoS("syncing", "key", item.Key)
if err := t.sync(key); err != nil {
klog.ErrorS(err, "requeuing", "key", item.Key)
t.queue.AddRateLimited(Element{
Key: item.Key,
Timestamp: 0,
})
} else {
t.queue.Forget(key)
t.lastSync = ts
}
t.queue.Done(key)
}
}
當syncQueue中有資料,回撥syncIngress方法 t.sync(key)
//internal/ingress/controller/store/store.go
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing, _ := toIngress(obj)
if !watchedNamespace(ing.Namespace) {
return
}
ic, err := store.GetIngressClass(ing, icConfig)
if err != nil {
klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
return
}
klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)
if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
store.syncIngress(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: ingDeleteHandler,
UpdateFunc: func(old, cur interface{}) {
oldIng, _ := toIngress(old)
curIng, _ := toIngress(cur)
if !watchedNamespace(oldIng.Namespace) {
return
}
var errOld, errCur error
var classCur string
if !icConfig.IgnoreIngressClass {
_, errOld = store.GetIngressClass(oldIng, icConfig)
classCur, errCur = store.GetIngressClass(curIng, icConfig)
}
if errOld != nil && errCur == nil {
if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
return
}
klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
} else if errOld == nil && errCur != nil {
klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
ingDeleteHandler(old)
return
} else if errCur == nil && !reflect.DeepEqual(old, cur) {
if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
ingDeleteHandler(old)
return
}
recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
} else {
klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
return
}
store.syncIngress(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
internal/ingress/controller/store/store.go
New方法構造各種資源的處理EventHandler
Ingress舉例:ResourceEventHandlerFuncs
處理Ingress的Add、Delete、Update
store.informers.Ingress.AddEventHandler(ingEventHandler)
if !icConfig.IgnoreIngressClass {
store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
}
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)
Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service註冊到k8s的sdk Informers中