使用client-go包訪問Kubernetes CRD
安裝kubernetes
最簡單的方式當然是使用sealos 不管是單機還是生產環境還是雲上都可以使用sealos。
wget https://github.com/fanux/sealos/releases/download/v2.0.7/sealos && chmod +x sealos && mv sealos /usr/bin sealos init --passwd YOUR_SERVER_PASSWD \ --master 192.168.0.2 --master 192.168.0.3 --master 192.168.0.4 \ --node 192.168.0.5 \ --pkg-url https://sealyun.oss-cn-beijing.aliyuncs.com/cf6bece970f6dab3d8dc8bc5b588cc18-1.16.0/kube1.16.0.tar.gz \ --version v1.16.0
以上兩條命令就搞定了一個kubernetes 1.16.0叢集,其它版本也一樣
本文非常實用,比較全比較完善的相關文件也比較難找到。
舉個栗子:
我們在實現虛擬機器CRD時,節點上agent需要查詢虛擬機器CRD,這種情況顯然我們不會通過controller進行操作,此時我們就需要知道怎麼直接用client-go操作CRD。
<!--more-->
建立CRD
apiVersion: "apiextensions.k8s.io/v1beta1" kind: "CustomResourceDefinition" metadata: name: "projects.example.sealyun.com" spec: group: "example.sealyun.com" version: "v1alpha1" scope: "Namespaced" names: plural: "projects" singular: "project" kind: "Project" validation: openAPIV3Schema: required: ["spec"] properties: spec: required: ["replicas"] properties: replicas: type: "integer" minimum: 1
這個可以使用kubebuilder或者operator-framework生成, 自己寫太累
要定義自定義資源定義,您需要考慮API組名稱(在本例中example.sealyun.com)。按照慣例,這通常是您控制的域的域名(例如,您組織的域),以防止命名衝突。然後CRD的名稱遵循模式<plural-resource-name>.<api-group-name>,因此在這種情況下projects.example.sealyun.com。
通常,您希望根據特定架構驗證使用者在自定義資源中儲存的資料。這就是spec.validation.openAPIV3Schema它的用途:它包含一個描述資源應具有的格式的JSON模式。
使用kubectl建立資源定義, 如果用kubebuilder可以直接make && make deploy:
> kubectl apply -f projects-crd.yaml
customresourcedefinition "projects.example.sealyun.com" created
可以建立此新資源型別的例項:
apiVersion: "example.sealyun.com/v1alpha1"
kind: "Project"
metadata:
name: "example-project"
namespace: "default"
spec:
replicas: 1
> kubectl apply -f project.yaml
project "example-project" created
> kubectl get projects
NAME AGE
example-project 2m
建立Golang客戶端
接下來,我們將使用client-go包來訪問這些自定義資源。
定義型別
kubebuilder等都會自動為您生成,我這裡為了講清楚所有的東西也加上這塊的相關說明
首先定義自定義資源的型別。通過API組版本組織這些型別是一個很好的做法; 例如,api/types/v1alpha1/project.go:
package v1alpha1
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
type ProjectSpec struct {
Replicas int `json:"replicas"`
}
type Project struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ProjectSpec `json:"spec"`
}
type ProjectList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Project `json:"items"`
}
該metav1.ObjectMeta型別包含了典型metadata的屬性
定義DeepCopy方法
Kubernetes API(在本例中為Project和ProjectList)提供的每種型別都需要實現該k8s.io/apimachinery/pkg/runtime.Object介面。該介面定義了兩種方法GetObjectKind()和DeepCopyObject()。第一種方法已經由嵌入式metav1.TypeMeta結構提供; 第二個你必須自己實現。
該DeepCopyObject方法旨在生成物件的深層副本。由於這涉及許多樣板程式碼,因此很多工具通常會自動生成這些方法。為了本文的目的,我們將手動完成。繼續向deepcopy.go同一個包新增第二個檔案:
package v1alpha1
import "k8s.io/apimachinery/pkg/runtime"
// DeepCopyInto copies all properties of this object into another object of the
// same type that is provided as a pointer.
func (in *Project) DeepCopyInto(out *Project) {
out.TypeMeta = in.TypeMeta
out.ObjectMeta = in.ObjectMeta
out.Spec = ProjectSpec{
Replicas: in.Spec.Replicas,
}
}
// DeepCopyObject returns a generically typed copy of an object
func (in *Project) DeepCopyObject() runtime.Object {
out := Project{}
in.DeepCopyInto(&out)
return &out
}
// DeepCopyObject returns a generically typed copy of an object
func (in *ProjectList) DeepCopyObject() runtime.Object {
out := ProjectList{}
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
out.Items = make([]Project, len(in.Items))
for i := range in.Items {
in.Items[i].DeepCopyInto(&out.Items[i])
}
}
return &out
}
註冊型別
接下來,您需要使客戶端庫知道新型別。允許客戶端在與API伺服器通訊時自動處理新型別。
為此,register.go請在包中新增一個新檔案:
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const GroupName = "example.sealyun.com"
const GroupVersion = "v1alpha1"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Project{},
&ProjectList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
此程式碼實際上並沒有做任何事情(除了建立新runtime.SchemeBuilder例項)。重要的部分是AddToScheme函式,它是runtime.SchemeBuilder中建立的型別。一旦Kubernetes客戶端初始化為註冊您的型別,您可以稍後從客戶端程式碼的任何部分呼叫此函式。
構建HTTP客戶端
在定義型別並新增方法以在全域性方案構建器中註冊它們之後,您現在可以建立能夠載入自定義資源的HTTP客戶端。
為此,將以下程式碼新增到包的main.go檔案中:
package main
import (
"flag"
"log"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
var kubeconfig string
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to Kubernetes config file")
flag.Parse()
}
func main() {
var config *rest.Config
var err error
if kubeconfig == "" {
log.Printf("using in-cluster configuration")
config, err = rest.InClusterConfig()
} else {
log.Printf("using configuration from '%s'", kubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
}
if err != nil {
panic(err)
}
v1alpha1.AddToScheme(scheme.Scheme)
crdConfig := *config
crdConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: v1alpha1.GroupName, Version: v1alpha1.GroupVersion}
crdConfig.APIPath = "/apis"
crdConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
crdConfig.UserAgent = rest.DefaultKubernetesUserAgent()
exampleRestClient, err := rest.UnversionedRESTClientFor(&crdConfig)
if err != nil {
panic(err)
}
}
您現在可以使用第exampleRestClient中建立的內容來查詢example.sealyun.com/v1alpha1API組中的所有自定義資源。示例可能如下所示:
result := v1alpha1.ProjectList{}
err := exampleRestClient.
Get().
Resource("projects").
Do().
Into(&result)
為了以更加型別安全的方式使用您的API,通常最好將這些操作包裝在您自己的客戶端集中。為此,建立一個新的子包clientset/v1alpha1。首先,實現一個定義API組型別的介面,並將配置設定從您的main方法移動到該clientset的建構函式中(NewForConfig在下面的示例中):
package v1alpha1
import (
"github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
type ExampleV1Alpha1Interface interface {
Projects(namespace string) ProjectInterface
}
type ExampleV1Alpha1Client struct {
restClient rest.Interface
}
func NewForConfig(c *rest.Config) (*ExampleV1Alpha1Client, error) {
config := *c
config.ContentConfig.GroupVersion = &schema.GroupVersion{Group: v1alpha1.GroupName, Version: v1alpha1.GroupVersion}
config.APIPath = "/apis"
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
config.UserAgent = rest.DefaultKubernetesUserAgent()
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &ExampleV1Alpha1Client{restClient: client}, nil
}
func (c *ExampleV1Alpha1Client) Projects(namespace string) ProjectInterface {
return &projectClient{
restClient: c.restClient,
ns: namespace,
}
}
以上是對client的封裝
接下來,您需要實現一個特定的Project客戶端集來訪問自定義資源(請注意,上面的示例已經使用了我們仍需要提供的ProjectInterface和projectClient型別)。projects.go在同一個包中建立第二個檔案:
package v1alpha1
import (
"github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
type ProjectInterface interface {
List(opts metav1.ListOptions) (*v1alpha1.ProjectList, error)
Get(name string, options metav1.GetOptions) (*v1alpha1.Project, error)
Create(*v1alpha1.Project) (*v1alpha1.Project, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
// ...
}
type projectClient struct {
restClient rest.Interface
ns string
}
func (c *projectClient) List(opts metav1.ListOptions) (*v1alpha1.ProjectList, error) {
result := v1alpha1.ProjectList{}
err := c.restClient.
Get().
Namespace(c.ns).
Resource("projects").
VersionedParams(&opts, scheme.ParameterCodec).
Do().
Into(&result)
return &result, err
}
func (c *projectClient) Get(name string, opts metav1.GetOptions) (*v1alpha1.Project, error) {
result := v1alpha1.Project{}
err := c.restClient.
Get().
Namespace(c.ns).
Resource("projects").
Name(name).
VersionedParams(&opts, scheme.ParameterCodec).
Do().
Into(&result)
return &result, err
}
func (c *projectClient) Create(project *v1alpha1.Project) (*v1alpha1.Project, error) {
result := v1alpha1.Project{}
err := c.restClient.
Post().
Namespace(c.ns).
Resource("projects").
Body(project).
Do().
Into(&result)
return &result, err
}
func (c *projectClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
return c.restClient.
Get().
Namespace(c.ns).
Resource("projects").
VersionedParams(&opts, scheme.ParameterCodec).
Watch()
}
上面還缺少一些Delete Update方法,照抄就行,或者參考pod的實現
再去使用就變的非常簡單了:
import clientV1alpha1 "github.com/martin-helmich/kubernetes-crd-example/clientset/v1alpha1"
// ...
func main() {
// ...
clientSet, err := clientV1alpha1.NewForConfig(config)
if err != nil {
panic(err)
}
projects, err := clientSet.Projects("default").List(metav1.ListOptions{})
if err != nil {
panic(err)
}
fmt.Printf("projects found: %+v\n", projects)
}
建立informer
構建Kubernetes operator時,通常希望能夠對新建立或更新的事件進行監聽。理論上,可以定期呼叫該List()方法並檢查是否添加了新資源。
大多數情況通過使用初始List()初始載入資源的所有相關例項,然後使用Watch()訂閱相關事件進行處理。然後,使用從informer接收的初始物件列表和更新來構建本地快取,該快取允許快速訪問任何自定義資源,而無需每次都訪問API伺服器。
這種模式非常普遍,以至於client-go庫為此提供了一個cache包:來自包的Informerk8s.io/client-go/tools/cache。您可以為自定義資源構建新的Informer,如下所示:
package main
import (
"time"
"github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
client_v1alpha1 "github.com/martin-helmich/kubernetes-crd-example/clientset/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func WatchResources(clientSet client_v1alpha1.ExampleV1Alpha1Interface) cache.Store {
projectStore, projectController := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
return clientSet.Projects("some-namespace").List(lo)
},
WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
return clientSet.Projects("some-namespace").Watch(lo)
},
},
&v1alpha1.Project{},
1*time.Minute,
cache.ResourceEventHandlerFuncs{},
)
go projectController.Run(wait.NeverStop)
return projectStore
}
該NewInformer方法返回兩個物件:第二個返回值,控制器控制List()和Watch()呼叫並填充第一個返回值,Store快取監聽到的一些資訊。 您現在可以使用store輕鬆訪問CRD,列出所有CRD或通過名稱訪問它們。store函式返回interface{}型別,因此您必須將它們強制轉換回CRD型別:
store := WatchResource(clientSet)
project := store.GetByKey("some-namespace/some-project").(*v1alpha1.Project)
如此很多情況下就不需要再去呼叫apiserver了,給apiserver減輕壓力.
在kubebuilder中進行訪問
通過獲取manager中的reader, 但是這裡只能讀不能寫,寫的話需要mgr.GetClient() 但是這個就必須是長時間執行的
package main
import (
"context"
"fmt"
"os"
"k8s.io/apimachinery/pkg/types"
v1 "github.com/fanux/sealvm/api/v1"
"github.com/prometheus/common/log"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)
var scheme = runtime.NewScheme()
func init() {
v1.AddToScheme(scheme)
clientgoscheme.AddToScheme(scheme)
}
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
})
if err != nil {
os.Exit(1)
}
client := mgr.GetAPIReader() // 如果是長時間執行用mgr.GetClient()
ctx := context.Background()
name := types.NamespacedName{Namespace: "default", Name: "virtulmachine-sample"}
vm := &v1.VirtulMachine{}
if err := client.Get(ctx, name, vm); err != nil {
log.Error(err, "unable to fetch vm")
} else {
fmt.Println(vm.Spec.CPU, vm.Spec.Memory, vm)
}
}
推薦做法,直接呼叫client:
package main
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/client"
"k8s.io/apimachinery/pkg/types"
v1 "github.com/fanux/sealvm/api/v1"
"github.com/prometheus/common/log"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)
var scheme = runtime.NewScheme()
func init() {
v1.AddToScheme(scheme)
clientgoscheme.AddToScheme(scheme)
}
func getClient() (client.Client, error){
config := ctrl.GetConfigOrDie()
if config == nil {
return nil, fmt.Errorf("config is nil")
}
options := ctrl.Options{Scheme:scheme}
// Set default values for options fields
//options = setOptionsDefaults(options)
//mapper, err := options.MapperProvider(config)
//if err != nil {
// log.Error(err, "Failed to get API Group-Resources")
// return nil, err
//}
client, err := client.New(config, client.Options{Scheme: options.Scheme})
if err !=nil {
return nil, err
}
return client,nil
}
func main() {
client,err := getClient()
if err != nil {
fmt.Println("client is nil",err)
return
}
ctx := context.Background()
name := types.NamespacedName{Namespace: "default", Name: "virtulmachine-sample"}
vm := &v1.VirtulMachine{}
if err = client.Get(ctx, name, vm); err != nil {
log.Error(err, "unable to fetch vm")
} else {
fmt.Println(vm.Spec.CPU, vm.Spec.Memory, vm)
}
}
總結
雖然現在很多工具給我們寫CRD controller帶來了極大的便捷,但是對於client-go這些基本的使用還是非常必要的,而官方client-go的開發文件和事例真的是少之又少,基本僅包含非常基本的操作。
還有一個dynamic client的方式也可以用來訪問自定義CRD,但是文中的方式會更優雅更清晰更適合工程化。 k