k8s 與 grpc
阿新 • • 發佈:2018-11-22
gRPC 是由 Google 開發的高效能、通用開源 RPC 框架,主要面向移動應用場景,基於 HTTP/2 協議標準設計,使用 ProtoBuf(Protocol Buffers)作為序列化協議,並支援眾多開發語言。
https://github.com/grpc/grpc-go/tree/master/examples/helloworld 裡面是個簡單的教程指導GO語言如何使用gRPC
其中 helloworld.proto 是用來定義ProtoBuf 訊息格式,helloworld.pb.go 是protoc生成的GO語言形式的訊息格式,生成方法是protoc -I helloworld/ helloworld/helloworld.proto --go_out=plugins=grpc:helloworld 。
這個例子服務端關鍵的程式碼如下
// Create the gRPC server and register the Greeter service implementation on it.
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})//register the service
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil { // start serving on lis; a serve error is fatal
log.Fatalf("failed to serve: %v", err)
}
k8s 1.5 以後開始,kubelet 通過 Unix sockets 上的 gRPC 與容器執行時的 CRI shim(例如 dockershim)通訊。
pkg/kubelet/apis/cri/v1alpha1/runtime/api.proto 中定義了訊息格式,分runtime服務和映象服務。
// RuntimeService declares the RPCs of the runtime service.
service RuntimeService {
    rpc Version(VersionRequest) returns (VersionResponse) {}

    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}

    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}
    rpc Status(StatusRequest) returns (StatusResponse) {}
}

// ImageService declares the RPCs of the image service.
service ImageService {
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}
它的服務端啟動程式碼如下(pkg/kubelet/dockershim/remote/docker_server.go):
// NewDockerServer returns a DockerServer bound to endpoint whose service is
// s wrapped by NewDockerService. No other fields are set here.
func NewDockerServer(endpoint string, s dockershim.DockerService) *DockerServer {
	srv := new(DockerServer)
	srv.endpoint = endpoint
	srv.service = NewDockerService(s)
	return srv
}
// Start creates a listener on s.endpoint, registers the runtime and image
// services on a new gRPC server, and serves connections from a background
// goroutine. It returns an error only when the listener cannot be created;
// a serve failure is logged from the goroutine instead of being returned.
func (s *DockerServer) Start() error {
	glog.V(2).Infof("Start dockershim grpc server")

	listener, listenErr := util.CreateListener(s.endpoint)
	if listenErr != nil {
		return fmt.Errorf("failed to listen on %q: %v", s.endpoint, listenErr)
	}

	// Create the grpc server; s.service is registered for both the runtime
	// and the image service.
	s.server = grpc.NewServer()
	runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
	runtimeapi.RegisterImageServiceServer(s.server, s.service)

	serve := func() error { return s.server.Serve(listener) }
	go func() {
		// Run under an interrupt handler that is given s.Stop, so the server
		// is stopped properly.
		if err := interrupt.New(nil, s.Stop).Run(serve); err != nil {
			glog.Errorf("Failed to serve connections: %v", err)
		}
	}()

	return nil
}
而 DockerServer 的啟動如下
pkg/kubelet/kubelet.go:645
// The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
// The server is created on remoteRuntimeEndpoint only, with ds passed as its service.
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
// A start failure is returned to the caller.
if err := server.Start(); err != nil {
return nil, err
}
客戶端方面
客戶端呼叫,比如建立POD
首先初始化客戶端
pkg/kubelet/remote/remote_runtime.go
// NewRemoteRuntimeService dials the runtime service at endpoint over gRPC and
// returns a RemoteRuntimeService backed by that connection.
// connectionTimeout is used as the dial timeout and is also stored on the
// returned service. An error is returned if the endpoint cannot be resolved
// to an address and dialer, or if the dial fails.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
	glog.Infof("Connecting to runtime service %s", endpoint)

	addr, dialer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}

	// Open the gRPC connection using the dialer resolved for the endpoint.
	conn, err := grpc.Dial(
		addr,
		grpc.WithInsecure(),
		grpc.WithTimeout(connectionTimeout),
		grpc.WithDialer(dialer),
	)
	if err != nil {
		glog.Errorf("Connect remote runtime %s failed: %v", addr, err)
		return nil, err
	}

	service := &RemoteRuntimeService{
		timeout:       connectionTimeout,
		runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
	}
	return service, nil
}
建立POD
pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
// createPodSandbox generates the sandbox config for pod, creates the pod log
// directory, and asks the runtime service to run the sandbox.
//
// On success it returns the sandbox ID, an empty message and a nil error.
// On failure it returns an empty ID, a message naming the step that failed,
// and the underlying error; the message is also logged.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		return "", message, err
	}

	// Create pod logs directory
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
		// message is already formatted; log it verbatim rather than as a
		// format string, so a '%' in the pod name or error is not mangled.
		glog.Error(message)
		return "", message, err
	}

	// NOTE(review): per the original annotation, m.runtimeService is set up
	// when the kubeGenericRuntimeManager is constructed — confirm at the
	// construction site.
	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
	if err != nil {
		message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		return "", message, err
	}

	return podSandBoxID, "", nil
}