kubernetes/k8s CRI analysis: how kubelet creates a pod

Kubernetes has three functional interfaces: the Container Network Interface (CNI), the Container Runtime Interface (CRI), and the Container Storage Interface (CSI). This post analyzes how kubelet calls the CRI to create a pod.

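First, a brief recap of the previous post, "kubernetes/k8s CRI analysis: Container Runtime Interface analysis".

That post introduced the CRI and then analyzed the CRI-related kubelet source code in three parts: the CRI-related startup flags of the kubelet component, the CRI-related interfaces and structs, and CRI initialization. Readers who have not seen it may want to read it first.

The CRI architecture diagram from the previous post is shown here again.

This post analyzes how kubelet calls the CRI to create a pod.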

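CRI-related source code analysis in kubelet

The analysis of the kubelet CRI source code consists of the following parts:
(1) kubelet CRI-related startup flags;
(2) kubelet CRI-related interfaces and structs;
(3) kubelet CRI initialization;
(4) kubelet calling the CRI to create a pod;
(5) kubelet calling the CRI to delete a pod.

The previous post covered the first three parts; this post covers part (4).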

Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

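4. kubelet calling the CRI to create a pod

Call flow for creating a pod through the kubelet CRI

The following uses the kubelet dockershim pod-creation call flow as the example.

kubelet creates and starts containers by calling dockershim; dockershim in turn calls docker to create and start the containers, and calls CNI to set up the pod network.

Figure 1: kubelet dockershim pod-creation call flow

dockershim is the CRI shim built into kubelet. The pod-creation call flow of the other, remote CRI shims is essentially the same as dockershim's: they call a different container engine to operate on containers, but it is still the CRI shim that calls CNI to set up the pod network.

The detailed source code analysis follows.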

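Start with the kubeGenericRuntimeManager.SyncPod method, which is where the CRI calls that create a pod are triggered.

The method's code also shows the order in which kubelet creates a pod:
(1) create and start the pod sandbox container, and set up the pod network;
(2) create and start the ephemeral containers;
(3) create and start the init containers;
(4) finally, create and start the normal containers (the ordinary workload containers).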
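The ordering above can be sketched in a few lines of Go. This is only an illustration, not the kubelet implementation: podSpec and creationOrder are hypothetical names, and the real SyncPod also handles killing stale containers, back-off, and waiting for init containers, all of which are omitted here.

```go
package main

import "fmt"

// podSpec is a hypothetical, stripped-down stand-in for v1.Pod.
// It only carries container names, grouped by kind.
type podSpec struct {
	Ephemeral []string
	Init      []string
	Normal    []string
}

// creationOrder returns the creation actions in the order listed above:
// sandbox first, then ephemeral, init, and normal containers.
func creationOrder(p podSpec, createSandbox bool) []string {
	var steps []string
	if createSandbox {
		steps = append(steps, "sandbox")
	}
	for _, c := range p.Ephemeral {
		steps = append(steps, "ephemeral:"+c)
	}
	for _, c := range p.Init {
		steps = append(steps, "init:"+c)
	}
	for _, c := range p.Normal {
		steps = append(steps, "normal:"+c)
	}
	return steps
}

func main() {
	p := podSpec{Init: []string{"migrate"}, Normal: []string{"app", "sidecar"}}
	for i, s := range creationOrder(p, true) {
		fmt.Printf("%d. %s\n", i+1, s)
	}
}
```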

This post analyzes the m.createPodSandbox call that creates the pod sandbox. Calls such as m.startContainer follow almost the same flow and can be traced in the same way.

// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	...
	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		...
}

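4.1 m.createPodSandbox

The m.createPodSandbox method mainly calls m.runtimeService.RunPodSandbox.

runtimeService is a RemoteRuntimeService. It implements the RuntimeService interface, which is the client side of the container runtime interface, and it holds the client used to communicate with the CRI shim's container runtime server. Calling m.runtimeService.RunPodSandbox is therefore effectively a call to the RunPodSandbox method of the CRI shim server, which creates the pod sandbox.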
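The delegation described above can be sketched with a small interface. Everything here is a simplified assumption for illustration: the one-method runtimeService interface, shimServer, remoteClient, and manager are hypothetical stand-ins, and the hop between client and server is a plain method call rather than gRPC.

```go
package main

import (
	"errors"
	"fmt"
)

// runtimeService is a hypothetical one-method stand-in for the CRI
// RuntimeService interface that the kubelet side programs against.
type runtimeService interface {
	RunPodSandbox(podName string) (string, error)
}

// shimServer plays the role of the CRI shim server (dockershim, for example).
type shimServer struct{ created []string }

func (s *shimServer) RunPodSandbox(podName string) (string, error) {
	if podName == "" {
		return "", errors.New("pod name is empty")
	}
	s.created = append(s.created, podName)
	return fmt.Sprintf("sandbox-%d", len(s.created)), nil
}

// remoteClient plays the role of RemoteRuntimeService: it holds a handle to
// the server and forwards the call. Here the hop is a direct method call.
type remoteClient struct{ server *shimServer }

func (c *remoteClient) RunPodSandbox(podName string) (string, error) {
	return c.server.RunPodSandbox(podName)
}

// manager plays the role of kubeGenericRuntimeManager: it only knows the
// interface, not which shim sits behind it.
type manager struct{ runtimeService runtimeService }

func (m *manager) createPodSandbox(podName string) (string, error) {
	id, err := m.runtimeService.RunPodSandbox(podName)
	if err != nil {
		return "", fmt.Errorf("CreatePodSandbox for pod %q failed: %w", podName, err)
	}
	return id, nil
}

func main() {
	server := &shimServer{}
	m := &manager{runtimeService: &remoteClient{server: server}}
	id, err := m.createPodSandbox("nginx")
	fmt.Println(id, err, server.created)
}
```

Because the manager depends only on the interface, swapping in a different shim changes nothing on the calling side, which is why the flow for remote CRI shims looks the same from kubelet's point of view.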

// pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		return "", message, err
	}

	// Create pod logs directory
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
		klog.Errorf(message)
		return "", message, err
	}

	runtimeHandler := ""
	if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
		runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
		if err != nil {
			message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
			return "", message, err
		}
		if runtimeHandler != "" {
			klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
		}
	}

	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
	if err != nil {
		message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		return "", message, err
	}

	return podSandBoxID, "", nil
}

m.runtimeService.RunPodSandbox

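The m.runtimeService.RunPodSandbox method calls r.runtimeClient.RunPodSandbox, that is, it uses the CRI shim client to ask the CRI shim server to create the pod sandbox.

This completes the analysis of the CRI-related calls inside kubelet. The next step is the creation of the pod sandbox inside the CRI shim, using dockershim, the CRI shim built into kubelet, as the example.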

// pkg/kubelet/remote/remote_runtime.go
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	// Use 2 times longer timeout for sandbox operation (4 mins by default)
	// TODO: Make the pod sandbox timeout configurable.
	ctx, cancel := getContextWithTimeout(r.timeout * 2)
	defer cancel()

	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	})
	if err != nil {
		klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
		return "", err
	}

	if resp.PodSandboxId == "" {
		errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
		klog.Errorf("RunPodSandbox failed: %s", errorMessage)
		return "", errors.New(errorMessage)
	}

	return resp.PodSandboxId, nil
}

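4.2 r.runtimeClient.RunPodSandbox

This section follows the call into the CRI shim, using dockershim as the example, to analyze how the pod sandbox is created.

The r.runtimeClient.RunPodSandbox call made by kubelet above ends up in dockershim's RunPodSandbox method.

Creating the pod sandbox takes five main steps:
(1) call docker to pull the pod sandbox image (only if it is not already present);
(2) call docker to create the pod sandbox container;
(3) create the pod sandbox checkpoint;
(4) call docker to start the pod sandbox container;
(5) call CNI to set up the network for the pod sandbox.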

// pkg/kubelet/dockershim/docker_sandbox.go
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
// For docker, PodSandbox is implemented by a container holding the network
// namespace for the pod.
// Note: docker doesn't use LogDirectory (yet).
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
	config := r.GetConfig()

	// Step 1: Pull the image for the sandbox.
	image := defaultSandboxImage
	podSandboxImage := ds.podSandboxImage
	if len(podSandboxImage) != 0 {
		image = podSandboxImage
	}

	// NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
	// see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
	// Only pull sandbox image when it's not present - v1.PullIfNotPresent.
	if err := ensureSandboxImageExists(ds.client, image); err != nil {
		return nil, err
	}

	// Step 2: Create the sandbox container.
	if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName {
		return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
	}
	createConfig, err := ds.makeSandboxDockerConfig(config, image)
	if err != nil {
		return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
	}
	createResp, err := ds.client.CreateContainer(*createConfig)
	if err != nil {
		createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
	}

	if err != nil || createResp == nil {
		return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
	}
	resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

	ds.setNetworkReady(createResp.ID, false)
	defer func(e *error) {
		// Set networking ready depending on the error return of
		// the parent function
		if *e == nil {
			ds.setNetworkReady(createResp.ID, true)
		}
	}(&err)

	// Step 3: Create Sandbox Checkpoint.
	if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
		return nil, err
	}

	// Step 4: Start the sandbox container.
	// Assume kubelet's garbage collector would remove the sandbox later, if
	// startContainer failed.
	err = ds.client.StartContainer(createResp.ID)
	if err != nil {
		return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
	}

	// Rewrite resolv.conf file generated by docker.
	// NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
	// not only for pods with host network: the resolver conf will be overwritten
	// after sandbox creation to override docker's behaviour. This resolv.conf
	// file is shared by all containers of the same pod, and needs to be modified
	// only once per pod.
	if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
		containerInfo, err := ds.client.InspectContainer(createResp.ID)
		if err != nil {
			return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
		}

		if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
			return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
		}
	}

	// Do not invoke network plugins if in hostNetwork mode.
	if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
		return resp, nil
	}

	// Step 5: Setup networking for the sandbox.
	// All pod networking is setup by a CNI plugin discovered at startup time.
	// This plugin assigns the pod ip, sets up routes inside the sandbox,
	// creates interfaces etc. In theory, its jurisdiction ends with pod
	// sandbox networking, but it might insert iptables rules or open ports
	// on the host as well, to satisfy parts of the pod spec that aren't
	// recognized by the CNI standard yet.
	cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
	networkOptions := make(map[string]string)
	if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
		// Build DNS options.
		dnsOption, err := json.Marshal(dnsConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
		}
		networkOptions["dns"] = string(dnsOption)
	}
	err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
	if err != nil {
		errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}

		// Ensure network resources are cleaned up even if the plugin
		// succeeded but an error happened between that success and here.
		err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
		if err != nil {
			errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
		}

		err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
		if err != nil {
			errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
		}

		return resp, utilerrors.NewAggregate(errList)
	}

	return resp, nil
}

Next, the ds.client.CreateContainer call serves as an example of how dockershim calls docker.

ds.client.CreateContainer

This method mainly calls d.client.ContainerCreate.

// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
	ctx, cancel := d.getTimeoutContext()
	defer cancel()
	// we provide an explicit default shm size as to not depend on docker daemon.
	// TODO: evaluate exposing this as a knob in the API
	if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
		opts.HostConfig.ShmSize = defaultShmSize
	}
	createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
	if ctxErr := contextError(ctx); ctxErr != nil {
		return nil, ctxErr
	}
	if err != nil {
		return nil, err
	}
	return &createResp, nil
}

d.client.ContainerCreate

This method builds the request parameters and sends an HTTP request to the corresponding docker API URL to create the pod sandbox container.

// vendor/github.com/docker/docker/client/container_create.go
// ContainerCreate creates a new container based in the given configuration.
// It can be associated with a name, but it's not mandatory.
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
	var response container.ContainerCreateCreatedBody

	if err := cli.NewVersionError("1.25", "stop timeout"); config != nil && config.StopTimeout != nil && err != nil {
		return response, err
	}

	// When using API 1.24 and under, the client is responsible for removing the container
	if hostConfig != nil && versions.LessThan(cli.ClientVersion(), "1.25") {
		hostConfig.AutoRemove = false
	}

	query := url.Values{}
	if containerName != "" {
		query.Set("name", containerName)
	}

	body := configWrapper{
		Config:           config,
		HostConfig:       hostConfig,
		NetworkingConfig: networkingConfig,
	}

	serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
	defer ensureReaderClosed(serverResp)
	if err != nil {
		return response, err
	}

	err = json.NewDecoder(serverResp.body).Decode(&response)
	return response, err
}

// vendor/github.com/docker/docker/client/request.go
// post sends an http request to the docker API using the method POST with a specific Go context.
func (cli *Client) post(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) {
	body, headers, err := encodeBody(obj, headers)
	if err != nil {
		return serverResponse{}, err
	}
	return cli.sendRequest(ctx, "POST", path, query, body, headers)
}
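
The request built by ContainerCreate and post above (a JSON body POSTed to /containers/create, with the container name in the query string) can be imitated with the standard library. This is a sketch, not the docker client: createBody, createResponse, fakeDaemon, and containerCreate are hypothetical, the image name is made up, and the request is delivered in-process through net/http/httptest instead of over the docker socket.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// createBody and createResponse are simplified, hypothetical stand-ins for
// configWrapper and ContainerCreateCreatedBody.
type createBody struct {
	Image string `json:"Image"`
}

type createResponse struct {
	ID string `json:"Id"`
}

// fakeDaemon plays the docker daemon: it accepts POST /containers/create
// and answers with an ID derived from the "name" query parameter.
func fakeDaemon(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost || r.URL.Path != "/containers/create" {
		http.NotFound(w, r)
		return
	}
	var body createBody
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(createResponse{ID: "id-" + r.URL.Query().Get("name")})
}

// containerCreate builds the query and JSON body, delivers the request to
// the handler, and decodes the JSON reply.
func containerCreate(h http.HandlerFunc, image, name string) (createResponse, error) {
	var resp createResponse
	query := url.Values{}
	if name != "" {
		query.Set("name", name)
	}
	payload, err := json.Marshal(createBody{Image: image})
	if err != nil {
		return resp, err
	}
	target := "/containers/create"
	if q := query.Encode(); q != "" {
		target += "?" + q
	}
	req := httptest.NewRequest(http.MethodPost, target, bytes.NewReader(payload))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	h(rec, req) // in-process delivery; no socket is involved
	if rec.Code != http.StatusCreated {
		return resp, fmt.Errorf("unexpected status %d", rec.Code)
	}
	err = json.NewDecoder(rec.Body).Decode(&resp)
	return resp, err
}

func main() {
	resp, err := containerCreate(fakeDaemon, "example/pause", "k8s_POD_nginx")
	fmt.Println(resp.ID, err)
}
```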

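Summary

CRI architecture diagram

Below the CRI there are two kinds of container runtime implementations:
(1) dockershim, built into kubelet, which supports the Docker container engine and CNI network plugins (including kubenet). The dockershim code is built into kubelet and invoked by kubelet; dockershim runs as a separate server that acts as the CRI shim and exposes a gRPC server to kubelet;
(2) external container runtimes, which support container engines such as rkt and containerd.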

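kubelet calling the CRI to create a pod: flow analysis

kubelet creates a pod in the following order:
(1) create and start the pod sandbox container, and set up the pod network;
(2) create and start the ephemeral containers;
(3) create and start the init containers;
(4) finally, create and start the normal containers (the ordinary workload containers).

Call flow for creating a pod through the kubelet CRI

The kubelet dockershim pod-creation call flow serves as the example.

kubelet creates and starts containers by calling dockershim; dockershim in turn calls docker to create and start the containers, and calls CNI to set up the pod network.

Figure 1: kubelet dockershim pod-creation call flow

dockershim is the CRI shim built into kubelet. The pod-creation call flow of the other, remote CRI shims is essentially the same as dockershim's: they call a different container engine to operate on containers, but it is still the CRI shim that calls CNI to set up the pod network.

This post analyzed how kubelet calls the CRI to create a pod. The next post will cover the last part of the CRI-related kubelet source code analysis: how kubelet calls the CRI to delete a pod. Stay tuned.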

Related posts: "kubernetes/k8s CSI analysis: Container Storage Interface analysis"

"kubernetes/k8s CRI analysis: Container Runtime Interface analysis"