1. 程式人生 > >pod刪除主要流程原始碼解析

pod刪除主要流程原始碼解析

本文以v1.12版本進行分析

當一個pod刪除時,client端向apiserver傳送請求,apiserver將pod的deletionTimestamp打上時間。kubelet watch到該事件,開始處理。

syncLoop

kubelet對pod的處理主要都是在syncLoop中處理的。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
for {
...
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
...

與pod刪除主要在syncLoopIteration中需要關注的是以下這兩個。

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
...
        switch u.Op {
...
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
...
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
        } else {
            if err := handler.HandlePodCleanups(); err != nil {
                glog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }

第一個是由於傳送給apiserver的DELETE請求觸發的,增加了deletionTimestamp的事件。這裡對應於kubetypes.UPDATE。也就是會走到HandlePodUpdates函式。

另外一個與delete相關的是每2s執行一次的來自於housekeepingCh的定時事件,用於清理pod,執行的是handler.HandlePodCleanups函式。這兩個作用不同,下面分別進行介紹。

HandlePodUpdates

先看HandlePodUpdates這個流程。只要打上了deletionTimestamp,就必然走到這個流程裡去。

func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
    for _, pod := range pods {
...
        kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
    }
}

在HandlePodUpdates中,進而將pod的資訊傳遞到dispatchWork中處理。

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
...

這裡首先通過判斷了kl.podIsTerminated(pod)判斷pod是不是已經處於了Terminated狀態。如果是的話,則不進行下面的kl.podWorkers.UpdatePod。

func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
    status, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok {
        status = pod.Status
    }
    return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}

這個地方特別值得注意的是,並不是由了DeletionTimestamp就會認為是Terminated狀態,而是有DeletionTimestamp且所有的容器不在運行了。也就是說如果是一個正在正常執行的pod,是也會走到kl.podWorkers.UpdatePod中的。UpdatePod通過一系列函式呼叫,最終會通過非同步的方式執行syncPod函式中進入到syncPod函式中。

func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        var syncErr error
        if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
...

在syncPod中,呼叫killPod從而對pod進行停止操作。

killPod

killPod是停止pod的主體。在很多地方都會使用。這裡主要介紹下起主要的工作流程。停止pod的過程主要發生在killPodWithSyncResult函式中。

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
    killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
...
    for _, podSandbox := range runningPod.Sandboxes {
            if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
...

killPodWithSyncResult的主要工作分為兩個部分。killContainersWithSyncResult負責將pod中的container停止掉,在停止後再執行StopPodSandbox。

func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, reason string, gracePeriodOverride *int64) error {
    if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
        return err
    }
...
    err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)

killContainersWithSyncResult的主要工作是在killContainer中完成的,這裡可以看到,其中的主要兩個步驟是在容器中進行prestop的操作。待其成功後,進行container的stop工作。至此所有的應用容器都已經停止了。下一步是停止pause容器。而StopPodSandbox就是執行這一過程的。將sandbox,也就是pause容器停止掉。StopPodSandbox是在dockershim中執行的。

func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
...
if !hostNetwork && (ready || !ok) {
...
        err := ds.network.TearDownPod(namespace, name, cID, annotations)
...
    }
    if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {

StopPodSandbox中主要的部分是先進行網路解除安裝,再停止相應的容器。在完成StopPodSandbox後,至此pod的所有容器都已經停止完成。至於volume的解除安裝,是在volumeManager中進行的。本文不做單獨介紹了。停止後的容器在pod徹底清理後,會被gc回收。這裡也不展開講了。

HandlePodCleanups

上面這個流程並不是刪除流程的全部。一個典型的情況就是,如果container都不是running,那麼在UpdatePod的時候都return了,那麼又由誰來處理呢?這裡我們回到最開始,就是那個每2s執行一次的HandlePodCleanups的流程。也就是說比如container處於crash,container正好不是running等情況,其實是在這個流程裡進行處理的。當然HandlePodCleanups的作用不僅僅是清理not running的pod,再比如資料已經在apiserver中強制清理掉了,或者由於其他原因這個節點上還有一些沒有完成清理的pod,都是在這個流程中進行處理。

func (kl *Kubelet) HandlePodCleanups() error {
... 
    for _, pod := range runningPods {
        if _, found := desiredPods[pod.ID]; !found {
            kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
        }
    }

runningPods是從cache中獲取節點現有的pod,而desiredPods則是節點上應該存在未被停止的pod。如果存在runningPods中有而desiredPods中沒有的pod,那麼它應該被停止,所以傳送到podKillingCh中。

func (kl *Kubelet) podKiller() {
...
    for podPair := range kl.podKillingCh {
...

        if !exists {
            go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
                glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
                err := kl.killPod(apiPod, runningPod, nil, nil)
...
            }(apiPod, runningPod)
        }
    }
}

在podKiller流程中,會去接收來自podKillingCh的訊息,從而執行killPod,上文已經做了該函式的介紹了。

statusManager

在最後,statusManager中的syncPod流程,將會進行檢測,通過canBeDeleted確認是否所有的容器關閉了,volume解除安裝了,cgroup清理了等等。如果這些全部完成了,則從apiserver中將pod資訊徹底刪除。

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
...
    if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.NewDeleteOptions(0)
        deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
...