
The startup process of the nova-compute service in OpenStack Liberty

A while ago I wrote an analysis of the source code behind "the boot process of a cloud instance", and readers will have noticed that components such as nova-scheduler and nova-compute interact by issuing rpc.cast/rpc.call invocations. Today I want to walk through the startup process of the nova-compute service, focusing on how its connection to AMQP (rabbitmq) is established.

On CentOS 7, starting the nova-compute service follows this execution path:
systemctl start openstack-nova-compute.service -> /usr/bin/nova-compute -> nova/cmd/compute.py/main

Let's start the analysis from the entry point main(); the function is as follows:

def main():

    """Load and set the configuration options. Two points are worth noting:
    1. rpc.set_defaults is called to set the default exchange to 'nova';
       without it the default would be 'openstack'.
    2. rpc.init sets up the Transport and the Notifier. Transport is an
       oslo_messaging/transport.py/Transport instance; since I use the
       default rpc_backend=rabbit, the Transport is backed by
       driver=oslo_messaging/_drivers/impl_rabbit.py/RabbitDriver.
       Notifier is a notification sender that uses the Transport to
       deliver notification messages.
    """
    config.parse_args(sys.argv)
    # ... other configuration code omitted ...

    """Call the class method nova/service.py/Service.create to build the
    Service object, with topic = 'compute' and db_allowed = False. create
    is a @classmethod: it first fills in settings from its arguments and
    the options in /etc/nova.conf, then instantiates a Service object and
    returns it to the caller.
    """
    server = service.Service.create(binary='nova-compute',
                                    topic=CONF.compute_topic,
                                    db_allowed=CONF.conductor.use_local)

    """serve creates a Launcher instance (here a ServiceLauncher) to launch
    the service; ultimately it is server.start that starts the service.
    wait blocks until the service has finished starting.
    """
    service.serve(server)
    service.wait()
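
For reference, the two rpc helpers mentioned in the docstring above look roughly like this in Liberty's nova/rpc.py (a simplified sketch: error handling, transport aliases and the notification options are left out; RequestContextSerializer and JsonPayloadSerializer are nova's own serializer classes):

import oslo_messaging as messaging

TRANSPORT = None
NOTIFIER = None

def set_defaults(control_exchange):
    # make 'nova' the default exchange instead of 'openstack'
    messaging.set_transport_defaults(control_exchange=control_exchange)

def init(conf):
    global TRANSPORT, NOTIFIER
    # with rpc_backend=rabbit this Transport is backed by RabbitDriver
    TRANSPORT = messaging.get_transport(conf)
    serializer = RequestContextSerializer(JsonPayloadSerializer())
    # the Notifier delivers notification messages through the same Transport
    NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)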

The rest of the analysis proceeds in two steps:

  • Initialization of the Service object
  • Starting the Service

Initialization of the Service object

As mentioned above, the create method builds a Service object; let's look at its constructor:

def __init__(self, host, binary, topic, manager, 
                 report_interval=None,
                 periodic_enable=None, periodic_fuzzy_delay=None,
                 periodic_interval_max=None, db_allowed=True,
                 *args, **kwargs):
        """nova/service.py/Service.__init__
        輸入引數如下:
            host = 'devstack'
            binary = 'nova-compute'
            topic = 'compute'
            manager = 'nova.compute.manager.ComputeManager'
            report_interval = 10
            periodic_enable = True
            periodic_fuzzy_delay = 60
            periodic_interval_max = None
            db_allowed = False
            args = ()
            kwargs = {}
        建構函式中主要實現瞭如下功能:
        1. 成員變數賦值
        2. 初始化ComputeManager物件
        3. 初始化conductor API物件

        後文重點分析2和3
        """
        super(Service, self).__init__()
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        # Instantiate the servicegroup API (nova/servicegroup/api.py/API).
        # CONF.servicegroup_driver selects the storage driver (db,
        # zookeeper or memcache; the default is db).
        self.servicegroup_api = servicegroup.API()
        # Instantiate ComputeManager (nova/compute/manager.py/ComputeManager)
        manager_class = importutils.import_class(self.manager_class_name)
        self.manager = manager_class(host=self.host, *args, **kwargs)
        self.rpcserver = None
        self.report_interval = report_interval
        self.periodic_enable = periodic_enable
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.periodic_interval_max = periodic_interval_max
        self.saved_args, self.saved_kwargs = args, kwargs
        self.backdoor_port = None
        # Instantiate the conductor API (nova/conductor/api.py/API)
        self.conductor_api = conductor.API(use_local=db_allowed)
        # Send a ping message and wait until nova-conductor is ready
        self.conductor_api.wait_until_ready(
                                context.get_admin_context())

Instantiating ComputeManager

Instantiating ComputeManager means creating a ComputeManager object and calling its __init__ method, which builds the various API objects and client rpc APIs, e.g. network, volume, image, conductor and so on. The code is as follows:

def __init__(self, compute_driver=None, *args, **kwargs):
        """Load configuration options and connect to the 
        hypervisor.
        """
        """nova/compute/manager.py/ComputeManager.__init__
        api的建立都是根據配置(/etc/nova.conf)中指定的類名稱,然後建立
        對應的API類例項,具體的類請看註釋; 而client rpc api則是
        #建立一個RPCClient例項並與特定的Target(指定了訊息的傳送目的
        地)及Transport(訊息傳輸層)關聯,後文以
        `nova/compute/rpcapi.py/ComputeAPI`為例分析具體的實現
        """
        #nova/compute/manager.py/ComputeVirtAPI
        self.virtapi = ComputeVirtAPI(self)
        #nova/network/neutronv2/api.py/API
        self.network_api = network.API()
        #nova/volume/cinder.py/API
        self.volume_api = volume.API()
        #nova/image/api.py/API
        self.image_api = image.API()
        self._last_host_check = 0
        self._last_bw_usage_poll = 0
        self._bw_usage_supported = True
        self._last_bw_usage_cell_update = 0
        #nova/compute/api.py/API
        self.compute_api = compute.API()
        #nova/compute/rpcapi.py/ComputeAPI
        #訊息Target = {topic='compute', version='4.0'}
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        """nova/conductor/api.py/API,內部實現如下:
           建立`nova/conductor/rpcapi.py/ConductorAPI`例項,
        內部會建立一個rpc client,其Target如下:
           Target = {topic = 'conductor', version = '3.0'}
           建立`nova/baserpc.py/BaseAPI`例項,內部會建立一個rpc 
        client其Target如下:
           Target = {topic = 'conductor', 
                    namespace = 'baseapi', version = '1.0'}
        """
        self.conductor_api = conductor.API()
        """nova/conductor/rpc.py/ComputeTaskAPI
        1. nova/conductro/rpcapi.py/ComputeTaskAPI
            Target = {topic = 'conductor', 
                  namespace = 'compute_task', version = '1.0'}
        """
        self.compute_task_api = conductor.ComputeTaskAPI()
        #如果security_group_api配置為neutron或者quantum,則為True
        self.is_neutron_security_groups = (
            openstack_driver.is_neutron_security_groups())
        #nova/consoleauth/rpcapi.py/ConsoleAuthAPI
        # Target = {topic = 'consoleauth', version = '2.1'}
        self.consoleauth_rpcapi = consoleauth.rpcapi.ConsoleAuthAPI()
        #nova/cells/rpcapi.py/CellsAPI
        # Target = {topic = 'cells', version = '1.0'}
        self.cells_rpcapi = cells_rpcapi.CellsAPI()
        """nova/scheduler/client/__init__.py/SchedulerClient
        內部延遲建立(在使用時再建立):
        1. nova.scheduler.client.query.SchedulerQueryClient
            初始化時建立nova/scheduler/rpcapi.py/SchedulerAPI,
            Target = {topic = 'scheduler', version = '4.0'}
        2. nova.scheduler.client.report.SchedulerReportClient
        #兩個客戶端例項分別用來建立雲主機時選主及更新主機資訊
        """
        self.scheduler_client = scheduler_client.SchedulerClient()
        self._resource_tracker_dict = {}
        self.instance_events = InstanceEvents()
        self._sync_power_pool = eventlet.GreenPool()
        self._syncs_in_progress = {}
        self.send_instance_updates = (
            CONF.scheduler_tracks_instance_changes)
        if CONF.max_concurrent_builds != 0:
            self._build_semaphore = eventlet.semaphore.Semaphore(
                CONF.max_concurrent_builds)
        else:
            self._build_semaphore = compute_utils.UnlimitedSemaphore()
        if max(CONF.max_concurrent_live_migrations, 0) != 0:
            self._live_migration_semaphore = eventlet.semaphore.Semaphore(
                CONF.max_concurrent_live_migrations)
        else:
            self._live_migration_semaphore = compute_utils.UnlimitedSemaphore()

        super(ComputeManager, 
                        self).__init__(service_name="compute",
                                             *args, **kwargs)

        # NOTE(russellb) Load the driver last. It may call back into the
        # compute manager via the virtapi, so we want it to be fully
        # initialized before that happens.
        # The hypervisor in use here is libvirt, so the driver is
        # LibvirtDriver (nova/virt/libvirt/driver/LibvirtDriver)
        self.driver = driver.load_compute_driver(self.virtapi,
                                                compute_driver)
        #False
        self.use_legacy_block_device_info = \
                    self.driver.need_legacy_block_device_info

Taking nova/compute/rpcapi.py/ComputeAPI as an example, let's look at how an rpc client is initialized:

def __init__(self):
    """nova/compute/rpcapi.py/ComputeAPI.__init__
    """
    super(ComputeAPI, self).__init__()
    #建立oslo_messaging/target.py/Target物件,該物件決定了
    #訊息的傳送目的地;topic指定訊息以topic模式傳送,version
    #指定了要求的訊息版本,在這裡CONF.compute_topic = 'compute'
    target = messaging.Target(topic=CONF.compute_topic, 
                                        version='4.0')
    #設定相容的訊息版本及建立序列化器                                   
    version_cap = 
    self.VERSION_ALIASES.get(CONF.upgrade_levels.compute,
                              CONF.upgrade_levels.compute)
    serializer = objects_base.NovaObjectSerializer()
    #建立rpc client api
    #(oslo_messaging/rpc/client.py/RPCClient)
    #正如之前分析rpc.init函式時所說,訊息是按照Target指定的模式通過
    #Transport傳輸到訊息佇列的
    self.client = self.get_client(target, version_cap, serializer)
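
get_client here is a thin wrapper that ends up in nova/rpc.py/get_client, which builds the oslo.messaging RPCClient on top of the global TRANSPORT created by rpc.init. Roughly (a simplified sketch):

# nova/rpc.py (simplified)
def get_client(target, version_cap=None, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.RPCClient(TRANSPORT,
                               target,
                               version_cap=version_cap,
                               serializer=serializer)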

Summary: instantiating ComputeManager mainly creates the various APIs and client rpc APIs related to instance operations, including network, volume, image, conductor, scheduler and so on, so that they are ready when the service performs operations after startup.

Instantiating the conductor API and waiting for nova-conductor to be ready

Instantiating the conductor API

Attentive readers will have noticed that the ComputeManager instantiation above also created a nova/conductor/api.py/API instance; let's look at the code:

def __init__(self):
    """1. 建立`nova/conductor/rpcapi.py/ConductorAPI`例項,內部會
    建立一個rpc client,其Target如下:
        Target = {topic = 'conductor', version = '3.0'}
       2. 建立`nova/baserpc.py/BaseAPI`例項,內部會建立一個rpc 
    client其Target如下:
        Target = {topic = 'conductor', 
                    namespace = 'baseapi', version = '1.0'}
    """
    self._manager = rpcapi.ConductorAPI()
    self.base_rpcapi = baserpc.BaseAPI(topic=CONF.conductor.topic)
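
The BaseAPI constructor referenced here builds its rpc client roughly as follows (a simplified sketch of nova/baserpc.py; the version_cap handling is omitted):

class BaseAPI(object):
    def __init__(self, topic):
        super(BaseAPI, self).__init__()
        # namespace='baseapi' is what routes the ping to the base endpoint
        target = messaging.Target(topic=topic,
                                  namespace='baseapi',
                                  version='1.0')
        self.client = rpc.get_client(target)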

Waiting for nova-conductor to be ready

The nova-compute service confirms that nova-conductor has started by sending a remote ping request through the base_rpcapi created in the __init__ method above, as follows:

def wait_until_ready(self, context, early_timeout=10, 
                    early_attempts=10):
    """nova/conductor/api.py/API.wait_until_ready
        該方法通過呼叫`self.base_rpcapi`傳送ping請求來確定
    `nova-conductor`的狀態,請求超時為early_timeout=10,重試次數為
    early_attempts=10;下面的程式碼中省略了異常及重試部分
    """

    """內部首先呼叫rpc客戶端api的prepare方法準備一個呼叫上下文
    `_CallContext`,該上下檔案與rpc api的Transport及Target關聯,
    接著呼叫_CallContext.call方法傳送同步ping請求(訊息內容為:'1.21 
    GigaWatts')到訊息佇列,nova-conductor服務收到訊息會給一個應答,否
    則會報超時異常;具體的處理過程請看下面的分析
    """
    self.base_rpcapi.ping(context, '1.21 GigaWatts',
                                      timeout=timeout)

def ping(self, context, arg, timeout=None):
    arg_p = jsonutils.to_primitive(arg)
    # Call RPCClient.prepare to set up a request context.
    # This actually just calls the _CallContext.prepare class method to
    # create a _CallContext object bound to the Target, Transport and
    # serializer.
    cctxt = self.client.prepare(timeout=timeout)
    # Call _CallContext.call to send the synchronous ping request; see below
    return cctxt.call(context, 'ping', arg=arg_p)

def call(self, ctxt, method, **kwargs):
    """Invoke a method and wait for a reply. See 
    RPCClient.call().
    """
    if self.target.fanout:
        raise exceptions.InvalidTarget('A call cannot be used '
                                'with fanout', self.target)

    """Build the message; the result looks like this:
    {
    'args': {'arg': '1.21 GigaWatts'},
    'namespace': 'baseapi',
    'method': 'ping',
    'version': '1.0'
    }
    arg is the message payload and method is the remote method to invoke;
    namespace and version come from the Target and control which server
    accepts the message.
    """
    msg = self._make_message(ctxt, method, kwargs)
    # self.serializer was created together with the RPCClient; here it is
    # nova/rpc.py/RequestContextSerializer, used to serialize the context
    msg_ctxt = self.serializer.serialize_context(ctxt)

    timeout = self.timeout
    if self.timeout is None:
        timeout = self.conf.rpc_response_timeout

    if self.version_cap:
        self._check_version_cap(msg.get('version'))

    try:
        """Call oslo_messaging/transport.py/Transport._send to send the
        message. wait_for_reply=True means a reply is expected; before
        sending, a consumer is created internally to receive the reply.
        Transport._send simply calls AMQPDriverBase._send, which is
        analyzed below.
        """
        result = self.transport._send(self.target, msg_ctxt,
                                          msg,
                                          wait_for_reply=True,
                                          timeout=timeout,
                                          retry=self.retry)
    except driver_base.TransportDriverError as ex:
        raise ClientSendError(self.target, ex)
    # Deserialize the reply message and return it to the caller
    return self.serializer.deserialize_entity(ctxt, result)

#amqpdriver.py/AMQPDriverBase._send
def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None,
              envelope=True, notify=False, retry=None):
    """省略某些非核心程式碼"""

    """同步ping訊息,需要等待應答,在這裡設定訊息id及應答佇列,結果如下:
    {
    '_msg_id': '221b9eafe51c475bb15fecafbd72ea17', 
    'version': '1.0', 
    '_reply_q': 'reply_83fbf8446b564899b1e3c89753a8689a', 
    'args': {'arg': '1.21 GigaWatts'}, 'namespace': 'baseapi', 
    'method': 'ping'
    }
    _get_reply_q方法用來建立應答佇列,內部會建立一個用於監聽的rpc連結
    (connection),連結通道(channel),消費者(direct模式),
    Exchange,繫結佇列與Exchange,啟動監聽執行緒(poll方法輪詢訊息)

    建立rpc連結及通道的呼叫如下:
    amqpdriver.py/AMQPDriverBase._get_connection -> 
    amqp.py/ConnectionContext -> amqp.py/ConnectionPool.create 
    -> impl_rabbit.py/Connection -> 
    komku/connection.py/Connection

    建立消費者及Exchange的呼叫如下:
    impl_rabbit.py/Connection.declare_direct_consumer -> 
    impl_rabbit.py/Consumer.declare_consumer
    """
    if wait_for_reply:
        msg_id = uuid.uuid4().hex
        msg.update({'_msg_id': msg_id})
        LOG.debug('MSG_ID is %s', msg_id)
        msg.update({'_reply_q': self._get_reply_q()})

    # Add a unique id and pack the context into the message body
    rpc_amqp._add_unique_id(msg)
    rpc_amqp.pack_context(msg, context)

    # Serialize the message
    if envelope:
        msg = rpc_common.serialize_msg(msg)

    # Start listening on the reply queue
    if wait_for_reply:
        self._waiter.listen(msg_id)

    # Send the message
    try:
        # Create an rpc connection for sending; the call chain is similar
        # to the listening connection above, except that the sending
        # connection is taken from the connection pool.
        with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
            # Send a notification message
            if notify:
                conn.notify_send(self._get_exchange(target),
                                     target.topic, msg, retry=retry)
            # fanout mode
            elif target.fanout:
                conn.fanout_send(target.topic, msg, retry=retry)
            # The ping message uses topic mode (look back at the Target
            # created with the RPCClient and this becomes clear); the
            # target is:
            # {topic=conductor, namespace=baseapi, version=1.0}
            else:
                topic = target.topic
                if target.server:
                    topic = '%s.%s' % (target.topic, target.server)
                # Send the message. Given the target above, the default
                # exchange, i.e. 'nova', is used; see the analysis below.
                conn.topic_send(
                  exchange_name=self._get_exchange(target),
                                    topic=topic, msg=msg,
                                    timeout=timeout,
                                    retry=retry)

        # Wait for the reply
        if wait_for_reply:
            result = self._waiter.wait(msg_id, timeout)
            if isinstance(result, Exception):
                raise result
            return result
    finally:
        # Stop listening on the reply queue bound earlier
        if wait_for_reply:
            self._waiter.unlisten(msg_id)

#impl_rabbit.py/Connection.topic_send
def topic_send(self, exchange_name, topic, msg, timeout=None, 
                retry=None):
    """Send a 'topic' message."""
    # Create an Exchange of type 'topic' named 'nova'
    exchange = kombu.entity.Exchange(
            name=exchange_name,
            type='topic',
            durable=self.amqp_durable_queues,
            auto_delete=self.amqp_auto_delete)

    # Send the message with routing_key = 'conductor'. The internals of
    # this call are fairly involved (retries, exception handling and
    # decorators all try hard to ensure the message is sent successfully).
    # In short: it ends up calling self._publish, which creates a producer
    # and calls its publish method, which publishes via the channel.
    self._ensure_publishing(self._publish, exchange, msg,
                                routing_key=topic, retry=retry)

Summary: from the analysis above we know that the nova-compute service depends on the nova-conductor service; nova-conductor must already be running before nova-compute is started, otherwise startup fails.

To sum up the steps for using rabbitmq message producers and consumers (a minimal kombu sketch follows the two lists):

Producer

  • create the connection
  • create the channel
  • create the Exchange
  • create the producer
  • send messages

Consumer

  • create the connection
  • create the channel
  • create the Exchange
  • create the queue and bind it to the Exchange
  • create the consumer
  • register with the server
  • wait for incoming messages
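
The same steps can be reproduced directly with kombu, independently of oslo.messaging. Below is a minimal, self-contained sketch; the connection URL, exchange/queue names and payload are made up for illustration:

from kombu import Connection, Consumer, Exchange, Producer, Queue

def on_message(body, message):
    print('received: %s' % body)
    message.ack()

# producer: connection -> channel -> exchange -> producer -> publish
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    exchange = Exchange('nova', type='topic', durable=False, auto_delete=False)
    producer = Producer(channel, exchange=exchange, routing_key='compute')
    producer.publish({'method': 'ping', 'args': {'arg': '1.21 GigaWatts'}})

# consumer: connection -> channel -> exchange -> queue (declared and bound)
# -> consumer -> register (consume) -> wait for messages
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    exchange = Exchange('nova', type='topic', durable=False, auto_delete=False)
    queue = Queue('compute', exchange=exchange, routing_key='compute',
                  channel=channel)
    queue.declare()  # registers the queue and exchange and binds them
    with Consumer(channel, queues=[queue], callbacks=[on_message]):
        conn.drain_events(timeout=5)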

Starting the Service

Once create returns, everything is ready. Let's look at the startup process of the service, focusing on how the connection to rabbitmq is established:

def main():

    .......

    """serve方法建立Launcher服務啟動例項物件(這裡是ServiceLauncher)
    來啟動服務,但最終會呼叫server.start方法啟動服務。下面來看start方法
    的實現
    """
    service.serve(server)

    .......

def start(self):
    """省略了某些非關鍵程式碼"""

    """manager是在初始化中建立的ComputeManager物件,init_host完成下
    面的工作:
    1. 完成LibvirtDriver相關的初始化
    2. 更新雲主機例項資訊
    3. 更新例項資訊到scheduler
    """
    self.manager.init_host()

    #更新節點資訊
    self.manager.pre_start_hook()

    #建立一個Target = {topic = 'compute', server = 'hostname'}
    target = messaging.Target(topic=self.topic, server=self.host)

    """建立endpoints, BaseRPCAPI內部會建立一個Target = {namespace 
    'baseapi', version = '1.1'}
    manager = ComputeManager,如果您尋根究底的話,會發現nova-
    compute服務從訊息佇列接受到訊息後,通過dispatcher分發器分發後,最終
    是會投遞給ComputeManager的某個方法處理的--這其實是顯然的,不是麼!
    """
    endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, 
                                        self.backdoor_port)
        ]
    endpoints.extend(self.manager.additional_endpoints)

    serializer = objects_base.NovaObjectSerializer()

    # Create the rpc server by calling
    # oslo_messaging/rpc/server.py/get_rpc_server, which returns an
    # oslo_messaging/server.py/MessageHandlingServer object;
    # analyzed in detail below
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    # Start the rpc server; analyzed in detail below
    self.rpcserver.start()

    .......

Creating the rpc server

#oslo_messaging/rpc/server.py/get_rpc_server
def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):
    """輸入引數如下:
    transport = oslo_messaging.transport.Transport
    target = {topic='compute', server='devstack'}
    endpoints = [nova.compute.manager.ComputeManager,
                    nova.baserpc.BaseRPCAPI]
    executor = 'eventlet'
    serializer = nova.rpc.RequestContextSerializer 
    """
    #建立訊息分發器
    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    """建立伺服器物件,內部根據executor載入對應的執行器,這裡是
oslo_messaging/_executors/impl_eventlet.py/EventletExecutor
    """
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)
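
Although get_rpc_server defaults to executor='blocking', nova passes 'eventlet' through its own wrapper; nova/rpc.py/get_server looks roughly like this (a simplified sketch):

# nova/rpc.py (simplified)
def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)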

Starting the rpc server

With the rpc server created, let's look at how it is started:


#oslo_messaging/server.py/MessageHandlingServer.start
def start(self):
    """Start handling incoming messages.
    This method causes the server to begin polling the 
    transport for incoming messages and passing them to the 
    dispatcher. Message processing will continue until the 
    stop() method is called.

    The executor controls how the server integrates with 
    the applications I/O handling strategy - it may choose 
    to poll for messages in a new process, thread or co-
    operatively scheduled coroutine or simply by
    registering a callback with an event loop. Similarly, 
    the executor may choose to dispatch messages in a new 
    thread, coroutine or simply the current thread.
    """
    self._check_same_thread_id()

    # The server can only be started once
    if self._executor is not None:
        return
    try:
        """Start listening for messages on the transport (three consumers
        are registered); the call chain is:
        dispatcher._listen -> transport._listen -> driver._listen, where
        driver = RabbitDriver. An AMQPListener object is returned; the
        details follow later in the text.
        """
        listener = self.dispatcher._listen(self.transport)
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)

    self._running = True
    # Instantiate the executor, an EventletExecutor
    self._executor = self._executor_cls(self.conf, listener,
                                            self.dispatcher)
    # Start the EventletExecutor; it calls listener.poll to wait for
    # messages, and each received message is handed to the dispatcher,
    # which routes it to the matching ComputeManager method.
    self._executor.start()
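
Conceptually, what the executor does after start() can be pictured like this (an illustrative simplification in plain Python, not the actual EventletExecutor code):

def run_executor_loop(listener, dispatcher, is_running):
    # Keep polling the listener and hand every incoming message to the
    # dispatcher; the dispatch call below stands in for whatever the real
    # dispatcher does, which ultimately invokes a method on one of the
    # endpoints (for nova-compute, typically a ComputeManager method).
    while is_running():
        incoming = listener.poll()
        if incoming is not None:
            dispatcher.dispatch(incoming)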

Next, let's look in detail at how the three consumers in nova-compute are registered.

Registering the consumers

Continuing from above, here is the implementation of the RabbitDriver listen method:

#oslo_messaging/_drivers/amqpdriver.py/AMQPDriverBase._listen
def listen(self, target):
    """target是之前在start方法中建立的Target物件,內容如下:
        {topic='compute', server='devstack'}
    """
    """建立一個監聽用途的rpc,具體的過程在上文分析過,函式呼叫鏈如下:
    amqpdriver.py/AMQPDriverBase._get_connection -> 
    amqp.py/ConnectionContext -> amqp.py/ConnectionPool.create 
    -> impl_rabbit.py/Connection -> 
    komku/connection.py/Connection
    """
    conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
    #建立一個監聽器amqpdriver.py/AMQPListener,並與連結繫結
    listener = AMQPListener(self, conn)

    """下面建立了三個消費者,用於接收其他元件/模組傳送過來的訊息
    1. 第一個是topic型別的消費者,exchange_name = 'nova',topic = 
    'compute'
    2. 第二個也是topci型別的消費者,exchange_name = 'nova',topic = 
    'compute.devstack'
    3. 第三個是fanout型別的消費者,topic = 'compute'

    讀者可以通過rabbitmqctl工具檢視相關的exchange,佇列,生產者,消費
    者資訊

    下文以第一個消費者的建立為例具體分析程式碼實現
    """
    conn.declare_topic_consumer(
              exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
    conn.declare_topic_consumer(
                exchange_name=self._get_exchange(target),
                topic='%s.%s' % (target.topic, target.server),
                                    callback=listener)

    conn.declare_fanout_consumer(target.topic, listener)

    return listener

-------------------------------------------------------------
#impl_rabbit.py/Connection.declare_topic_consumer
def declare_topic_consumer(self, exchange_name, topic, 
                                callback=None,
                               queue_name=None):
    """輸入引數如下:
        exchange_name = 'nova'
        topic = 'compute'
        callback = AMQPListener
        queue_name = None
    """     
    #內部會建立一個名為exchange_name,型別為type的Exchange,
    #amqp_durable_queues = False,預設值,重啟後訊息丟失
    #amqp_auto_delete = False,預設值,不自動刪除訊息
    #rabbit_ha_queues = False,預設值,沒有HA
    consumer = Consumer(exchange_name=exchange_name,
                        queue_name=queue_name or topic,
                        routing_key=topic,
                        type='topic',
                        durable=self.amqp_durable_queues,
                        auto_delete=self.amqp_auto_delete,
                        callback=callback,
                        rabbit_ha_queues=self.rabbit_ha_queues)

    # To make sure the consumer registers with the server successfully,
    # declare goes through layer upon layer of indirection, wrapping,
    # decoration and retries. The analysis below only shows the normal
    # path and ignores most of the exception handling.
    self.declare_consumer(consumer)

-------------------------------------------------------
Next, the declare_consumer method mentioned above:

def declare_consumer(self, consumer):
    """Create a Consumer using the class that was passed in 
    and add it to our list of consumers
    This is the first layer of wrapping: _declare_consumer wraps the
    concrete consumer's declare function, so that on successful
    registration the consumer is added to the lists.
    """

    # Callback for connection errors
    def _connect_error(exc):
        log_info = {'topic': consumer.routing_key, 'err_str': exc}
        LOG.error(_LE("Failed to declare consumer for topic "
                      "'%(topic)s': %(err_str)s"), log_info)

    # The callback that performs the actual consumer registration; the
    # declare code is analyzed below
    def _declare_consumer():
        consumer.declare(self)
        self._consumers.append(consumer)
        self._new_consumers.append(consumer)
        return consumer
    """ensure contains a lot of retry-on-exception code that wraps the
    _declare_consumer callable, to make sure the consumer registration
    succeeds; the wrapping is analyzed below.
    """
    with self._connection_lock:
        return self.ensure(_declare_consumer,
                               error_callback=_connect_error)

---------------------------------------------------------------
Before moving on to the second layer of wrapping, let's look at the consumer's declare code:

#impl_rabbit.py/Consumer.declare
 def declare(self, conn):
     """Re-declare the queue after a rabbit (re)connect.
     函式邏輯很簡單:首先建立一個佇列,然後向rabbitmq註冊該佇列及
     exchange並將佇列與exchange繫結
     """

     #建立一個名為'compute'的佇列,self.*引數在上文建立Consumer是指定
     self.queue = kombu.entity.Queue(
            name=self.queue_name,
            channel=conn.channel,
            exchange=self.exchange,
            durable=self.durable,
            auto_delete=self.auto_delete,
            routing_key=self.routing_key,
            queue_arguments=self.queue_arguments)

     try:
         LOG.trace('ConsumerBase.declare: '
                      'queue %s', self.queue_name)
         #向rabbitmq註冊佇列及exchange並繫結佇列與exchange
         self.queue.declare()
     except conn.connection.channel_errors as exc:
         """NOTE(jrosenboom): This exception may be triggered 
         by a race condition. Simply retrying will solve the 
         error most of the time and should work well enough as 
         a workaround until the race condition itself can be 
         fixed.See 
         https://bugs.launchpad.net/neutron/+bug/1318721 
         for details.
         """
         if exc.code == 404:
             self.queue.declare()
         else:
             raise
--------------------------------------------------------------
Now let's look at the subsequent decoration and wrapping; its single purpose is to make sure the consumer registers successfully with the rabbitmq server:

#impl_rabbit.py/Connection.ensure
def ensure(self, method, retry=None,
               recoverable_error_callback=None, 
               error_callback=None,
               timeout_is_error=True):
    """這是第二層封裝,定義execute_method方法封裝第一層中輸入的處理函式
    _declare_consumer
    該函式定義了多個回撥處理函式,限於篇幅不給出具體程式碼,簡單說明如下:
    1. on_error - 處理連線異常的回撥
    2. on_recconnect - 處理重連成功後的回撥
    3. execute_method - 正常的回撥,是_declare_consumer的封裝
    """

    def execute_method(channel):
        self._set_current_channel(channel)
        #method = _declare_consumer
        method()

    """省略了try {} except 異常處理程式碼"""

    #呼叫komku/connection.py/Connection.autoretry方法呼叫ensure方
    #法再次封裝函式呼叫,詳情請繼續看下面的分析
    autoretry_method = self.connection.autoretry(
                execute_method, channel=self.channel,
                max_retries=retry,
                errback=on_error,
                interval_start=self.interval_start or 1,
                interval_step=self.interval_stepping,
                on_revive=on_reconnection,
                )
    """根據對上述autoretry的程式碼分析,我們知道autoretry_method = 
    execute_method(ensured),實際的呼叫鏈如下:_ensured -> 
    revive.__call__ -> execute_method -> _declare_consumer
    """
    ret, channel = autoretry_method()
    self._set_current_channel(channel)
    return ret   

------------------------------------------------------------ 
The following covers the third and fourth layers of wrapping. The third layer defines a callable object, Revival, to wrap the handler passed in from the layer above (execute_method); the fourth layer's _ensured method contains an "endless" reconnect loop to guarantee that the consumer gets registered.

#kombu/connection.py/Connection.autoretry
def autoretry(self, fun, channel=None, **ensure_options):
    """可以把該方法看成一個裝飾器,建立Revival類封裝輸入方法fun = 
    execute_method,Revival類定義了__call__方法,說明它是一個
    callable物件
    """
    channels = [channel]
    create_channel = self.channel

    class Revival(object):
         __name__ = getattr(fun, '__name__', None)
         __module__ = getattr(fun, '__module__', None)
         __doc__ = getattr(fun, '__doc__', None)

         def revive(self, channel):
             channels[0] = channel

         def __call__(self, *args, **kwargs):
             if channels[0] is None:
                 self.revive(create_channel())
             #fun = execute_method
             return (fun(*args, channel=channels[0], **kwargs),
                     channels[0])
    """A Revival object is created and passed to ensure, which wraps it
    once more to make sure the operation succeeds. ensure internally
    defines the _ensured method, a wrapper around revive.__call__ that
    contains the exception and retry code. _ensured's __name__, __module__
    and __doc__ attributes are replaced with those of the Revival object:
    _ensured.__name__ = "%s(ensured)" % fun.__name__
    _ensured.__doc__ = fun.__doc__
    _ensured.__module__ = fun.__module__
    So what is returned is a method named execute_method(ensured), and the
    actual function wrapping is: _ensured -> revive.__call__ ->
    execute_method -> _declare_consumer
    """
    revive = Revival()
    return self.ensure(revive, revive, **ensure_options)
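
To make the four layers easier to follow, here is a stripped-down, self-contained illustration of the same wrapping pattern (this is not the real kombu/oslo.messaging code; the simple retry loop merely stands in for kombu's ensure):

def declare_with_retry():
    def _declare_consumer():                      # innermost: the real work
        print('queue/exchange declared, consumer registered')

    def execute_method(channel):                  # layer 2: binds the channel
        print('using channel: %s' % channel)
        _declare_consumer()

    class Revival(object):                        # layer 3: callable wrapper
        def __call__(self, *args, **kwargs):
            return execute_method(channel='channel-0'), 'channel-0'

    def ensured(func):                            # layer 4: retry until success
        def _ensured(*args, **kwargs):
            for _ in range(3):                    # stands in for the real
                try:                              # reconnect/retry logic
                    return func(*args, **kwargs)
                except IOError:
                    continue
        return _ensured

    # call chain: _ensured -> Revival.__call__ -> execute_method -> _declare_consumer
    autoretry_method = ensured(Revival())
    ret, channel = autoretry_method()
    return ret, channel

declare_with_retry()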

At this point the rpc server inside the nova-compute service has been created. From the analysis above we can draw the following conclusions:

  • the nova-compute service depends on the nova-conductor service
  • the nova-compute service depends on the message queue (rabbitmq by default)
  • during startup the nova-compute service creates three consumers (two topic consumers and one fanout consumer)
  • in fact, every openstack service process (except the API) creates three consumers; you can inspect the details with the rabbitmqctl command-line tool!

I hope this is helpful. That's the end of this article!