OpenStack之RPC呼叫(一)
關於OpenStack中基於RESTFul API的通訊方式主要是應用了WSGI,我會在以後的博文中詳細討論。這裡主要還是分享一下關於RPC呼叫的一些理解。
首先,什麼是RPC呢?百度百科給出的解釋是這樣的:“RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議”。這個概念聽起來還是比較抽象,下面我會結合OpenStack中RPC呼叫的具體應用來具體分析一下這個協議。
其次,為什麼要採用RPC呢?單純的依靠RESTFul API不可以嗎?其實原因有下面這幾個:
1. 由於RESTFul API是基於HTTP協議的,因此客戶端與服務端之間所能傳輸的訊息僅限於文字
2. RESTFul API客戶端與服務端之間採用的是同步機制,當傳送HTTP請求時,客戶端需要等待服務端的響應。當然對於這一點是可以通過一些技術來實現非同步的機制的
3. 採用RESTFul API,客戶端與服務端之間雖然可以獨立開發,但還是存在耦合。比如,客戶端在傳送請求的時,必須知道伺服器的地址,且必須保證伺服器正常工作
基於上面這幾個原因,所以OpenStack才採用了另一種遠端通訊機制,這就是我們今天要討論的鼎鼎大名的RPC。
要了解OpenStack中的RPC,有一個元件是必不可少的,那就是RabbitMQ(訊息佇列)。OpenStack中,RPC採用AMQP協議實現程序間通訊,而RabbitMQ正是AMQP的實現方式,所以可以說OpenStack中的RPC呼叫都是基於RabbitMq完成的(注:有關AMQP、RabbitMQ的一些知識,可以參看我上篇分享的文章——《兔子與兔子洞》)。
在Nova中,定義了兩種遠端呼叫方式——rpc.call和rpc.cast。其中rpc.call方式是指request/response(請求/響應)模式,即客戶端在傳送請求後,繼續等待伺服器端的響應結果,待響應結果到達後,才結束整個過程。rpc.cast方式是指客戶端傳送RPC呼叫請求後,不等待伺服器端的響應結果。不難看出,較rpc.cast模式,rpc.call更為複雜。為了處理rpc.call,Nova採用了Topic Exchange(主題交換器)和Direct Exchange(直接交換器)兩種訊息交換器。其中Topic Exchange用於客戶端向伺服器端rpc.call的發起,Direct Exchange用於伺服器端向客戶端返回rpc.call。對應於這兩種交換機,Nova中定義了Topic/Direct訊息消費者、Topic/Direct訊息釋出者、Topic/Direct交換器。各部件如下圖所示:
需要說明的是一個主題交換器可以關聯多個佇列,而一個直接交換器只能關聯一個佇列。
結合rpc.call程式碼例項,開始我們的分析吧,程式碼結構如下:
client.py——RPC呼叫請求客戶端啟動指令碼
dispatcher.py——將客戶端釋出的訊息分發給相應的方法處理
impl_kombu.py——核心程式碼,實現了消費者、生產者、建立連線等操作
manager.py——定義處理RPC請求的方法
rpc_amqp.py——傳送RPC請求和發生RPC響應
rpc.py——為外部提供訪問API
server.py——伺服器端啟動指令碼
service.py——建立和管理RPC服務
接下來看一下rpc.call執行的流程:
1). RPC伺服器端定義並啟動RPC服務
2). RPC伺服器端建立和RabbitMQ伺服器的連線
3). RPC伺服器端建立和啟用主題消費者
4). RPC客戶端向主題交換器傳送RPC請求
5). RPC伺服器端接收和處理RPC請求
6). RPC客戶端建立和啟用直接消費者
7). RPC伺服器端向直接交換機發送RPC響應
1. 伺服器端:定義和啟動RPC服務
server.py
import service
srv = service.Service() #建立RPC服務
srv.start() #啟動RPC服務
while True:
srv.drain_events() #監聽RPC請求
以上程式碼的作用是建立Service物件,然後分別呼叫start和drain_events方法。Service類的定義在service.py中。
service.py
import rpc
import manager
import dispatcher
TOPIC = 'sendout_request'
class Service(object):
def __init__(self):
self.topic = TOPIC
self.manager = manager.Manager()
def start(self):
self.conn = rpc.create_connection()
rpc_dispatcher = dispatcher.RpcDispatcher(self.manager)
self.conn.create_consumer(self.topic, rpc_dispatcher)
self.conn.consume()
def drain_events(self):
self.conn.drain_events()
Service類中包含init、start、drain_events三個方法。
2. 伺服器端:建立與RabbitMQ的連線
Service類呼叫rpc.crerate_connection方法來建立連線。該方法會返回一個Connection物件。Connection物件的定義在impl_kombu.py檔案中。
impl_kombu.py
class Connection(object):
def __init__(self):
self.consumers = []
self.connection = None
self.reconnect()
def reconnect(self):
#初次重連的等待時間
sleep_time = conf.get('interval_start', 1)
#每次連線失敗後增加的等待時間
stepping = conf.get('interval_stepping', 2)
#重連的最大等待時間
interval_max = conf.get('interval_max', 30)
sleep_time -= stepping
while True:
try:
self._connect()
return
except Exception, e:
if 'timeout' not in str(e):
raise
sleep_time += stepping
sleep_time = min(sleep_time, interval_max)
print("AMQP Server is unreachable,"
"trying to connect %d seconds later\n" % sleep_time)
time.sleep(sleep_time)<pre name="code" class="python"> def _connect(self):
hostname = rabbit_params.get('hostname')
port = rabbit_params.get('port')
if self.connection: #如果已有連線,釋放原有連線
print("Reconnecting to AMQP Server on "
"%(hostname)s:%(port)d\n" % locals())
self.connection.release()
self.connection = None
self.connection = kombu.connection.BrokerConnection(**rabbit_params)
self.consumer_num = itertools.count(1) #消費者迭代器,產生消費者唯一的標識
self.connection.connect() #建立與RabbitMQ的連線
self.channel = self.connection.channel() #獲取連線的通道
for consumer in self.consumers:
consumer.reconnect(self.channel)
Connection類的初始化方法比較簡單,主要是呼叫reconnect方法。reconnect方法會不斷嘗試與RabbitMQ建立連線。_connect方法實現真正的連線的建立。
3. 伺服器端:建立和啟用主題消費者
(1)Service類通過Connection類的create_consumer方法建立消費者
impl_kombu.py
class Connection(object):
def create_consumer(self, topic, proxy):
proxy_cb = rpc_amqp.ProxyCallback(proxy)
self.declare_topic_consumer(topic, proxy_cb)
def declare_topic_consumer(self, topic, callback):
print('declaring topic consumer for topic %s...\n' % topic)
self.declare_consumer(TopicConsumer, topic, callback)
def declare_consumer(self, consumer_cls, topic, callback):
def _declare_consumer():
consumer = consumer_cls(self.channel,topic, callback,self.consumer_num.next())
self.consumers.append(consumer)
print('Succed declaring consumer for topic %s\n' % topic)
return consumer
return self.ensure(_declare_consumer, topic)
def ensure(self, method, topic):
while True:
try:
return method()
except Exception, e:
if 'timeout' not in str(e):
raise
print('Failed to declare consumer for topic %s: '
'%s\n' % (topic, str(e)))
self.reconnect()
ProxyCallback類是一個處理回撥的代理類,後續會說明。主要來看declare_topic_consumer方法,這個方法建立了消費者。declare_topic_consumer方法呼叫declare_consumer,向declare_consumer方法中傳入TopicConsumer類。declare_consumer方法對TopicConsumer類進行例項化,並通過ensure方法保證消費者建立成功。來看一下TopicConsumer這個類。
impl_kombu.py
class TopicConsumer(ConsumerBase):
def __init__(self, channel, topic, callback, tag, **kwargs):
self.topic = topic
options = {'durable': False,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=topic,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(channel,
callback,
tag,
name=topic,
exchange=exchange,
routing_key=topic,
**options)
options變數用來設定交換器的屬性。這裡交換器的名稱為topic的值,tag為消費者的唯一標識。可以看到TopicConsumer類繼承自ConsumerBase類,來看一下這個基類。
impl_kombu.py
class ConsumerBase(object):
def __init__(self, channel, callback, tag, **kwargs):
self.callback = callback
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
self.reconnect(channel)
def reconnect(self, channel):
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
ConsumerBase類的初始化方法呼叫了reconnect方法來建立佇列。
(2)啟用消費者
Service類通過Connection類的consume方法啟用消費者。Connection類的consume方法最終會呼叫ConsumerBase類的consume方法。
impl_kombu.py
class ConsumerBase(object):
def consume(self, *args, **kwargs):
options = {'consumer_tag': self.tag}
options['nowait'] = False
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
msg = message.payload #獲取訊息體
self.callback(msg) #處理訊息
message.ack() #通知交換器訊息處理完畢
except Exception:
print("Failed to process message... skipping it.\n")
self.queue.consume(*args, callback=_callback, **options) #啟用消費者
主要定義了_callback方法,分發和處理RPC請求。
至此伺服器端的工作暫時告一段落,下一篇博文來我們再來分析下客戶端的工作和伺服器端的後續接收和處理RPC請求的工作。