1. 程式人生 > >Kuryr kubernetes 原始碼簡介

Kuryr kubernetes 原始碼簡介

Kuryr kubernetes 分為3類獨立可執行程式,分別為Kuryr Controller、Kuryr CNI、Kuryr CNI daemon(可選,抽離CNI的watch 功能,以減少多個pod同時啟動時的資源浪費)。 Kuryr Controller 獨立執行,功能是 watch k8s API 並從neutron 申請port等資源,再通過k8s API把網路資訊寫入 pod 的annotation。 Kuryr CNI 部署在每個worker node 上備用。當pod啟動或刪除時會呼叫該外掛,由該外掛具體執行veth的建立,以及該veth 與neutron 所建立port 的連線管理。如果沒有Kuryr CNI daemon,它還要在建立pod時負責watch k8s API,直到Kuryr Controller 將網路資訊寫入 pod 的annotation。 Kuryr CNI daemon 在每個worker node 上執行。當它執行時,主要是把Kuryr CNI 的watch 功能和執行功能抽離出來:veth的建立以及該veth 與neutron 所建立port 的連線管理都歸它負責,Kuryr CNI 只保留CNI 入口功能,相當於 Kuryr CNI daemon的客戶端。Kuryr CNI 和 Kuryr CNI daemon用同一個應用程式,引數不同,執行方式就不同。
Kuryr Controller 入口 kuryr_kubernetes/controller/service.py
class KuryrK8sService(service.Service):
    """Kuryr-Kubernetes controller Service."""

    def __init__(self):
        """Build the controller pipeline and the watcher that feeds it."""
        super(KuryrK8sService, self).__init__()

        objects.register_locally_defined_vifs()
        # Create the pipeline that receives the watched events.
        pipeline = h_pipeline.ControllerPipeline(self.tg)
        self.watcher = watcher.Watcher(pipeline, self.tg)
        # TODO(ivc): pluggable resource/handler registration
        # Watch these three resource kinds on the k8s API.
        for kind in ("pods", "services", "endpoints"):
            self.watcher.add("%s/%s" % (constants.K8S_API_BASE, kind))
        # Register the handlers; changes seen by the watcher are processed
        # by the handlers registered here.
        handler_classes = (
            h_vif.VIFHandler,
            h_lbaas.LBaaSSpecHandler,
            h_lbaas.LoadBalancerHandler,
        )
        for handler_cls in handler_classes:
            pipeline.register(handler_cls())

    def start(self):
        """Start the base service, then the watcher, logging both ends."""
        service_name = self.__class__.__name__
        LOG.info("Service '%s' starting", service_name)
        health.ReadinessChecker()
        super(KuryrK8sService, self).start()
        self.watcher.start()
        LOG.info("Service '%s' started", service_name)
# About the watcher: kuryr_kubernetes/watcher.py
def start(self):
    """Mark the watcher as running and start every not-yet-watched path."""
    self._running = True
    pending = self._resources - set(self._watching)
    for path in pending:
        self._start_watch(path)


def _start_watch(self, path):
    """Start watching *path*, in a thread when a thread group is set.

    Without a thread group the watch runs inline in the caller.
    """
    group = self._thread_group
    self._idle[path] = True
    if not group:
        self._watching[path] = None
        self._watch(path)
        return
    # The thread group adds a watching thread for this path.
    self._watching[path] = group.add_thread(self._watch, path)
# Back to VIFHandler: kuryr_kubernetes/controller/handlers/vif.py
def on_present(self, pod):
    """Request a VIF for *pod*, or activate its existing inactive VIF.

    In both cases the VIF is written back with ``_set_vif``. When writing
    a newly requested VIF fails with ``K8sClientException`` the VIF is
    released back to the pool.
    """
    vif = self._get_vif(pod)

    if vif:
        # The observed pod has a VIF; act only while it is not active.
        if not vif.active:
            self._drv_vif_pool.activate_vif(pod, vif)
            # Write the VIF info back to the pod annotation via the k8s API.
            self._set_vif(pod, vif)
        return

    # The observed pod has no VIF yet.
    project_id = self._drv_project.get_project(pod)
    security_groups = self._drv_sg.get_security_groups(pod, project_id)
    subnets = self._drv_subnets.get_subnets(pod, project_id)
    # Obtain a port from the VIF pool.
    vif = self._drv_vif_pool.request_vif(pod, project_id, subnets,
                                         security_groups)
    try:
        # Write the VIF info back to the pod annotation via the k8s API.
        self._set_vif(pod, vif)
    except k_exc.K8sClientException as ex:
        LOG.debug("Failed to set annotation: %s", ex)
        # FIXME(ivc): improve granularity of K8sClient exceptions:
        # only resourceVersion conflict should be ignored
        self._drv_vif_pool.release_vif(pod, vif, project_id,
                                       security_groups)
