Ocata Neutron代碼分析(二)——Neutron RPC啟動過程分析
阿新 • • 發佈:2017-12-21
gre add ice common multi tween wait函數 tex 依次
RPC啟動跟Neutron API的啟動在同一個函數中執行,neutron.server.wsgi_eventlet.py中的eventlet_wsgi_server。
def eventlet_wsgi_server(): neutron_api = service.serve_wsgi(service.NeutronApiService) start_api_and_rpc_workers(neutron_api) def start_api_and_rpc_workers(neutron_api): try: worker_launcher = service.start_all_workers() poolstart_api_and_rpc_workers函數中使用start_all_workers函數來啟動RPC相關的workers,返回worker_launcher對象(實際上是oslo_service.service::ProcessLauncher的實例)。接著,主進程使用GreenPool分別spawn出neutron_api和worker_launcher的wait函數,並調用waitall函數來等待這兩個GreenThread結束。其中的兩個link函數是為了保證其中一個服務掛掉,另外一個服務也要隨之結束。 下面重點分析neutron.service的start_all_workers函數:= eventlet.GreenPool() api_thread = pool.spawn(neutron_api.wait) plugin_workers_thread = pool.spawn(worker_launcher.wait) # api and other workers should die together. When one dies, # kill the other. api_thread.link(lambda gt: plugin_workers_thread.kill()) plugin_workers_thread.link(lambda gt: api_thread.kill()) pool.waitall() except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) neutron_api.wait()
defstart_all_workers主要完成獲取workers並啟動的工作,這裏的workers分為兩類:start_all_workers(): workers = _get_rpc_workers() + _get_plugins_workers() return _start_workers(workers)
- _get_rpc_workers返回每個plugin(包括core plugin和service plugin)所必須的rpc worker;
- _get_plugins_workers返回每一種(跟第一點中的每個plugin不同)plugin中用於實現自己特殊需求(如果有)的rpc workers。
def _get_rpc_workers(): plugin = directory.get_plugin() # 獲取core plugin service_plugins = directory.get_plugins().values() # 獲取plugin,包括core plugin和service plugin if cfg.CONF.rpc_workers < 1: # 檢查配置文件中的rpc_workers的值,必須大於或等於1 cfg.CONF.set_override(‘rpc_workers‘, 1) # If 0 < rpc_workers then start_rpc_listeners would be called in a # subprocess and we cannot simply catch the NotImplementedError. It is # simpler to check this up front by testing whether the plugin supports # multiple RPC workers. if not plugin.rpc_workers_supported(): LOG.debug("Active plugin doesn‘t implement start_rpc_listeners") if 0 < cfg.CONF.rpc_workers: LOG.error(_LE("‘rpc_workers = %d‘ ignored because " "start_rpc_listeners is not implemented."), cfg.CONF.rpc_workers) raise NotImplementedError() # passing service plugins only, because core plugin is among them rpc_workers = [RpcWorker(service_plugins, worker_process_count=cfg.CONF.rpc_workers)] if (cfg.CONF.rpc_state_report_workers > 0 and plugin.rpc_state_report_workers_supported()): rpc_workers.append( RpcReportsWorker( [plugin], worker_process_count=cfg.CONF.rpc_state_report_workers ) ) return rpc_workersplugin.rpc_workers_supported實際上是檢查core plugin是否實現了start_rpc_listeners方法。 接著,實例化了RpcWorker類,這是這個函數中最關鍵的步驟。RpcWorker類和Neutron API部分的WorkerService類一樣,是繼承於neutron_worker.NeutronWorker類的子類,同樣實現了start函數。 最後,判斷配置文件中的rpc_state_report_workers和core plugin是否支持對rpc state的report。如果支持,則再啟動rpc_state_report_workers這麽多個子進程來運行 RpcReportsWorker。這個Worker是RpcWorker的子類,只是其start_listeners_method跟RpcWorker不同。
class RpcReportsWorker(RpcWorker): start_listeners_method = ‘start_rpc_state_reports_listener‘
下面分析RpcWorker的構造函數和其start函數:
class RpcWorker(neutron_worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" start_listeners_method = ‘start_rpc_listeners‘ def __init__(self, plugins, worker_process_count=1): # 構造函數只是進行了變量的簡單賦值 super(RpcWorker, self).__init__( worker_process_count=worker_process_count ) self._plugins = plugins self._servers = [] def start(self): super(RpcWorker, self).start() for plugin in self._plugins: if hasattr(plugin, self.start_listeners_method): try: servers = getattr(plugin, self.start_listeners_method)() except NotImplementedError: continue self._servers.extend(servers)start函數會遍歷所有的plugins(包括core plugin和service plugins),查看各個plugin是否實現了start_listeners_method(即start_rpc_listeners)方法,如果實現了則調用之,如果沒有就跳過。 這就是RpcWorker的作用。各個plugin的start_rpc_listeners方法中就完成了rpc的功能,主要是通過消費特定名稱的mq隊列消息來提供服務。 下面分析_get_plugins_workers函數:
def _get_plugins_workers(): # NOTE(twilson) get_plugins also returns the core plugin plugins = directory.get_unique_plugins() # TODO(twilson) Instead of defaulting here, come up with a good way to # share a common get_workers default between NeutronPluginBaseV2 and # ServicePluginBase return [ plugin_worker for plugin in plugins if hasattr(plugin, ‘get_workers‘) for plugin_worker in plugin.get_workers() ]_get_plugins_workers函數檢查了每種plugin(不是每一個)中是否實現了get_workers方法,並將該方法返回的所有workers收集後返回。這裏的get_workesr方法返回plugin用於實現自己的特殊需求或提供個性化服務的workers。 兩個收集workers的函數執行完後,下面就是啟動各個workers的函數:
def start_all_workers(): workers = _get_rpc_workers() + _get_plugins_workers() return _start_workers(workers)
def _start_workers(workers): process_workers = [ plugin_worker for plugin_worker in workers if plugin_worker.worker_process_count > 0 ] # 篩選出worker_process_count大於0的workers try: if process_workers: # 如果存在worker_process_count大於0的workers worker_launcher = common_service.ProcessLauncher( cfg.CONF, wait_interval=1.0 ) # add extra process worker and spawn there all workers with # worker_process_count == 0 thread_workers = [ plugin_worker for plugin_worker in workers if plugin_worker.worker_process_count < 1 ] if thread_workers: process_workers.append( AllServicesNeutronWorker(thread_workers) ) # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause # DB errors. session.context_manager.dispose_pool() for worker in process_workers: worker_launcher.launch_service(worker, worker.worker_process_count) else: # 如果workers中的所有worker的worker_process_count都為0 worker_launcher = common_service.ServiceLauncher(cfg.CONF) for worker in workers: worker_launcher.launch_service(worker) return worker_launcher except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE(‘Unrecoverable error: please check log for ‘ ‘details.‘))_start_workers首先判斷workers中是否存在worker_process_count大於0的workers。
- 如果不存在(else分支):實例化ServiceLauncher,並調用其launch_service方法依次在當前的進程中啟動各個worker;
- 如果存在(if分支):
- 首先實例化ProcessLauncher;
-
接著對workers中可能存在的worker_process_count為0的worker進行處理,將這樣的worker形成thread_workers列表,並將這些worker作為參數實例化AllServicesNeutronWorker(AllServicesNeutronWorker也是NeutronWorker的子類並重寫了start方法,直接調用Launcher.launch_service啟動服務)。最後將AllServicesNeutronWorker實例append到process_workers列表中;
- 最後,調用ProcessLauncher.launch_service方法依次啟動各個worker(與Neutron API的啟動一樣)。
- ServiceLauncher是將任務放到greenthread中運行;
- ProcessLauncher是將任務放到os fork出來的子進程中運行。
Ocata Neutron代碼分析(二)——Neutron RPC啟動過程分析