基於任務量進行k8s叢集的靈活排程處理
前言
最近公司內有個需求是為了進一步控制某個專案的k8s叢集的資源,避免資源浪費。
目前專案需要的資源佔用率很高,需要3核CPU、2G記憶體。在一開始的時候是沒有做靈活排程處理的。會讓Pod
一直處於執行狀態,即使沒有任務的時候也會一直執行,雖然說可以通過k8s
下Resources
的Requests
和Limits
減少一點資源,但是還是會照成一定資源的浪費。
介紹
在正文開始前,需要把流程介紹一下,方便後文的理解。
首先別的部門會往資料庫裡插入一條資料,然後在由排程器去定期的掃資料庫,掃到一個新資料,則由排程器去呼叫k8s的api去建立一個Job
資源,在Job裡有一個Pod
,由Pod
去做一些任務。然後結束。
看起來比較簡單,但是有幾個需要注意的地方:
- 由於
Pod
是需要環境變數的,而Pod
是由排程器去建立的。那麼這個時候就需要把變數一步步傳進去 - 排程器不能去更改任何的資料,只能從資料庫裡拿,這是為了更好的解耦。不能讓排程器去關心任何的業務邏輯及資料
- 排程器的本身不能存有任何的狀態,因為一旦涉及到狀態,就要去有個地方去儲存它。因為要考慮到排程器本身重啟。這樣做只會帶來更大的負擔。
- 需要考慮當前的叢集是否有資源再啟動
Pod
了
開始
排程器使用了GoLang
進行開發,所以後文都將使用Go
做為主力語言。
建立一個可除錯的k8s環境
目前因為使用的是Go
進行開發,所以使用了k8s官方的client-go
clientset
的方法(可以把clientset
理解成一個可以和當前叢集master通訊的管道)
package main
import (
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 這個方法裡包含了k8s自身對Cluster的配置操作
kubeConfig,err := rest.InClusterConfig()
if err != nil {
// 如果是開發環境,則使用當前minikube。需要配置KUBECONFIG變數
// 如果是minikube,KUBECONFIG變數可以指向$HOME/.kube/config
kubeConfig,err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),&clientcmd.ConfigOverrides{}).ClientConfig()
// 如果沒有配置KUBECONFIG變數,且當前也沒有在叢集裡執行
if err != nil {
panic("get k8s config fail: " + err.Error())
}
}
// 建立clientset失敗
clientset,err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
panic("failed to create k8s clientset: " + err.Error())
}
// 建立成功
fmt.Println(clientset)
}
複製程式碼
其中rest.InClusterConfig()
程式碼也十分簡單,就是去當前機器下的/var/run/secrets/kubernetes.io/serviceaccount/
讀取token
和ca
。以及讀取KUBERNETES_SERVICE_HOST
和KUBERNETES_SERVICE_PORT
環境變數,再把他們拼在一起,感興趣的同學可以去看下原始碼。
根據上文可以知道rest.InClusterConfig()
是針對以及身在叢集中的機器而言的。在本地開發環境是肯定不行的。所以我們需要另一個方法去解決這個問題。
可以看到上面我已經做了處理,當發現InClusterConfig
失敗後,會轉而執行下面的程式碼:
kubeConfig,err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),&clientcmd.ConfigOverrides{}).ClientConfig()
複製程式碼
這段程式碼其實也比較簡單,就是去讀取當前環境下的KUBECONFIG
獲取本地k8s的配置路徑。如果沒有這個變數,再去獲取當前使用者目錄下的.kube/config
檔案。最終根據檔案改造成所需要的配置。主要原始碼可見: NewDefaultClientConfigLoadingRules、ClientConfig
現在只要保證你本機有minikube
環境就可以正常除錯、開發了。
以上的方法參考rook的寫法
建立Job及Pod
資料庫查詢的這裡就不再闡述了,可以根據自身的業務進行適配、開發。這裡只是起到一個拋磚引玉的效果。不止是資料庫,其他任何東西都可以,主要還是要看自身的業務適合什麼。
我們先假設,這裡從資料庫裡拿到了一條資料,我們需要把資料庫的值傳給Pod。避免Pod裡再做一次查詢。現在我們需要先把Job定義好:
import (
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// job所需配置
type JobsSpec struct {
Namespace string
Image string
Prefix string
}
// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu,memory string) apiv1.ResourceList {
res := apiv1.ResourceList{}
if cpu != "" {
res[apiv1.ResourceCPU] = resource.MustParse(cpu)
}
if memory != "" {
res[apiv1.ResourceMemory] = resource.MustParse(memory)
}
return res
}
// 返回ResourceRequirements物件,詳細見getResourceList函式註釋
func getResourceRequirements(requests,limits apiv1.ResourceList) apiv1.ResourceRequirements {
res := apiv1.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
// 轉為指標
func newInt64(i int64) *int64 {
return &i
}
// 建立job的配置
// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu,limits apiv1.ResourceList) apiv1.ResourceRequirements {
res := apiv1.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
// job所需配置
type jobsSpec struct {
Namespace string
Image string
Prefix string
}
// 建立job的配置
func (j *jobsSpec) Create(envMap map[string]string) *batchv1.Job {
u2 := uuid.NewV4().String()[:8]
name := fmt.Sprint(j.Prefix,"-",u2)
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,Namespace: j.Namespace,},Spec: batchv1.JobSpec{
Template: apiv1.PodTemplateSpec{
Spec: apiv1.PodSpec{
RestartPolicy: "Never",Containers: []apiv1.Container{
{
Name: name,Image: j.Image,Env: EnvToVars(envMap),ImagePullPolicy: "Always",Resources: getResourceRequirements(getResourceList("2500m","2048Mi"),getResourceList("3000m","2048Mi")),}
}
複製程式碼
這裡沒什麼好說的,基本就是資源定義,以及上門還有註釋。
上面的程式碼其實少了一部分,這部分是把變數注入進去的。也就是EnvToVars
,核心程式碼如下:
// 把物件轉化成k8s所能接受的環境變數格式
func EnvToVars(envMap map[string]string) []v1.EnvVar {
var envVars []v1.EnvVar
for k,v := range envMap {
envVar := v1.EnvVar{
Name: k,Value: v,}
envVars = append(envVars,envVar)
}
return envVars
}
// 獲取當前系統中所有的變數,並轉成map方式
func GetAllEnvToMap() map[string]string {
item := make(map[string]string)
for _,k := range os.Environ() {
splits := strings.Split(k,"=")
item[splits[0]] = splits[1]
}
return item
}
// 合併兩個map,為了更好的效能,使用閉包的方式,這樣sourceMap只需要呼叫一次即可
func MergeMap(sourceMap map[string]string) func(insertMap map[string]string) map[string]string {
return func(insertMap map[string]string) map[string]string {
for k,v := range insertMap {
sourceMap[k] = v
}
return sourceMap
}
}
複製程式碼
然後在使用的時候就是這樣了:
job := jobsSpec{
Prefix: "project-" + "dev" + "-job",Image: "docker image name",Namespace: "default",}
willMergeMap := MergeMap(GetAllEnvToMap())
// dbData是從資料庫裡拿到的資料,格式大致如下
// [ { id: 1,url: 'xxx' },{ id: 2,url: 'yyy' } ]
for _,data := range dbData {
currentEnvMap := willMergeMap(data)
// 建立Job
_,err = api.CreateJob(currentEnvMap)
if err != nil {
panic("create job fail",err.Error())
}
}
複製程式碼
這樣一來,就實現了把當前環境變數及資料通過變數的方式傳給Pod
。這樣的話,只需要保證當前的排程器裡存在一些Pod
可能會用到的變數就行了,如:S3 Token
、DB Host
等。通過這種方式,Pod
基本上什麼都不用關係,它所需要的變數,會由排程器傳給它,分工明確。
優化
其實以上其實就已經完成了最核心的東西,本身也不是特別的難。很簡單的邏輯。只不過光有這些是不夠的,還有很多地方需要考慮。
資源判斷
這裡在說之前有個前提,之前說過這個排程器是不能去更改任何資料的,更改資料只能由Pod
裡的容器去更改。
那麼這個時候就有問題了。
叢集如果資源不夠分配的話,那Pod
將會一直處於Pending
狀態,根據上文,變數已經注入到Pod裡了,而且由於裡面的容器沒有啟動。那就會導致資料沒有更改,而沒有更改的資料,排程器就會一直認為他的新的。導致會為這條資料再啟動一個Job,一直迴圈到當叢集資源足夠後其中的一個Pod去更改了資料。
舉個例子,假設資料庫裡有一個status
的欄位,當值為wating
時,排程器就認為這是一條新資料,會把這個資料轉變成環境變數注入到Pod裡,再由Pod去把waiting
更改成process
。排程器每3分鐘去掃一次資料,所以Pod必須在3分鐘內把資料更改完畢。
而這時由於資源不夠,k8s建立了這個Pod,但是裡面的程式碼沒有執行,導致沒有去更改資料,就會導致排程器一直去為同一條資料建立Pod。
解決方案也比較簡單,只要去判斷下Pod的狀態是否為Pending
,如果是,則不再建立Pod。下面是核心程式碼:
func HavePendingPod() (bool,error) {
// 獲取當前namespace下所有的pod
pods,err := clientset.CoreV1().Pods(Namespace).List(metaV1.ListOptions{})
if err != nil {
return false,err
}
// 迴圈pod,判斷每個pod是否符合當前的字首,如果符合,則說明當前的環境已經存在Pending狀態了
for _,v := range pods.Items {
phase := v.Status.Phase
if phase == "Pending" {
if strings.HasPrefix(v.Name,Prefix) {
return true,nil
}
}
}
return false,nil
}
複製程式碼
當為true
時,就不再建立Job
Job數量最大值
叢集的資源也不是無限的,雖然我們對Pending
情況做了處理,但是這只是一種防禦手段。我們還是要對數量進行一個管控,當Job數量等於某個值時,不在建立Job了。程式碼也很簡單,我這裡就把獲取當前環境下Job數量的程式碼放出來:
// 獲取當前namespace下同環境的job Item例項
func GetJobListByNS() ([]v1.Job,error) {
var jobList,err = clientset.BatchV1().Jobs(Namespace).List(metaV1.ListOptions{})
if err != nil {
return nil,err
}
// 過濾非同字首的Job
var item []v1.Job
for _,v := range jobList.Items {
if strings.HasPrefix(v.Name,Prefix) {
item = append(item,v)
}
}
return item,nil
}
func GetJobLenByNS() (int,error) {
jobItem,err := api.GetJobListByNS()
if err != nil {
return 最大值,err
}
return len(jobItem),nil
}
複製程式碼
刪除已完成和失敗的
上面的程式碼其實是有問題的,k8s的Job
資源型別是有一個特性是,當完成或者失敗的時候,並不會刪除自身,也就說即使他完成了,它的資料還會一直停留在那。所以上面的程式碼會把一些已經完成或者失敗的Job也統計進去。到最後會出現一直無法建立Job的窘迫。
解決方案有兩個,第一個是在宣告Job
資源時,新增spec.ttlSecondsAfterFinished
屬性來做到k8s自動回收完成、失敗的Job。可惜的是這是高版本才有的屬性,我司很遺憾是低版本的。那就只能用第二種方法了,就是在每次獲取數量前,呼叫api把完成、失敗的Job刪除:
func DeleteCompleteJob() error {
jobItem,err := GetJobListByNS()
if err != nil {
return err
}
// 如果不指定此屬性,刪除job時,不會刪除pod
propagationPolicy := metaV1.DeletePropagationForeground
for _,v := range jobItem {
// 只刪除已經結束的job
if v.Status.Failed == 1 || v.Status.Succeeded == 1 {
err := clientset.BatchV1().Jobs(Namespace).Delete(v.Name,&metaV1.DeleteOptions{
PropagationPolicy: &propagationPolicy,})
if err != nil {
return err
}
}
}
return nil
}
複製程式碼
結尾
整個排程器的程式碼比較簡單,沒有必要專門抽成一個庫來做。只要知道大概的思路,就可以根據自己的專案需求做出適合自己專案組的排程器。
感謝我司大佬@qqshfox提供的思路