1. 程式人生 > 程式設計 >基於任務量進行k8s叢集的靈活排程處理

基於任務量進行k8s叢集的靈活排程處理

前言

最近公司內有個需求是為了進一步控制某個專案的k8s叢集的資源,避免資源浪費。

目前專案需要的資源佔用率很高,需要3核CPU、2G記憶體。在一開始的時候是沒有做靈活排程處理的。會讓Pod一直處於執行狀態,即使沒有任務的時候也會一直執行,雖然說可以通過k8sResourcesRequestsLimits減少一點資源,但是還是會照成一定資源的浪費。

介紹

在正文開始前,需要把流程介紹一下,方便後文的理解。

首先別的部門會往資料庫裡插入一條資料,然後在由排程器去定期的掃資料庫,掃到一個新資料,則由排程器去呼叫k8s的api去建立一個Job資源,在Job裡有一個Pod,由Pod去做一些任務。然後結束。

看起來比較簡單,但是有幾個需要注意的地方:

  1. 由於Pod是需要環境變數的,而Pod是由排程器去建立的。那麼這個時候就需要把變數一步步傳進去
  2. 排程器不能去更改任何的資料,只能從資料庫裡拿,這是為了更好的解耦。不能讓排程器去關心任何的業務邏輯及資料
  3. 排程器的本身不能存有任何的狀態,因為一旦涉及到狀態,就要去有個地方去儲存它。因為要考慮到排程器本身重啟。這樣做只會帶來更大的負擔。
  4. 需要考慮當前的叢集是否有資源再啟動Pod

開始

排程器使用了GoLang進行開發,所以後文都將使用Go做為主力語言。

建立一個可除錯的k8s環境

目前因為使用的是Go進行開發,所以使用了k8s官方的client-go

這個庫。而這個庫本身就提供了一些建立clientset的方法(可以把clientset理解成一個可以和當前叢集master通訊的管道)

package main

import (
  "fmt"

  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
)

func main() {
  // 這個方法裡包含了k8s自身對Cluster的配置操作
  kubeConfig,err := rest.InClusterConfig()

  if err != nil {
    // 如果是開發環境,則使用當前minikube。需要配置KUBECONFIG變數
// 如果是minikube,KUBECONFIG變數可以指向$HOME/.kube/config kubeConfig,err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( clientcmd.NewDefaultClientConfigLoadingRules(),&clientcmd.ConfigOverrides{}).ClientConfig() // 如果沒有配置KUBECONFIG變數,且當前也沒有在叢集裡執行 if err != nil { panic("get k8s config fail: " + err.Error()) } } // 建立clientset失敗 clientset,err := kubernetes.NewForConfig(kubeConfig) if err != nil { panic("failed to create k8s clientset: " + err.Error()) } // 建立成功 fmt.Println(clientset) } 複製程式碼

其中rest.InClusterConfig()程式碼也十分簡單,就是去當前機器下的/var/run/secrets/kubernetes.io/serviceaccount/讀取tokenca。以及讀取KUBERNETES_SERVICE_HOSTKUBERNETES_SERVICE_PORT環境變數,再把他們拼在一起,感興趣的同學可以去看下原始碼

根據上文可以知道rest.InClusterConfig()是針對以及身在叢集中的機器而言的。在本地開發環境是肯定不行的。所以我們需要另一個方法去解決這個問題。

可以看到上面我已經做了處理,當發現InClusterConfig失敗後,會轉而執行下面的程式碼:

kubeConfig,err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  clientcmd.NewDefaultClientConfigLoadingRules(),&clientcmd.ConfigOverrides{}).ClientConfig()
複製程式碼

這段程式碼其實也比較簡單,就是去讀取當前環境下的KUBECONFIG獲取本地k8s的配置路徑。如果沒有這個變數,再去獲取當前使用者目錄下的.kube/config檔案。最終根據檔案改造成所需要的配置。主要原始碼可見: NewDefaultClientConfigLoadingRulesClientConfig

現在只要保證你本機有minikube環境就可以正常除錯、開發了。

以上的方法參考rook的寫法

建立Job及Pod

資料庫查詢的這裡就不再闡述了,可以根據自身的業務進行適配、開發。這裡只是起到一個拋磚引玉的效果。不止是資料庫,其他任何東西都可以,主要還是要看自身的業務適合什麼。

我們先假設,這裡從資料庫裡拿到了一條資料,我們需要把資料庫的值傳給Pod。避免Pod裡再做一次查詢。現在我們需要先把Job定義好:

import (
  batchv1 "k8s.io/api/batch/v1"
  apiv1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/api/resource"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// job所需配置
type JobsSpec struct {
  Namespace string
  Image     string
  Prefix    string
}

// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu,memory string) apiv1.ResourceList {
  res := apiv1.ResourceList{}
  if cpu != "" {
    res[apiv1.ResourceCPU] = resource.MustParse(cpu)
  }
  if memory != "" {
    res[apiv1.ResourceMemory] = resource.MustParse(memory)
  }
  return res
}

// 返回ResourceRequirements物件,詳細見getResourceList函式註釋
func getResourceRequirements(requests,limits apiv1.ResourceList) apiv1.ResourceRequirements {
  res := apiv1.ResourceRequirements{}
  res.Requests = requests
  res.Limits = limits
  return res
}

// 轉為指標
func newInt64(i int64) *int64 {
  return &i
}

// 建立job的配置
// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu,limits apiv1.ResourceList) apiv1.ResourceRequirements {
  res := apiv1.ResourceRequirements{}
  res.Requests = requests
  res.Limits = limits
  return res
}