# Next, how a port is obtained from the VIF pool:
# kuryr_kubernetes/controller/drivers/vif_pool.py
def request_vif(self, pod, project_id, subnets, security_groups):
    """Return a VIF for *pod* taken from the matching ports pool.

    The pool is keyed by (host address, project id, sorted security
    groups). A ``KeyError`` from the host-address lookup and a
    ``ResourceNotReady`` from an empty pool are both re-raised after
    logging a warning.
    """
    try:
        host_addr = self._get_host_addr(pod)
    except KeyError:
        LOG.warning("Pod has not been scheduled yet.")
        raise

    sg_key = tuple(sorted(security_groups))
    pool_key = (host_addr, project_id, sg_key)

    try:
        # Take a port that was requested ahead of time from the pool.
        pooled_vif = self._get_port_from_pool(pool_key, pod, subnets)
    except exceptions.ResourceNotReady as ex:
        LOG.warning("Ports pool does not have available ports!")
        # Not enough resources: request more ports in a green thread.
        eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
        raise ex
    return pooled_vif
def _get_port_from_pool(self, pool_key, pod, subnets):
    """Pop a port id from the pool for *pool_key* and return its VIF.

    Raises ``exceptions.ResourceNotReady`` when that pool's list is empty.
    When the pool has fewer ports than ``ports_pool_min``, a green thread
    is spawned to repopulate it.
    """
    try:
        port_id = self._available_ports_pools[pool_key].pop()
    except IndexError:
        raise exceptions.ResourceNotReady(pod)

    if config.CONF.kubernetes.port_debug:
        neutron_client = clients.get_neutron_client()
        pod_meta = pod['metadata']
        port_update = {
            "port": {
                'name': pod_meta['name'],
                'device_id': pod_meta['uid']
            }
        }
        neutron_client.update_port(port_id, port_update)

    # check if the pool needs to be populated
    # (fewer pre-requested ports cached than the configured minimum)
    pool_is_low = (self._get_pool_size(pool_key) <
                   oslo_cfg.CONF.vif_pool.ports_pool_min)
    if pool_is_low:
        # Request more ports in a green thread.
        eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
    return self._existing_vifs[port_id]
# How the ports pool requests ports
def _populate_pool(self, pool_key, pod, subnets):
    """Request VIFs in bulk and record them as available for *pool_key*."""
    # ............ (code elided in the article; presumably where
    # ``num_ports`` is computed -- TODO confirm against the full source)
    vifs = self._drv_vif.request_vifs(
        pod=pod,
        project_id=pool_key[1],
        subnets=subnets,
        security_groups=list(pool_key[2]),
        num_ports=num_ports)
    for vif in vifs:
        # Cache the VIF by id and list its id as available in this pool.
        self._existing_vifs[vif.id] = vif
        self._available_ports_pools.setdefault(pool_key, []).append(vif.id)


# How the VIF driver requests a port:
# kuryr_kubernetes/controller/drivers/neutron_vif.py
def request_vif(self, pod, project_id, subnets, security_groups):
    """Create a neutron port for *pod* and return it converted to a VIF."""
    neutron = clients.get_neutron_client()  # get the neutron client
    # Build the request body.
    rq = self._get_port_request(pod, project_id, subnets, security_groups)
    port = neutron.create_port(rq).get('port')  # request the port
    vif_plugin = self._get_vif_plugin(port)
    return ovu.neutron_to_osvif_vif(vif_plugin, port, subnets)
# The CNI entry point is kuryr_kubernetes/cni/main.py. CNI and the CNI daemon
# share one executable; depending on its arguments it runs either as a daemon
# or as a plugin.
if CONF.cni_daemon.daemon_enabled:
    runner = cni_api.CNIDaemonizedRunner()  # run in daemonized form
else:
    runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())  # run as a plugin


# When run as a plugin, the CNI protocol's argument requirements must be met,
# i.e. the command is ADD, DEL or VERSION.
# kuryr_kubernetes/cni/api.py
def run(self, env, fin, fout):
    """Execute the CNI command named by ``env['CNI_COMMAND']``.

    :param env: mapping of environment variables for this invocation
    :param fin: input stream handed to ``prepare_env``
    :param fout: output stream that receives the result or the error
    :returns: 0 on success, 1 when any exception was raised (the
              exception text is written to *fout*)
    """
    try:
        # Prepare params according to calling Object
        params = self.prepare_env(env, fin)
        # Read the command once with .get(): a missing CNI_COMMAND must be
        # reported as an unknown command, not as a bare KeyError raised
        # while formatting the error message.
        command = env.get('CNI_COMMAND')
        if command == 'ADD':
            vif = self._add(params)
            self._write_dict(fout, vif)
        elif command == 'DEL':
            self._delete(params)
        elif command == 'VERSION':
            self._write_version(fout)
        else:
            raise k_exc.CNIError(_("unknown CNI_COMMAND: %s") % command)
        return 0
    except Exception as ex:
        # LOG.exception
        self._write_exception(fout, str(ex))
        return 1
