【kubernetes/k8s source code analysis】kubelet source code analysis: starting containers
The work mostly amounts to calling into the runtime, which here defaults to docker.
0. Data flow
NewMainKubelet (cmd/kubelet/app/server.go) ->
NewKubeGenericRuntimeManager (pkg/kubelet/kuberuntime/kuberuntime_manager.go) ->
syncPod (pkg/kubelet/kubelet.go) ->
SyncPod (pkg/kubelet/kuberuntime/kuberuntime_manager.go)
1. Data structures
1.1 The ContainerManager interface
Manages the containers running on the host; the interface definition is fairly self-explanatory. A small usage sketch follows the interface.
// Manages the containers running on a machine.
type ContainerManager interface {
    // Runs the container manager's housekeeping.
    // - Ensures that the Docker daemon is in a container.
    // - Creates the system container where all non-containerized processes run.
    Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error

    // SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
    // These cgroups include the system and Kubernetes services.
    SystemCgroupsLimit() v1.ResourceList

    // GetNodeConfig returns a NodeConfig that is being used by the container manager.
    GetNodeConfig() NodeConfig

    // Status returns internal Status.
    Status() Status

    // NewPodContainerManager is a factory method which returns a podContainerManager object
    // Returns a noop implementation if qos cgroup hierarchy is not enabled
    NewPodContainerManager() PodContainerManager

    // GetMountedSubsystems returns the mounted cgroup subsystems on the node
    GetMountedSubsystems() *CgroupSubsystems

    // GetQOSContainersInfo returns the names of top level QoS containers
    GetQOSContainersInfo() QOSContainersInfo

    // GetNodeAllocatableReservation returns the amount of compute resources that have to be reserved from scheduling.
    GetNodeAllocatableReservation() v1.ResourceList

    // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
    GetCapacity() v1.ResourceList

    // GetDevicePluginResourceCapacity returns the node capacity (amount of total device plugin resources),
    // node allocatable (amount of total healthy resources reported by device plugin),
    // and inactive device plugin resources previously registered on the node.
    GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)

    // UpdateQOSCgroups performs housekeeping updates to ensure that the top
    // level QoS containers have their desired state in a thread-safe way
    UpdateQOSCgroups() error

    // GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
    // extended resources required by container.
    GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)

    // UpdatePluginResources calls Allocate of device plugin handler for potential
    // requests for device plugin resources, and returns an error if fails.
    // Otherwise, it updates allocatableResource in nodeInfo if necessary,
    // to make sure it is at least equal to the pod's requested capacity for
    // any registered device plugin resource
    UpdatePluginResources(*schedulercache.NodeInfo, *lifecycle.PodAdmitAttributes) error

    InternalContainerLifecycle() InternalContainerLifecycle

    // GetPodCgroupRoot returns the cgroup which contains all pods.
    GetPodCgroupRoot() string

    // GetPluginRegistrationHandler returns a plugin registration handler
    // The pluginwatcher's Handlers allow to have a single module for handling
    // registration.
    GetPluginRegistrationHandler() pluginwatcher.PluginHandler
}
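To make the interface concrete, here is a minimal, self-contained sketch (not kubelet's real code; the mini types below are illustrative stand-ins for v1.ResourceList and ContainerManager) of how a caller could combine GetCapacity and GetNodeAllocatableReservation to derive node allocatable resources:

package main

import "fmt"

// miniResourceList stands in for v1.ResourceList (resource name -> quantity, here milli-CPU / bytes).
type miniResourceList map[string]int64

// miniContainerManager mirrors just the two ContainerManager methods used below.
type miniContainerManager interface {
    GetCapacity() miniResourceList
    GetNodeAllocatableReservation() miniResourceList
}

// allocatable derives what remains schedulable: capacity minus the reservation, floored at zero.
func allocatable(cm miniContainerManager) miniResourceList {
    out := miniResourceList{}
    reserved := cm.GetNodeAllocatableReservation()
    for name, capacity := range cm.GetCapacity() {
        v := capacity - reserved[name]
        if v < 0 {
            v = 0
        }
        out[name] = v
    }
    return out
}

type fakeCM struct{}

func (fakeCM) GetCapacity() miniResourceList {
    return miniResourceList{"cpu": 4000, "memory": 8 << 30} // 4 cores, 8 GiB
}
func (fakeCM) GetNodeAllocatableReservation() miniResourceList {
    return miniResourceList{"cpu": 500, "memory": 1 << 30} // kube/system reserved
}

func main() {
    fmt.Println(allocatable(fakeCM{})) // map[cpu:3500 memory:7516192768]
}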
2. The NewMainKubelet function
Runtime initialization calls the NewKubeGenericRuntimeManager function:
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
    kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
    klet.livenessManager,
    seccompProfileRoot,
    containerRefManager,
    machineInfo,
    klet,
    kubeDeps.OSInterface,
    klet,
    httpClient,
    imageBackOff,
    kubeCfg.SerializeImagePulls,
    float32(kubeCfg.RegistryPullQPS),
    int(kubeCfg.RegistryBurst),
    kubeCfg.CPUCFSQuota,
    kubeCfg.CPUCFSQuotaPeriod,
    runtimeService,
    imageService,
    kubeDeps.ContainerManager.InternalContainerLifecycle(),
    legacyLogProvider,
    klet.runtimeClassManager,
)
2.1 The returned runtime is assigned to containerRuntime (and the related fields below), i.e. it implements the corresponding interfaces; a small sketch of this pattern follows the assignments.
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
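Having one concrete value satisfy several small role interfaces is a standard Go pattern; kubeGenericRuntimeManager implements (roughly, in this era) kubecontainer.Runtime, kubecontainer.StreamingRuntime and kubecontainer.ContainerCommandRunner, which is why it can be assigned to all three fields above. A minimal, self-contained sketch of the pattern, with purely illustrative names:

package main

import "fmt"

// Three small role interfaces, analogous to containerRuntime / streamingRuntime / runner above.
type runtime interface{ SyncPod(podName string) }
type streamingRuntime interface{ GetExec(podName, cmd string) string }
type commandRunner interface{ RunInContainer(podName, cmd string) ([]byte, error) }

// manager plays all three roles, like kubeGenericRuntimeManager.
type manager struct{}

func (m *manager) SyncPod(podName string)             { fmt.Println("sync", podName) }
func (m *manager) GetExec(podName, cmd string) string { return "streaming-url-for-" + podName }
func (m *manager) RunInContainer(podName, cmd string) ([]byte, error) {
    return []byte("output of " + cmd), nil
}

func main() {
    m := &manager{}
    // The same value is assigned to fields of three different interface types,
    // mirroring klet.containerRuntime / klet.streamingRuntime / klet.runner = runtime.
    var rt runtime = m
    var sr streamingRuntime = m
    var cr commandRunner = m
    rt.SyncPod("nginx")
    fmt.Println(sr.GetExec("nginx", "sh"))
    out, _ := cr.RunInContainer("nginx", "ls")
    fmt.Println(string(out))
}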
3. The syncPod function
Path: pkg/kubelet/kubelet.go
After a long stretch of initialization, syncPod finally calls the runtime's SyncPod. Since klet.containerRuntime = runtime, the implementation invoked here is kubeGenericRuntimeManager's SyncPod:
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not return error if the only failures were pods in backoff
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime so we get better errors
return err
}
}
return nil
}
4. The SyncPod function
Path: pkg/kubelet/kuberuntime/kuberuntime_manager.go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
4.1 Step 1: Compute sandbox and container changes
Determines which containers have to be created and which have to be killed: containers to kill go into podContainerChanges.ContainersToKill, containers to create go into podContainerChanges.ContainersToStart. A sketch of the returned struct follows the excerpt below.
podContainerChanges := m.computePodActions(pod, podStatus)
glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
}
}
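The podContainerChanges value returned by computePodActions is a podActions struct. The sketch below follows the field names in pkg/kubelet/kuberuntime/kuberuntime_manager.go of this era, with the field types simplified so it stands alone (the real code uses *v1.Container and kubecontainer.ContainerID):

package kuberuntimesketch

// containerToKillInfo mirrors the per-container kill record.
type containerToKillInfo struct {
    name    string // name of the container to stop
    message string // human-readable reason it is being stopped
}

// containerRef is a simplified stand-in for *v1.Container.
type containerRef struct{ Name string }

// podActions is what computePodActions hands back to SyncPod.
type podActions struct {
    KillPod                  bool                           // kill the whole pod (e.g. the sandbox must change)
    CreateSandbox            bool                           // a new sandbox has to be created
    SandboxID                string                         // ID of the sandbox to keep or kill
    Attempt                  uint32                         // sandbox creation attempt number
    NextInitContainerToStart *containerRef                  // next init container to run; nil once init is done
    ContainersToStart        []int                          // indexes into pod.Spec.Containers to (re)start
    ContainersToKill         map[string]containerToKillInfo // container ID -> kill info
}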
4.2 Step 2: Kill the pod if the sandbox has changed
When the sandbox has changed, the whole pod has to be killed and recreated.
if podContainerChanges.KillPod {
if !podContainerChanges.CreateSandbox {
glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
} else {
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
}
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
}
4.3 Step 3: kill any running containers in this pod which are not to keep
Kill any containers in this pod that should no longer be running.
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
return
}
}
4.4 Step 4: Create a sandbox for the pod if necessary
Creates the sandbox (the pod's infrastructure container) by calling createPodSandbox, which generates the pod sandbox config, creates the pod log directory, and calls m.runtimeService.RunPodSandbox. A minimal sketch of that flow follows the excerpt below.
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error
glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
return
}
glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
result.Fail(err)
return
}
// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
}
}
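As a rough mental model of createPodSandbox, here is a minimal, self-contained sketch of the three steps described above (build the sandbox config, create the pod log directory, call RunPodSandbox). The mini types and the /var/log/pods/<uid> layout are simplifications, not kubelet's real code:

package sandboxsketch

import (
    "fmt"
    "os"
    "path/filepath"
)

// miniSandboxConfig stands in for the CRI PodSandboxConfig.
type miniSandboxConfig struct {
    Name, Namespace, UID string
    Attempt              uint32
    LogDirectory         string
}

// miniRuntime models the single CRI call used here (RunPodSandbox).
type miniRuntime interface {
    RunPodSandbox(cfg *miniSandboxConfig) (sandboxID string, err error)
}

// createPodSandboxSketch does the three things described above: build the sandbox
// config, ensure the pod log directory exists, then ask the runtime to run the sandbox.
func createPodSandboxSketch(rt miniRuntime, name, namespace, uid string, attempt uint32) (string, error) {
    cfg := &miniSandboxConfig{
        Name:      name,
        Namespace: namespace,
        UID:       uid,
        Attempt:   attempt,
        // per-pod log directory; /var/log/pods/<uid> is the layout of this era
        LogDirectory: filepath.Join("/var/log/pods", uid),
    }
    if err := os.MkdirAll(cfg.LogDirectory, 0755); err != nil {
        return "", fmt.Errorf("create pod log dir: %v", err)
    }
    return rt.RunPodSandbox(cfg) // in kubelet this is a gRPC call to the CRI runtime
}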
4.5 Step 5: start the init container
Init containers do initialization work for the pod; they are run one at a time, to completion, before the regular containers are started.
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
return
}
glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
startContainerResult.Fail(err, msg)
utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
return
}
// Successfully started the container; clear the entry in the failure
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
4.6 Step 6: start containers in podContainerChanges.ContainersToStart
This is where the container processes are actually started, via startContainer (covered in section 5). Like step 5, every (re)start is gated through doBackOff; a sketch of that backoff idea follows the excerpt.
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
glog.V(3).Infof("container start failed: %v: %s", err, msg)
default:
utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
}
continue
}
}
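Both step 5 and step 6 gate container (re)starts through doBackOff, which consults a per-container backoff (a flowcontrol.Backoff) so a crashing container is not restarted in a hot loop. A minimal, self-contained sketch of that idea; this is illustrative only, not the real flowcontrol package:

package main

import (
    "fmt"
    "time"
)

type backoffEntry struct {
    lastUpdate time.Time
    delay      time.Duration
}

// miniBackoff doubles the delay on every failed start, capped at max.
type miniBackoff struct {
    entries   map[string]*backoffEntry
    base, max time.Duration
}

func newMiniBackoff(base, max time.Duration) *miniBackoff {
    return &miniBackoff{entries: map[string]*backoffEntry{}, base: base, max: max}
}

// Next records a failed start for key and grows its delay.
func (b *miniBackoff) Next(key string, now time.Time) {
    e, ok := b.entries[key]
    if !ok {
        b.entries[key] = &backoffEntry{lastUpdate: now, delay: b.base}
        return
    }
    e.delay *= 2
    if e.delay > b.max {
        e.delay = b.max
    }
    e.lastUpdate = now
}

// IsInBackOffSince reports whether a start attempt at time since should be skipped.
func (b *miniBackoff) IsInBackOffSince(key string, since time.Time) bool {
    e, ok := b.entries[key]
    if !ok {
        return false
    }
    return since.Sub(e.lastUpdate) < e.delay
}

func main() {
    bo := newMiniBackoff(10*time.Second, 5*time.Minute)
    now := time.Now()
    bo.Next("default_mypod_app", now) // first failure: 10s delay
    bo.Next("default_mypod_app", now) // second failure: 20s delay
    fmt.Println(bo.IsInBackOffSince("default_mypod_app", now.Add(15*time.Second))) // true, still backing off
}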
5. The startContainer function
Path: pkg/kubelet/kuberuntime/kuberuntime_container.go
5.1 Step 1: pull the image
Self-explanatory: pull the image. A sketch of the imagePullPolicy decision follows the excerpt.
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return msg, err
}
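The pull decision inside EnsureImageExists follows the container's imagePullPolicy. A minimal sketch of that rule (the names below are illustrative, not kubelet's own helpers):

package main

import "fmt"

type PullPolicy string

const (
    PullAlways       PullPolicy = "Always"
    PullNever        PullPolicy = "Never"
    PullIfNotPresent PullPolicy = "IfNotPresent"
)

// shouldPullImage reports whether the image must be pulled before creating the container.
func shouldPullImage(policy PullPolicy, imagePresent bool) bool {
    switch policy {
    case PullAlways:
        return true
    case PullNever:
        return false
    default: // IfNotPresent
        return !imagePresent
    }
}

func main() {
    fmt.Println(shouldPullImage(PullIfNotPresent, true)) // false: reuse the local image
    fmt.Println(shouldPullImage(PullAlways, true))       // true: always re-pull
}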
5.2 Step 2: create the container
Creates the container; this ultimately goes out over a gRPC connection to the CRI API, roughly the equivalent of docker create. A sketch of that RPC follows the excerpt.
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
}
glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
if cleanupAction != nil {
defer cleanupAction()
}
if err != nil {
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainerConfig
}
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainer
}
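For reference, m.runtimeService here is kubelet's remote runtime service, and CreateContainer is a CRI RPC. A hedged sketch of that call, assuming the v1alpha2 CRI API vendored in this era (k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2; in newer releases it lives in k8s.io/cri-api):

package crisketch

import (
    "context"
    "time"

    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

// createContainer issues the CRI CreateContainer RPC and returns the new container ID,
// which is roughly what m.runtimeService.CreateContainer boils down to.
func createContainer(client runtimeapi.RuntimeServiceClient, sandboxID string,
    config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    resp, err := client.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
        PodSandboxId:  sandboxID,
        Config:        config,
        SandboxConfig: sandboxConfig,
    })
    if err != nil {
        return "", err
    }
    return resp.ContainerId, nil
}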
5.3 Step 3: start the container
Roughly the equivalent of docker start.
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
if err != nil {
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dangling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
legacySymlink, containerID, containerLog, err)
}
}
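The legacy symlink exists so that cluster logging agents that still scrape /var/log/containers keep working with CRI log paths. A hedged sketch of how that symlink name is assembled; the exact layout is reconstructed from the helpers referenced above and may differ slightly across versions:

package logsketch

import (
    "fmt"
    "path/filepath"
)

// legacyContainerLogsDir is where pre-CRI cluster logging agents expect container logs.
const legacyContainerLogsDir = "/var/log/containers"

// legacyLogSymlinkSketch builds the /var/log/containers/... name that the real
// containerLog file (under the pod's CRI log directory) is symlinked to.
func legacyLogSymlinkSketch(containerID, containerName, podName, podNamespace string) string {
    podFullName := podName + "_" + podNamespace // like kubecontainer.BuildPodFullName
    return filepath.Join(legacyContainerLogsDir,
        fmt.Sprintf("%s_%s-%s.log", podFullName, containerName, containerID))
}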