使用client-go實現自定義控制器
阿新 • • 發佈:2022-05-11
使用client-go實現自定義控制器
介紹
我們已經知道,Service對叢集之外暴露服務的主要方式有兩種:NodePort和LoadBalancer,但是這兩種方式,都有一定的缺點:
- NodePort方式的缺點是會佔用很多叢集機器的埠,那麼當叢集服務變多的時候,這個缺點就愈發明顯。
- LoadBalancer的缺點是每個Service都需要一個LB,浪費,麻煩,並且需要Kubernetes之外的裝置的支援。
基於這種現狀,Kubernetes提供了Ingress資源物件,Ingress只需要一個NodePort或者一個LB就可以滿足暴露多個Service的需求。
客戶端首先對 域名 執行 DNS 解析,得到 Ingress Controller 所在節點的 IP,然後客戶端向 Ingress Controller 傳送 HTTP 請求,然後根據 Ingress 物件裡面的描述匹配域名,找到對應的 Service 物件,並獲取關聯的 Endpoints 列表,將客戶端的請求轉發給其中一個 Pod。
本文我們來使用client-go實現一個自定義控制器,通過判斷service
的Annotations
屬性是否包含ingress/http
欄位,如果包含則建立ingress,如果不包含則不建立。而且如果存在ingress
則進行刪除。
具體實現
首先我們建立專案。
$ mkdir ingress-manager && cd ingress-manager $ go mod init ingress-manager # 由於控制器部分的內容比較多,將它們單獨放到pkg目錄下 $ mkdir pkg # 最終專案目錄結構如下 . ├── go.mod ├── go.sum ├── main.go └── pkg └── controller.go
接著我們來實現controller部分:
package pkg import ( "context" apiCoreV1 "k8s.io/api/core/v1" netV1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" informersCoreV1 "k8s.io/client-go/informers/core/v1" informersNetV1 "k8s.io/client-go/informers/networking/v1" "k8s.io/client-go/kubernetes" coreV1 "k8s.io/client-go/listers/core/v1" v1 "k8s.io/client-go/listers/networking/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "reflect" "time" ) const ( workNum = 5 // 工作的節點數 maxRetry = 10 // 最大重試次數 ) // 定義控制器 type Controller struct { client kubernetes.Interface ingressLister v1.IngressLister serviceLister coreV1.ServiceLister queue workqueue.RateLimitingInterface } // 初始化控制器 func NewController(client kubernetes.Interface, serviceInformer informersCoreV1.ServiceInformer, ingressInformer informersNetV1.IngressInformer) Controller { c := Controller{ client: client, ingressLister: ingressInformer.Lister(), serviceLister: serviceInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"), } // 新增事件處理函式 serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addService, UpdateFunc: c.updateService, }) ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: c.deleteIngress, }) return c } // 入隊 func (c *Controller) enqueue(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) } c.queue.Add(key) } func (c *Controller) addService(obj interface{}) { c.enqueue(obj) } func (c *Controller) updateService(oldObj, newObj interface{}) { // todo 比較annotation // 這裡只是比較了物件是否相同,如果相同,直接返回 if reflect.DeepEqual(oldObj, newObj) { return } c.enqueue(newObj) } func (c *Controller) deleteIngress(obj interface{}) { ingress := obj.(*netV1.Ingress) ownerReference := metaV1.GetControllerOf(ingress) if ownerReference == nil { return } // 判斷是否為真的service if ownerReference.Kind != "Service" { return } c.queue.Add(ingress.Namespace + "/" + ingress.Name) } // 啟動控制器,可以看到開了五個協程,真正幹活的是worker func (c *Controller) Run(stopCh chan struct{}) { for i := 0; i < workNum; i++ { go wait.Until(c.worker, time.Minute, stopCh) } <-stopCh } func (c *Controller) worker() { for c.processNextItem() { } } // 業務真正處理的地方 func (c *Controller) processNextItem() bool { // 獲取key item, shutdown := c.queue.Get() if shutdown { return false } defer c.queue.Done(item) // 呼叫業務邏輯 err := c.syncService(item.(string)) if err != nil { // 對錯誤進行處理 c.handlerError(item.(string), err) return false } return true } func (c *Controller) syncService(item string) error { namespace, name, err := cache.SplitMetaNamespaceKey(item) if err != nil { return err } // 獲取service service, err := c.serviceLister.Services(namespace).Get(name) if err != nil { if errors.IsNotFound(err) { return nil } return err } // 新增和刪除 _, ok := service.GetAnnotations()["ingress/http"] ingress, err := c.ingressLister.Ingresses(namespace).Get(name) if err != nil && !errors.IsNotFound(err) { return err } if ok && errors.IsNotFound(err) { // 建立ingress ig := c.constructIngress(service) _, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metaV1.CreateOptions{}) if err != nil { return err } } else if !ok && ingress != nil { // 刪除ingress err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metaV1.DeleteOptions{}) if err != nil { return err } } return nil } func (c *Controller) handlerError(key string, err error) { // 如果出現錯誤,重新加入佇列,最大處理10次 if c.queue.NumRequeues(key) <= maxRetry { c.queue.AddRateLimited(key) return } runtime.HandleError(err) c.queue.Forget(key) } func (c *Controller) constructIngress(service *apiCoreV1.Service) *netV1.Ingress { // 構造ingress pathType := netV1.PathTypePrefix ingress := netV1.Ingress{} ingress.ObjectMeta.OwnerReferences = []metaV1.OwnerReference{ *metaV1.NewControllerRef(service, apiCoreV1.SchemeGroupVersion.WithKind("Service")), } ingress.Namespace = service.Namespace ingress.Name = service.Name ingress.Spec = netV1.IngressSpec{ Rules: []netV1.IngressRule{ { Host: "example.com", IngressRuleValue: netV1.IngressRuleValue{ HTTP: &netV1.HTTPIngressRuleValue{ Paths: []netV1.HTTPIngressPath{ { Path: "/", PathType: &pathType, Backend: netV1.IngressBackend{ Service: &netV1.IngressServiceBackend{ Name: service.Name, Port: netV1.ServiceBackendPort{ Number: 80, }, }, }, }, }, }, }, }, }, } return &ingress }
接下來我們來實現main:
package main
import (
"ingress-manager/pkg"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 獲取config
// 先嚐試從叢集外部獲取,獲取不到則從叢集內部獲取
var config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
clusterConfig, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
config = clusterConfig
}
// 通過config建立 clientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// 通過 client 建立 informer,新增事件處理函式
factory := informers.NewSharedInformerFactory(clientSet, 0)
serviceInformer := factory.Core().V1().Services()
ingressInformer := factory.Networking().V1().Ingresses()
newController := pkg.NewController(clientSet, serviceInformer, ingressInformer)
// 啟動 informer
stopCh := make(chan struct{})
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)
newController.Run(stopCh)
}
測試
首先建立deploy和service:
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-nginx
spec:
selector:
matchLabels:
app: my-nginx
template:
metadata:
labels:
app: my-nginx
spec:
containers:
- name: my-nginx
image: nginx:1.17.1
ports:
- containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
name: my-nginx
labels:
app: my-nginx
spec:
ports:
- port: 80
protocol: TCP
name: http
selector:
app: my-nginx
建立完成後進行檢視:
$ kubectl get deploy,service,ingress
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/my-nginx 1/1 1 1 7m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 78d
service/my-nginx ClusterIP 10.105.32.46 <none> 80/TCP 7m
上面的命令我分別獲取deploy
,service
,ingress
,但是隻獲取到了deploy
和service
,這符合我們的預期。接著我們給service/m-nginx中的annotations
新增ingress/http: nginx
:
$ kubectl edit service/my-nginx
apiVersion: v1
kind: Service
metadata:
annotations:
ingress/http: nginx
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"my-nginx"},"name":"my-nginx","namespace":"default"},"spec":{"ports":[{"name":"http","port":80,"protocol":"TCP"}],"selector":{"app":"my-nginx"}}}
......
service/my-nginx edited
重新進行檢視:
$ kubectl get deploy,service,ingress
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/demo-deployment 1/1 1 1 41d
deployment.apps/my-nginx 1/1 1 1 11m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 78d
service/my-nginx ClusterIP 10.105.32.46 <none> 80/TCP 11m
NAME CLASS HOSTS ADDRESS PORTS AGE
ingress.networking.k8s.io/my-nginx <none> example.com 80 19s
接著我們再來測試下,將ingress/http: nginx
註釋掉,看看ingress是否會自動刪除:
$ kubectl get deploy,service,ingress
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/demo-deployment 1/1 1 1 41d
deployment.apps/my-nginx 1/1 1 1 19m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 78d
service/my-nginx ClusterIP 10.105.32.46 <none> 80/TCP 19m
我們發現和我們預期的效果一樣。
如果service被刪除了,ingress肯定也是不會存在的。這個這裡就不多演示了。有興趣可以自行測試下。