class K8sCNIPlugin(cni_api.CNIPlugin):
    """CNI plugin that serves ADD/DEL by watching the pod on the k8s API."""

    def add(self, params):
        """Register an AddHandler, run the watcher and return the VIF."""
        self._setup(params)
        add_handler = h_cni.AddHandler(params, self._done)
        self._pipeline.register(add_handler)  # register add handler
        self._watcher.start()
        return self._vif

    def delete(self, params):
        """Register a DelHandler and run the watcher."""
        self._setup(params)
        del_handler = h_cni.DelHandler(params, self._done)
        self._pipeline.register(del_handler)
        self._watcher.start()

    def _done(self, vif):
        """Handler callback: remember *vif* and stop the watcher."""
        self._vif = vif
        self._watcher.stop()

    def _setup(self, params):
        """Create the pipeline and a watcher bound to the pod in *params*."""
        clients.setup_kubernetes_client()
        self._pipeline = h_cni.CNIPipeline()
        self._watcher = k_watcher.Watcher(self._pipeline)
        # Watch the k8s API for the single pod named in the CNI arguments.
        pod_path = (
            "%(base)s/namespaces/%(namespace)s/pods"
            "?fieldSelector=metadata.name=%(pod)s" % {
                'base': k_const.K8S_API_BASE,
                'namespace': params.args.K8S_POD_NAMESPACE,
                'pod': params.args.K8S_POD_NAME})
        self._watcher.add(pod_path)
# kuryr_kubernetes/cni/handlers.py
class AddHandler(CNIHandlerBase):
    """Handler for CNI ADD: connects the pod to its VIF once."""

    def __init__(self, cni, on_done):
        LOG.debug("AddHandler called with CNI env: %r", cni)
        super(AddHandler, self).__init__(cni, on_done)
        self._vif = None

    def on_vif(self, pod, vif):
        """Connect a clone of *vif* on first sight; call back once active."""
        if not self._vif:
            local_vif = vif.obj_clone()
            self._vif = local_vif
            local_vif.active = True
            # Connect the pod and the port.
            b_base.connect(local_vif, self._get_inst(pod),
                           self._cni.CNI_IFNAME, self._cni.CNI_NETNS)

        if vif.active:
            self._callback(vif)
# kuryr_kubernetes/cni/binding/base.py -- connecting and disconnecting
def connect(vif, instance_info, ifname, netns=None):
    """Plug *vif*, connect it through its binding driver and configure L3."""
    binding_driver = _get_binding_driver(vif)  # look up the driver
    # Connect the VIF and the network interface through the os_vif package.
    os_vif.plug(vif, instance_info)
    # Call the driver's own connect function.
    binding_driver.connect(vif, ifname, netns)
    # Configure the L3 routes and gateway information.
    _configure_l3(vif, ifname, netns)


def disconnect(vif, instance_info, ifname, netns=None):
    """Disconnect *vif* through its binding driver, then unplug it."""
    binding_driver = _get_binding_driver(vif)
    binding_driver.disconnect(vif, ifname, netns)
    os_vif.unplug(vif, instance_info)
class VIFOpenVSwitchDriver(BaseBridgeDriver):
    """Bridge driver that also adds the VIF port to the OVS bridge."""

    def connect(self, vif, ifname, netns):
        """Run the base connect, then create the OVS port for *vif*."""
        super(VIFOpenVSwitchDriver, self).connect(vif, ifname, netns)
        # FIXME(irenab) use pod_id (neutron port device_id)
        instance_id = 'kuryr'
        bridge = vif.bridge_name
        device = vif.vif_name
        iface_id = vif.port_profile.interface_id
        mac = vif.address
        net_utils.create_ovs_vif_port(bridge, device, iface_id, mac,
                                      instance_id)
# Adding the veth to br-int. In the end ovs-vsctl is invoked as a command.
# kuryr_kubernetes/linux_net_utils.py
def _ovs_vsctl(args, timeout=None):
    """Run ``ovs-vsctl`` as root with *args*; log and re-raise on failure."""
    full_args = ['ovs-vsctl']
    if timeout is not None:
        full_args.append('--timeout=%s' % timeout)
    full_args.extend(args)
    try:
        return processutils.execute(*full_args, run_as_root=True)
    except Exception as e:
        LOG.error("Unable to execute %(cmd)s. Exception: %(exception)s",
                  {'cmd': full_args, 'exception': e})
        raise


def _create_ovs_vif_cmd(bridge, dev, iface_id, mac, instance_id):
    """Build the ovs-vsctl arguments that (re)create port *dev*."""
    remove_old_port = ['--', '--if-exists', 'del-port', dev]
    add_new_port = ['--', 'add-port', bridge, dev]
    set_interface = [
        '--', 'set', 'Interface', dev,
        'external-ids:iface-id=%s' % iface_id,
        'external-ids:iface-status=active',
        'external-ids:attached-mac=%s' % mac,
        'external-ids:vm-uuid=%s' % instance_id,
    ]
    return remove_old_port + add_new_port + set_interface


def create_ovs_vif_port(bridge, dev, iface_id, mac, instance_id):
    """Create the OVS port for *dev* on *bridge*."""
    ovs_args = _create_ovs_vif_cmd(bridge, dev, iface_id, mac, instance_id)
    _ovs_vsctl(ovs_args)


def delete_ovs_vif_port(bridge, dev):
    """Delete port *dev* from *bridge* if it exists."""
    ovs_args = ['--', '--if-exists', 'del-port', bridge, dev]
    _ovs_vsctl(ovs_args)