1. 程式人生 > >k8s 與 grpc

k8s 與 grpc

gRPC是一個高效能、通用的開源RPC框架,由Google主要面向移動應用開發,基於HTTP/2協議標準設計,採用ProtoBuf(Protocol Buffers)序列化協議,且支援眾多開發語言。 https://github.com/grpc/grpc-go/tree/master/examples/helloworld 是一個簡單的教程,示範GO語言如何使用gRPC。其中 helloworld.proto 用來定義ProtoBuf訊息格式,helloworld.pb.go 是protoc生成的GO語言形式的訊息格式,生成方法是 protoc -I helloworld/ helloworld/helloworld.proto --go_out=plugins=grpc:helloworld 。

這個例子服務端關鍵的程式碼如下

 s := grpc.NewServer() // create the gRPC server instance
	pb.RegisterGreeterServer(s, &server{})// register the Greeter service implementation on s
	// Register reflection service on gRPC server.
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {            // start serving on lis; an error returned by Serve is fatal
		log.Fatalf("failed to serve: %v", err)
	}

從 k8s 1.5 開始,kubelet 通過 Unix socket
使用 gRPC 框架呼叫 CRI 容器 runtime。

pkg/kubelet/apis/cri/v1alpha1/runtime/api.proto 中定義了訊息格式,分runtime服務和映象服務。

// RuntimeService declares the runtime-side RPCs. Every method below is a
// unary call: it takes one request message and returns one response message
// (none of them is declared with the `stream` keyword).
service RuntimeService {
    rpc Version(VersionRequest) returns (VersionResponse) {}
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}
    rpc Status(StatusRequest) returns (StatusResponse) {}
}

// ImageService declares the image-side RPCs. As with the runtime service,
// each method is a unary request/response call.
service ImageService {
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}

它的服務端啟動如下(pkg/kubelet/dockershim/remote/docker_server.go):
// NewDockerServer builds a DockerServer for the given endpoint. The supplied
// dockershim.DockerService is wrapped with NewDockerService and stored as the
// server's service; the gRPC server itself is not created until Start.
func NewDockerServer(endpoint string, s dockershim.DockerService) *DockerServer {
	srv := &DockerServer{endpoint: endpoint}
	srv.service = NewDockerService(s)
	return srv
}
// Start brings up the dockershim gRPC server.
//
// It creates a listener on s.endpoint, creates the gRPC server, registers
// s.service as both the runtime service and the image service, and then
// serves connections in a background goroutine. Only a listener-creation
// failure is returned; an error from serving is logged by the goroutine.
func (s *DockerServer) Start() error {
	glog.V(2).Infof("Start dockershim grpc server")

	listener, err := util.CreateListener(s.endpoint)
	if err != nil {
		return fmt.Errorf("failed to listen on %q: %v", s.endpoint, err)
	}

	// Create the gRPC server and register the same service object for both
	// the runtime API and the image API.
	s.server = grpc.NewServer()
	runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
	runtimeapi.RegisterImageServiceServer(s.server, s.service)

	// serve hands the listener to the gRPC server.
	serve := func() error { return s.server.Serve(listener) }

	go func() {
		// Run under an interrupt handler, with s.Stop as its notify
		// callback, to make sure the server is stopped properly.
		if serveErr := interrupt.New(nil, s.Stop).Run(serve); serveErr != nil {
			glog.Errorf("Failed to serve connections: %v", serveErr)
		}
	}()
	return nil
}
而DockerServer的啟動位置如下:pkg/kubelet/kubelet.go:645
// The unix socket for kubelet <-> dockershim communication.
			glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
				remoteRuntimeEndpoint,
				remoteImageEndpoint)
			glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
			// Build the dockershim gRPC server on the runtime endpoint and start it;
			// a Start error is returned to the caller as (nil, err).
			server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
			if err := server.Start(); err != nil {
				return nil, err
			}
客戶端方面,以建立POD的呼叫為例,首先要初始化客戶端:pkg/kubelet/remote/remote_runtime.go
// NewRemoteRuntimeService connects to the runtime service at endpoint and
// returns a RemoteRuntimeService backed by that gRPC connection.
//
// endpoint is resolved into a dial address and dialer by
// util.GetAddressAndDialer. connectionTimeout is passed to grpc.WithTimeout
// for the dial and is also stored as the returned service's timeout.
// A resolution or dial failure is returned as (nil, err).
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
	glog.Infof("Connecting to runtime service %s", endpoint)

	target, dialer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}

	// Open the gRPC connection using the endpoint-specific dialer.
	conn, err := grpc.Dial(
		target,
		grpc.WithInsecure(),
		grpc.WithTimeout(connectionTimeout),
		grpc.WithDialer(dialer),
	)
	if err != nil {
		glog.Errorf("Connect remote runtime %s failed: %v", target, err)
		return nil, err
	}

	svc := &RemoteRuntimeService{
		timeout:       connectionTimeout,
		runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
	}
	return svc, nil
}
建立POD pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
// createPodSandbox creates a pod sandbox for pod via the runtime service.
//
// It generates the sandbox config for the given attempt, creates the pod log
// directory (mode 0755), and then calls RunPodSandbox. On success it returns
// (sandboxID, "", nil). On any failure it logs a message and returns
// ("", message, err), where err is the underlying error.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		return "", message, err
	}

	// Create pod logs directory
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
		// message is already formatted; log it verbatim rather than using it
		// as a format string, so a '%' in the pod name or error text is not
		// reinterpreted as a formatting verb.
		glog.Error(message)
		return "", message, err
	}

	// runtimeService is initialized when the kubeGenericRuntimeManager is created.
	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
	if err != nil {
		message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		return "", message, err
	}

	return podSandBoxID, "", nil
}