// job所需配置
type jobsSpec struct {
  Namespace string
  Image     string
  Prefix    string
}

// 建立job的配置
func (j *jobsSpec) Create(envMap map[string]string) *batchv1.Job {
  u2 := uuid.NewV4().String()[:8]
  name := fmt.Sprint(j.Prefix,"-",u2)
  
  return &batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
      Name:      name,Namespace: j.Namespace,},Spec: batchv1.JobSpec{
      Template: apiv1.PodTemplateSpec{
        Spec: apiv1.PodSpec{
          RestartPolicy: "Never",Containers: []apiv1.Container{
            {
              Name:            name,Image:           j.Image,Env:             EnvToVars(envMap),ImagePullPolicy: "Always",Resources:       getResourceRequirements(getResourceList("2500m","2048Mi"),getResourceList("3000m","2048Mi")),}
}
複製程式碼

這裡沒什麼好說的,基本就是資源定義,以及上門還有註釋。

上面的程式碼其實少了一部分,這部分是把變數注入進去的。也就是EnvToVars,核心程式碼如下:

// 把物件轉化成k8s所能接受的環境變數格式
func EnvToVars(envMap map[string]string) []v1.EnvVar {
  var envVars []v1.EnvVar
  for k,v := range envMap {
    envVar := v1.EnvVar{
      Name:  k,Value: v,}
    envVars = append(envVars,envVar)
  }
  return envVars
}

// 獲取當前系統中所有的變數,並轉成map方式
func GetAllEnvToMap() map[string]string {
  item := make(map[string]string)
  for _,k := range os.Environ() {
    splits := strings.Split(k,"=")
    item[splits[0]] = splits[1]
  }

  return item
}

// 合併兩個map,為了更好的效能,使用閉包的方式,這樣sourceMap只需要呼叫一次即可
func MergeMap(sourceMap map[string]string) func(insertMap map[string]string) map[string]string {
  return func(insertMap map[string]string) map[string]string {
    for k,v := range insertMap {
      sourceMap[k] = v
    }

    return sourceMap
  }
}
複製程式碼

然後在使用的時候就是這樣了:

job := jobsSpec{
  Prefix:    "project-" + "dev" + "-job",Image:     "docker image name",Namespace: "default",}

willMergeMap := MergeMap(GetAllEnvToMap())

// dbData是從資料庫裡拿到的資料,格式大致如下
// [ { id: 1,url: 'xxx' },{ id: 2,url: 'yyy' } ]
for _,data := range dbData {
  currentEnvMap := willMergeMap(data)

  // 建立Job
  _,err = api.CreateJob(currentEnvMap)
  
  if err != nil {
    panic("create job fail",err.Error())
  }
}
複製程式碼

這樣一來,就實現了把當前環境變數及資料通過變數的方式傳給Pod。這樣的話,只需要保證當前的排程器裡存在一些Pod可能會用到的變數就行了,如:S3 TokenDB Host等。通過這種方式,Pod基本上什麼都不用關係,它所需要的變數,會由排程器傳給它,分工明確。

優化

其實以上其實就已經完成了最核心的東西,本身也不是特別的難。很簡單的邏輯。只不過光有這些是不夠的,還有很多地方需要考慮。

資源判斷

這裡在說之前有個前提,之前說過這個排程器是不能去更改任何資料的,更改資料只能由Pod裡的容器去更改。

那麼這個時候就有問題了。

叢集如果資源不夠分配的話,那Pod將會一直處於Pending狀態,根據上文,變數已經注入到Pod裡了,而且由於裡面的容器沒有啟動。那就會導致資料沒有更改,而沒有更改的資料,排程器就會一直認為他的新的。導致會為這條資料再啟動一個Job,一直迴圈到當叢集資源足夠後其中的一個Pod去更改了資料。

舉個例子,假設資料庫裡有一個status的欄位,當值為wating時,排程器就認為這是一條新資料,會把這個資料轉變成環境變數注入到Pod裡,再由Pod去把waiting更改成process。排程器每3分鐘去掃一次資料,所以Pod必須在3分鐘內把資料更改完畢。

而這時由於資源不夠,k8s建立了這個Pod,但是裡面的程式碼沒有執行,導致沒有去更改資料,就會導致排程器一直去為同一條資料建立Pod。

解決方案也比較簡單,只要去判斷下Pod的狀態是否為Pending,如果是,則不再建立Pod。下面是核心程式碼:

func HavePendingPod() (bool,error) {
  // 獲取當前namespace下所有的pod
  pods,err := clientset.CoreV1().Pods(Namespace).List(metaV1.ListOptions{})
  if err != nil {
    return false,err
  }

  // 迴圈pod,判斷每個pod是否符合當前的字首,如果符合,則說明當前的環境已經存在Pending狀態了
  for _,v := range pods.Items {
    phase := v.Status.Phase
    if phase == "Pending" {
      if strings.HasPrefix(v.Name,Prefix) {
        return true,nil
      }
    }
  }

  return false,nil
}
複製程式碼

當為true時,就不再建立Job

Job數量最大值

叢集的資源也不是無限的,雖然我們對Pending情況做了處理,但是這只是一種防禦手段。我們還是要對數量進行一個管控,當Job數量等於某個值時,不在建立Job了。程式碼也很簡單,我這裡就把獲取當前環境下Job數量的程式碼放出來:

// 獲取當前namespace下同環境的job Item例項
func GetJobListByNS() ([]v1.Job,error) {
  var jobList,err = clientset.BatchV1().Jobs(Namespace).List(metaV1.ListOptions{})
    if err != nil {
    return nil,err
  }

  // 過濾非同字首的Job
  var item []v1.Job
  for _,v := range jobList.Items {
    if strings.HasPrefix(v.Name,Prefix) {
      item = append(item,v)
    }
  }

  return item,nil
}

func GetJobLenByNS() (int,error) {
  jobItem,err := api.GetJobListByNS()
  if err != nil {
      return 最大值,err
  }
  
  return len(jobItem),nil
}
複製程式碼

刪除已完成和失敗的

上面的程式碼其實是有問題的,k8s的Job資源型別是有一個特性是,當完成或者失敗的時候,並不會刪除自身,也就說即使他完成了,它的資料還會一直停留在那。所以上面的程式碼會把一些已經完成或者失敗的Job也統計進去。到最後會出現一直無法建立Job的窘迫。

解決方案有兩個,第一個是在宣告Job資源時,新增spec.ttlSecondsAfterFinished屬性來做到k8s自動回收完成、失敗的Job。可惜的是這是高版本才有的屬性,我司很遺憾是低版本的。那就只能用第二種方法了,就是在每次獲取數量前,呼叫api把完成、失敗的Job刪除:

func DeleteCompleteJob() error {
  jobItem,err := GetJobListByNS()
  if err != nil {
    return err
  }

  // 如果不指定此屬性,刪除job時,不會刪除pod
  propagationPolicy := metaV1.DeletePropagationForeground
  for _,v := range jobItem {
    // 只刪除已經結束的job
    if v.Status.Failed == 1 || v.Status.Succeeded == 1 {
      err := clientset.BatchV1().Jobs(Namespace).Delete(v.Name,&metaV1.DeleteOptions{
        PropagationPolicy: &propagationPolicy,})

      if err != nil {
        return err
      }
    }
  }

  return nil
}
複製程式碼

結尾

整個排程器的程式碼比較簡單,沒有必要專門抽成一個庫來做。只要知道大概的思路,就可以根據自己的專案需求做出適合自己專案組的排程器。

感謝我司大佬@qqshfox提供的思路