RabbitMQ and Oslo.messaging
Basic Concepts
The OpenStack website also has an introduction to using it: http://docs.openstack.org/developer/oslo.messaging/
Roles
producer, exchange, queue, message, consumer
Producer
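RPC: to receive a response, the client needs to send a 'callback' queue address with the request, e.g. by specifying `properties=pika.BasicProperties(reply_to=callback_queue_name)`. Setting a unique `correlation_id` on every request makes it possible to create a single callback queue per client instead of one per request: the client uses the id to match each response to its request.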
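The correlation_id bookkeeping can be shown without a broker. The sketch below is hypothetical (`RpcReplyRouter` is not part of pika or RabbitMQ) and keeps everything in memory. Each request gets a fresh correlation_id plus the shared reply_to queue name. Each reply that arrives on that one queue is matched back to its request by id. Replies with an unknown id are discarded, as the RabbitMQ RPC tutorial advises.

```python
import uuid


class RpcReplyRouter:
    """Hypothetical helper: match replies on one callback queue to requests."""

    def __init__(self, callback_queue):
        self.callback_queue = callback_queue
        self.pending = {}  # correlation_id -> reply body (None until it arrives)

    def new_request(self, body):
        """Return what a client would publish; with pika, reply_to and
        correlation_id would go into pika.BasicProperties."""
        corr_id = str(uuid.uuid4())
        self.pending[corr_id] = None
        return {"reply_to": self.callback_queue,
                "correlation_id": corr_id,
                "body": body}

    def on_reply(self, correlation_id, body):
        """Handle a message from the callback queue; False if it is not ours."""
        if correlation_id not in self.pending:
            return False  # unknown id (e.g. a stale reply): discard it
        self.pending[correlation_id] = body
        return True

    def result(self, correlation_id):
        return self.pending.get(correlation_id)


if __name__ == "__main__":
    router = RpcReplyRouter("rpc_callback")
    first = router.new_request(b"fib(5)")
    second = router.new_request(b"fib(6)")
    # Replies may come back in any order on the same queue.
    router.on_reply(second["correlation_id"], b"8")
    router.on_reply(first["correlation_id"], b"5")
    print(router.result(first["correlation_id"]))
```

The id identifies the request, not the queue. One callback queue per client is therefore enough, however many requests are in flight.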
If the producer needs to know whether a message reached at least one queue, it can set the mandatory flag on basic.publish. The broker then sends a basic.return back to the client if no queue was appropriately bound.
Exchange
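In RabbitMQ a message is never sent directly to a queue; the producer can only send messages to an exchange. There are a few exchange types: direct, topic, headers, and fanout. Fanout exchanges ignore the routing key.
A topic exchange is powerful and can behave like the other exchanges. A queue bound with the binding key `#` receives every message regardless of the routing key, as with fanout. When a binding key contains neither `*` nor `#`, the topic exchange behaves like a direct one.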
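The matching rule fits in a short function: `*` matches exactly one dot-separated word, and `#` matches zero or more. This is a minimal re-implementation for illustration, not RabbitMQ's own matcher; `topic_matches` is a hypothetical name.

```python
from functools import lru_cache


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a topic-exchange binding key matches a routing key.

    Keys are dot-separated words; '*' matches exactly one word and
    '#' matches zero or more words.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    @lru_cache(maxsize=None)
    def match(i: int, j: int) -> bool:
        if i == len(pattern):
            return j == len(words)
        if pattern[i] == "#":
            # '#' either matches nothing more, or consumes one word and stays.
            return match(i + 1, j) or (j < len(words) and match(i, j + 1))
        if j < len(words) and pattern[i] in ("*", words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    print(topic_matches("#", "compute.host1"))          # True: behaves like fanout
    print(topic_matches("compute", "compute.host1"))    # False: behaves like direct
    print(topic_matches("compute.*", "compute.host1"))  # True
```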
Queue
Queues can optionally be made mirrored across multiple nodes. Each mirrored queue consists of one master and one or more slaves, with the oldest slave being promoted to the new master if the old master disappears for any reason.
Exclusive queues are deleted when the connection that declared them is closed.
Consumer
By default, RabbitMQ will send each message to the next consumer listening to the same queue, in sequence. This way of distributing messages is called round-robin.
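Round-robin dispatch amounts to cycling through the consumers. The function below is a hypothetical, simplified model. It ignores prefetch limits and acknowledgment timing, both of which affect real dispatch.

```python
from itertools import cycle


def round_robin(messages, consumers):
    """Assign messages to consumers in turn, as RabbitMQ does by default.

    Simplified model: ignores prefetch limits and how fast consumers ack.
    """
    assignment = {consumer: [] for consumer in consumers}
    for consumer, message in zip(cycle(consumers), messages):
        assignment[consumer].append(message)
    return assignment


if __name__ == "__main__":
    print(round_robin(["m1", "m2", "m3", "m4", "m5"], ["worker-a", "worker-b"]))
    # {'worker-a': ['m1', 'm3', 'm5'], 'worker-b': ['m2', 'm4']}
```

This assignment ignores how busy each worker is. The RabbitMQ work-queues tutorial therefore suggests `channel.basic_qos(prefetch_count=1)`, so that a worker is not given a new message until it has acked the previous one.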
Because a message may be delivered more than once (for example, after it has been requeued), consumer applications need to perform deduplication or handle incoming messages idempotently.
If a message is delivered to a consumer and then requeued (because it was not acknowledged before the consumer connection dropped, for example) then RabbitMQ will set the redelivered flag on it when it is delivered again (whether to the same consumer or a different one).
If a consumer determines that it cannot handle a message then it can reject it using basic.reject (or basic.nack), either asking the server to requeue it, or not.
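The redelivery, reject, and deduplication rules above interact, and a toy model makes the interaction visible. `ToyQueue` and `IdempotentConsumer` are hypothetical in-memory classes, not how RabbitMQ is implemented. Deduplication also assumes the publisher sets a unique message_id on every message. The demo runs the scenario that makes idempotence necessary: the consumer finishes its work, but the connection drops before the ack arrives. The message then comes back flagged as redelivered.

```python
from collections import deque


class ToyQueue:
    """Hypothetical in-memory model of one queue's ack/requeue behaviour."""

    def __init__(self):
        self.ready = deque()  # entries: (message_id, body, redelivered)
        self.unacked = {}     # delivery_tag -> (message_id, body)
        self.next_tag = 1

    def publish(self, message_id, body):
        self.ready.append((message_id, body, False))

    def deliver(self):
        message_id, body, redelivered = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (message_id, body)
        return tag, message_id, body, redelivered

    def ack(self, tag):
        del self.unacked[tag]

    def reject(self, tag, requeue):
        """basic.reject / basic.nack: requeue the message, or drop it.

        Without requeue the message is discarded (or dead-lettered, if the
        queue is configured for that).
        """
        message_id, body = self.unacked.pop(tag)
        if requeue:
            self.ready.appendleft((message_id, body, True))

    def connection_lost(self):
        """Requeue every unacked message, in order, flagged as redelivered."""
        for tag in sorted(self.unacked, reverse=True):
            message_id, body = self.unacked.pop(tag)
            self.ready.appendleft((message_id, body, True))


class IdempotentConsumer:
    """Skips messages whose message_id it has already processed."""

    def __init__(self):
        self.seen = set()
        self.processed = []

    def handle(self, message_id, body):
        if message_id in self.seen:
            return False  # duplicate delivery: side effects already done
        self.seen.add(message_id)
        self.processed.append(body)
        return True


if __name__ == "__main__":
    q, consumer = ToyQueue(), IdempotentConsumer()
    q.publish("m1", "reboot vm1")
    tag, mid, body, redelivered = q.deliver()
    consumer.handle(mid, body)   # work done...
    q.connection_lost()          # ...but the ack never reached the broker
    tag, mid, body, redelivered = q.deliver()
    print(redelivered, consumer.handle(mid, body))  # True False
    q.ack(tag)
```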
Reliability
Acknowledgment
Message acknowledgments are turned on by default. If a consumer dies without sending an ack, RabbitMQ understands that the message wasn’t fully processed and redelivers it to another consumer. That way you can be sure that no message is lost, even if workers occasionally die.
(kong) This feature requires the consumer code to send the ack explicitly, e.g. `ch.basic_ack(delivery_tag=method.delivery_tag)`. The code in OpenStack's oslo.messaging is another example.
There aren’t any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It’s fine even if processing a message takes a very, very long time.
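A consumer callback that acks explicitly, as in the note above, might look like the sketch below. It assumes the pika 1.x API (`basic_consume(queue=..., on_message_callback=..., auto_ack=False)`). The names `make_callback` and `task_queue` and the broker address are illustrative. pika is imported only inside `main()`, which needs a running broker and is not called here. The callback itself can therefore be exercised with stub objects.

```python
def make_callback(process):
    """Build a pika-style on_message callback that acks only after success.

    `process` is a hypothetical application function that receives the body.
    """

    def on_message(ch, method, properties, body):
        process(body)  # if this raises, no ack is sent
        # Explicit ack: until it arrives, the broker keeps the message and
        # redelivers it if this consumer's connection dies.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return on_message


def main():
    import pika  # imported here so make_callback can be used without pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="task_queue", durable=True)
    channel.basic_consume(queue="task_queue",
                          on_message_callback=make_callback(print),
                          auto_ack=False)
    channel.start_consuming()

# Call main() against a running RabbitMQ broker (requires pika).
```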
Confirmation
Two things are required to make sure that messages aren’t lost if the RabbitMQ server stops: we need to mark the exchange/queue as durable and the messages as persistent (delivery_mode=2). But there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. If you need a stronger guarantee, use publisher confirms.
For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
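A few pika calls combine durable queues, persistent messages, publisher confirms, and the mandatory flag from the Producer section. This sketch assumes the pika 1.x BlockingConnection API. There, once `confirm_delivery()` is on, `basic_publish` raises `UnroutableError` for a returned message and `NackError` for a nacked one. The helper names, queue name, and broker address are illustrative. The channel is passed in so the helpers can be exercised with a stub. `main()` needs a running broker and is not called here.

```python
def setup_channel(channel, queue):
    """Declare a durable queue and turn on publisher confirms."""
    # Durable queue: the queue definition survives a broker restart.
    channel.queue_declare(queue=queue, durable=True)
    # Publisher confirms: the broker acks each publish once it is responsible.
    channel.confirm_delivery()


def publish_persistent(channel, queue, body, properties):
    """Publish via the default exchange; `properties` should mark the
    message persistent, e.g. pika.BasicProperties(delivery_mode=2)."""
    # mandatory=True: an unroutable message comes back (basic.return)
    # instead of being silently dropped.
    channel.basic_publish(exchange="", routing_key=queue, body=body,
                          properties=properties, mandatory=True)


def main():
    import pika
    from pika.exceptions import NackError, UnroutableError

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    setup_channel(channel, "task_queue")
    try:
        publish_persistent(channel, "task_queue", b"hello",
                           pika.BasicProperties(delivery_mode=2))
    except (UnroutableError, NackError) as exc:
        print("broker did not confirm the message:", exc)
    finally:
        connection.close()

# Call main() against a running RabbitMQ broker (requires pika).
```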
Distributed Broker
Sometimes, however, it is necessary or desirable to make the RabbitMQ broker itself distributed. There are three ways to accomplish that: clustering, federation, and the shovel.
Heartbeat
Heartbeat frames are sent roughly every timeout / 2 seconds, where timeout is the negotiated heartbeat timeout. After two missed heartbeats, the peer is considered unreachable. Different clients manifest this differently, but the TCP connection will be closed. When a client detects that a RabbitMQ node is unreachable because of missed heartbeats, it needs to reconnect.
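Take an example timeout of 60 seconds (the value is only an illustration). Frames go out roughly every 30 seconds. Two missed heartbeats mean about 60 seconds of silence before the connection is considered dead, so detection takes roughly the timeout itself. The helper names below are hypothetical.

```python
def heartbeat_schedule(timeout_s):
    """Return (approximate send interval, silence before the peer is dead).

    Follows the rule above: a frame about every timeout/2 seconds, and the
    peer is considered unreachable after two missed heartbeats.
    """
    interval = timeout_s / 2
    return interval, 2 * interval


def peer_unreachable(last_frame_at, now, timeout_s):
    """True once no frame has arrived for two heartbeat intervals."""
    _, limit = heartbeat_schedule(timeout_s)
    return now - last_frame_at >= limit


if __name__ == "__main__":
    print(heartbeat_schedule(60))  # (30.0, 60.0)
```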
Commands
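```shell
# Queues with their counts of ready and unacknowledged messages
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Exchanges
rabbitmqctl list_exchanges
# Bindings between exchanges and queues
rabbitmqctl list_bindings
```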
Oslo.messaging
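At last, I can write this part in Chinese.
I won't say much about why oslo.messaging came about. Every project makes RPC calls, and each project used to maintain its own pile of similar code. To simplify the work and make packaging easier, the community moved the RPC functionality into a library that OpenStack projects depend on. It also laid the groundwork for later supporting message middleware that does not use AMQP (ZeroMQ).
In essence, the oslo.messaging library wraps the RabbitMQ Python library. It takes into account ease of programming, performance, reliability, exception handling, and more, so developers of each project can focus on business logic instead of on how messages are sent and received. That is certainly good for developers. For the operators of an OpenStack deployment, though, encapsulation means many details are hidden. To troubleshoot problems in message delivery, operators must spend extra time and effort understanding the internal logic of oslo.messaging. That only adds to the burden of OpenStack's already intricate core services.
There are plenty of articles online about the oslo.messaging code, so I won't spend time writing another summary (the code is itself the documentation; just read it carefully). But code on its own is dry, so here I'll walk through the oslo.messaging flow using an example.
nova-compute rebooting a VM (cast)
When nova-compute restarts, it may determine during initialization that a VM on the host needs to be rebooted. In that case it makes the following call. (The way RPC is invoked in this example is actually a bug, since fixed by a colleague of mine: https://review.openstack.org/#/c/170110/)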
```python
self.compute_rpcapi.reboot_instance(context, instance,
                                    block_device_info=None,
                                    reboot_type=reboot_type)
```
Here `compute_rpcapi` is a client that calls into the oslo.messaging library:
```python
class ComputeAPI(object):
    # Maps a release name set in CONF.upgrade_levels.compute to the
    # RPC version cap for that release.
    VERSION_ALIASES = {
        'icehouse': '3.23',
        'juno': '3.35',
    }

    def __init__(self):
        super(ComputeAPI, self).__init__()
        target = messaging.Target(topic=CONF.compute_topic, version='3.0')
        version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.compute,
                                               CONF.upgrade_levels.compute)
        serializer = objects_base.NovaObjectSerializer()
        self.client = self.get_client(target, version_cap, serializer)

    def get_client(self, target, version_cap, serializer):
        return rpc.get_client(target,
                              version_cap=version_cap,
                              serializer=serializer)

# ......

def get_client(target, version_cap=None, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.RPCClient(TRANSPORT,
                               target,
                               version_cap=version_cap,
                               serializer=serializer)
```
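A few concepts here:
- target: as the message sender, you use the target to say where the message should go: the exchange, the binding key, the consumer, and so on. These concepts do not necessarily map one-to-one onto the attributes of the Target object.
- serializer: handles message serialization, i.e. converts Nova objects into a format that can be sent over the network.
- TRANSPORT: the abstraction layer that handles message sending. The `rpc_backend` setting determines which driver actually sends the messages; the one we usually use is `rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver`. For RabbitDriver, the relevant configuration options are all in `/oslo_messaging/_drivers/impl_rabbit.py`. Internally it maintains a connection pool that manages Connection objects.
So we now know what initializing a messaging client settles: where the message goes, who sends it, and how it is serialized... But we are still missing the most important piece: where is the message itself?
Don't worry, this is only the initialization of an RPC client...
Next, let's see how nova-compute reboots a VM via RPC: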
```python
def reboot_instance(self, ctxt, instance, block_device_info,
                    reboot_type):
    version = '3.0'
    # Narrow the target to one compute host and pin the API version.
    cctxt = self.client.prepare(server=_compute_host(None, instance),
                                version=version)
    # Asynchronous: cast() does not wait for a reply.
    cctxt.cast(ctxt, 'reboot_instance',
               instance=instance,
               block_device_info=block_device_info,
               reboot_type=reboot_type)
```
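Some explanation:
1. RPCClient's `prepare` function reconfigures the target according to what the caller needs. Here it updates the client's `server` and `version` attributes (`server` corresponds to a consumer in RabbitMQ, i.e. one particular compute node). The request is thus aimed at a specific consumer, instead of being dropped into a queue for a randomly chosen consumer to handle. The function returns an object of a wrapper class around RPCClient (`_CallContext`).
2. cast. Anyone familiar with OpenStack messaging knows that RPC messages are sent either synchronously or asynchronously, so I won't repeat that background here. cast sends an asynchronous message.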
```python
def cast(self, ctxt, method, **kwargs):
    """Invoke a method and return immediately. See RPCClient.cast()."""
    msg = self._make_message(ctxt, method, kwargs)
    ctxt = self.serializer.serialize_context(ctxt)

    if self.version_cap:
        self._check_version_cap(msg.get('version'))
    try:
        self.transport._send(self.target, ctxt, msg, retry=self.retry)
    except driver_base.TransportDriverError as ex:
        raise ClientSendError(self.target, ex)
```
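3. Message assembly. `msg` is the message body to be sent. What does it contain? Several parts:
- method: the name of the function to call on the receiving side;
- args: the arguments to that function, which are serialized;
- namespace: the target's namespace attribute;
- version: the target's version. It must be compatible with the `version_cap` from earlier: the major version must match, and the minor version must be less than or equal to that of `version_cap`. In other words, a function in rpcapi cannot use a version newer than the one this client's `version_cap` allows. The main purpose of this version is compatibility during upgrades; for details, see a teammate's blog post on upgrades: here
4. Sending the message. This calls a method of the TRANSPORT described above: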
```python
def _send(self, target, ctxt, message,
          wait_for_reply=None, timeout=None,
          envelope=True, notify=False, retry=None):

    class Context(object):
        def __init__(self, d):
            self.d = d

        def to_dict(self):
            return self.d

    context = Context(ctxt)
    msg = message

    if wait_for_reply:
        # call(): tag the request so the reply can be matched to it.
        msg_id = uuid.uuid4().hex
        msg.update({'_msg_id': msg_id})
        LOG.debug('MSG_ID is %s', msg_id)
        msg.update({'_reply_q': self._get_reply_q()})

    rpc_amqp._add_unique_id(msg)
    rpc_amqp.pack_context(msg, context)

    if envelope:
        msg = rpc_common.serialize_msg(msg)

    if wait_for_reply:
        self._waiter.listen(msg_id)

    try:
        with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
            if notify:
                conn.notify_send(self._get_exchange(target),
                                 target.topic, msg, retry=retry)
            elif target.fanout:
                conn.fanout_send(target.topic, msg, retry=retry)
            else:
                topic = target.topic
                if target.server:
                    # Aim at a single server: routing key "<topic>.<server>".
                    topic = '%s.%s' % (target.topic, target.server)
                conn.topic_send(exchange_name=self._get_exchange(target),
                                topic=topic, msg=msg, timeout=timeout,
                                retry=retry)

        if wait_for_reply:
            result = self._waiter.wait(msg_id, timeout)
            if isinstance(result, Exception):
                raise result
            return result
    finally:
        if wait_for_reply:
            self._waiter.unlisten(msg_id)
```
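The compatibility rule from step 3 can be checked in a few lines: the major version must match, and the requested minor version must be no newer than the cap's. This is a simplified stand-in for what `_check_version_cap` enforces, not oslo.messaging's own code. `parse_version` and `is_compatible` are hypothetical names, and only two-part versions are handled.

```python
def parse_version(version):
    """Split 'major.minor' into integers."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def is_compatible(version_cap, version):
    """True if a message at `version` may be sent under `version_cap`:
    same major version, and a minor version no newer than the cap's."""
    cap_major, cap_minor = parse_version(version_cap)
    major, minor = parse_version(version)
    return major == cap_major and minor <= cap_minor


if __name__ == "__main__":
    cap = "3.23"  # e.g. the 'icehouse' alias from VERSION_ALIASES
    for requested in ("3.0", "3.9", "3.35", "4.0"):
        print(requested, is_compatible(cap, requested))
```

With the `icehouse` cap of 3.23, a 3.0 or 3.9 message passes, while a 3.35 (Juno-level) or 4.0 message is refused. Parsing to integers matters: compared as strings, '3.9' would sort after '3.23'.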
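That's quite a bit of code, so a few notes:
1. A cast sets no `wait_for_reply`, no `timeout`, and no `notify`. This also shows that a cast does not wait for the consumer to consume the message or return a value.
2. The message gets a randomly generated unique identifier, `_unique_id`, and the information in the context is packed into `msg`. `msg` is then rewrapped into the following format:
```
{
    'oslo.version': 2.0,            # the current version
    'oslo.message': <original msg>
}
```
This is the message body that is ultimately carried through RabbitMQ.
3. A Connection object is taken from the connection pool mentioned earlier, and its `topic_send` method is called. Note the arguments: they specify the exchange name, the routing key (`topic`, which a queue's binding key must match), the message body, and so on.
4. The Connection object was mentioned earlier but not explained in detail: it wraps the connection to RabbitMQ.
5. The `topic_send` method initializes a TopicPublisher object and publishes the message to the RabbitMQ broker. RabbitDriver has several classes similar to TopicPublisher, such as DirectPublisher and FanoutPublisher. They all inherit from Publisher, whose methods call the RabbitMQ Python library directly. Each represents a different publishing behavior, as the names suggest, but there is no fundamental difference between them; they differ only in the type and attributes of the exchange. `topic_send` also shows that oslo.messaging adds two things on top of the RabbitMQ Python library's own publishing: a retry mechanism and exception handling. This is also what the end of the RPC chapter of the RabbitMQ tutorial recommends.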
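The envelope from note 2 and the routing decision in `_send` can be condensed into a broker-free sketch. `wrap_envelope`, `unwrap_envelope`, and `choose_publish` are hypothetical names. Storing the inner message as a JSON string reflects my understanding of `rpc_common.serialize_msg`, so treat that detail as an assumption. The notify branch is left out.

```python
import json
import uuid


def wrap_envelope(raw_msg):
    """Add a _unique_id and wrap the message in the oslo envelope shape."""
    msg = dict(raw_msg)
    msg["_unique_id"] = uuid.uuid4().hex  # receivers use it to drop duplicates
    return {"oslo.version": "2.0", "oslo.message": json.dumps(msg)}


def unwrap_envelope(envelope):
    return json.loads(envelope["oslo.message"])


def choose_publish(topic, server=None, fanout=False):
    """Mirror the non-notify branches of _send(): (publisher, routing key)."""
    if fanout:
        # fanout_send(topic, ...): the topic names the fanout exchange and
        # the broker ignores the routing key.
        return "fanout", topic
    if server:
        return "topic", "%s.%s" % (topic, server)  # one specific server
    return "topic", topic  # any server listening on the topic


if __name__ == "__main__":
    env = wrap_envelope({"method": "reboot_instance",
                         "args": {"reboot_type": "SOFT"},
                         "version": "3.0"})
    print(sorted(env))                                # ['oslo.message', 'oslo.version']
    print(choose_publish("compute", server="host1"))  # ('topic', 'compute.host1')
```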
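OK, that completes sending a cast message. I said I wouldn't cover the oslo.messaging implementation, but explaining it while avoiding the code felt like cheating, so I pasted a lot of code after all. Still, when studying oslo.messaging you shouldn't dig too deep into the code (the same goes for studying any project). At each key step, step back out of the code and rethink the whole flow from a global perspective; that gives a deeper understanding of the mechanism.
call
Having covered cast, let's also go through how a call message is sent. call is very similar to cast; the differences are:
1. call must wait for the consumer to finish processing and obtain the return value;
2. call must handle timeouts and catch exceptions;
3. call must deserialize the returned message.
In the implementation, the differences are:
1. Before sending, two attributes are added to `msg`: `_msg_id` and `_reply_q`. The former is randomly generated (not to be confused with the unique id) and distinguishes the messages sent by different senders. The latter tells the consumer where to send the result. It is actually a temporary exchange and queue that the sender maintains for receiving replies.
2. The sender creates a global ReplyWaiter object (shared within each sender) that listens on that temporary queue and processes the messages it receives (it is itself the callback for received messages).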
```python
def __call__(self, message):
    # Ack the reply first, then hand it to whoever waits on its _msg_id.
    message.acknowledge()
    incoming_msg_id = message.pop('_msg_id', None)
    self.waiters.put(incoming_msg_id, message)
```
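As you can see, it first sends the ack and then stores the message. The global ReplyWaiter object keeps track of each msg id and its result, and each sender only cares about the messages that concern it. A reply may contain the fields failure, ending, and result. Note that every sender handles timeouts and duplicate messages.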
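The bookkeeping that `ReplyWaiter` does can be sketched with one inbox per outstanding `_msg_id`. `ReplyWaiterSketch` is hypothetical and simplified, not oslo.messaging's class. It ignores the multi-part `ending` protocol and turns a `failure` into a plain `RuntimeError`. It does show the listen/deliver/wait/unlisten cycle that `_send` drives, including dropping replies nobody is waiting for.

```python
import queue
import threading


class ReplyWaiterSketch:
    """Hypothetical, simplified ReplyWaiter: one inbox per pending _msg_id.

    Simplification: every reply is treated as final; the multi-part
    'ending' protocol is ignored.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._inboxes = {}

    def listen(self, msg_id):
        with self._lock:
            self._inboxes[msg_id] = queue.Queue()

    def unlisten(self, msg_id):
        with self._lock:
            self._inboxes.pop(msg_id, None)

    def __call__(self, message):
        """Callback for the reply queue (the ack is assumed already sent)."""
        msg_id = message.pop("_msg_id", None)
        with self._lock:
            inbox = self._inboxes.get(msg_id)
        if inbox is None:
            return  # nobody is waiting any more (e.g. it timed out): drop it
        inbox.put(message)

    def wait(self, msg_id, timeout):
        """Block for the reply; return the result, or an exception object."""
        with self._lock:
            inbox = self._inboxes[msg_id]
        try:
            reply = inbox.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("no reply to %s within %s s" % (msg_id, timeout))
        if reply.get("failure"):
            # Returned, not raised: the caller raises it, as _send() does.
            return RuntimeError(reply["failure"])
        return reply.get("result")


if __name__ == "__main__":
    waiter = ReplyWaiterSketch()
    waiter.listen("abc")
    # Simulate the consumer's reply arriving from another thread.
    threading.Timer(0.05, waiter, args=[{"_msg_id": "abc", "result": 42}]).start()
    print(waiter.wait("abc", timeout=1))  # 42
    waiter.unlisten("abc")
```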
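There is a classic diagram of the call flow:
The receiving side, using nova-compute as an example
Everything above describes the behavior of the RPC client. For the client's calls to work, an RPC server has to provide the interface. In the initialization of the nova-compute service (other receiving services are similar):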
```python
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [
    self.manager,
    baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
]
endpoints.extend(self.manager.additional_endpoints)

serializer = objects_base.NovaObjectSerializer()

self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()

# ......

def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)
```
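A few notes:
1. As a server, it defines both topic and server in the target, and likewise needs a serializer, TRANSPORT, and target;
2. endpoints: when a message arrives at the receiver, how is it handled? The endpoints are the final handlers of messages: an endpoint's methods are what ultimately get invoked, and nova-compute's manager (`self.manager`) is itself an endpoint;
3. executor: described below.
What is the difference between transport, dispatcher, and executor? To quote a code comment: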
Connect a transport to a dispatcher that knows how to process the message using an executor that knows how the app wants to create new tasks.
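Put simply, the executor determines the threading model used to receive messages. The transport receives messages at the message-middleware level, and the dispatcher does the final processing. All three are abstractions introduced to organize the code.
The executor we normally use is eventlet. An executor does just two things: 1. fetch messages; 2. process them.
Fetching messages
Before fetching messages, the transport performs a listen operation: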
```python
def listen(self, target):
    conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

    listener = AMQPListener(self, conn)

    # Queue bound with "<topic>": shared by every server on this topic.
    conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                topic=target.topic,
                                callback=listener)
    # Queue bound with "<topic>.<server>": only this server consumes it.
    conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                topic='%s.%s' % (target.topic,
                                                 target.server),
                                callback=listener)
    # Fanout: broadcasts to every server on this topic.
    conn.declare_fanout_consumer(target.topic, listener)

    return listener
```
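As you can see, it declares three consumers. Two queues are bound to the topic exchange, one with binding key `<topic>` and one with `<topic>.<server>`; both calls pass the same exchange_name, so they share one topic exchange. A third queue is bound to a fanout exchange. The transport listens for messages on these queues, and an AMQPListener object handles them. AMQPListener's job is simple: it keeps pulling messages from the queues and deduplicates and unwraps them (extracting some attributes). It then builds AMQPIncomingMessage objects and keeps them in memory until they are processed.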
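The effect of the three declarations can be modelled without a broker. In this hypothetical sketch the queue names are made up; the real driver names its queues itself. A message with routing key `compute.host1` lands only in host1's queue. A message with routing key `compute` lands in the shared queue, where RabbitMQ hands it to one of the consuming hosts in round-robin order. A fanout reaches every host.

```python
def declare_listeners(topic, servers):
    """Model the three declare_* calls in listen() for a set of servers.

    Returns ({queue: binding key} on the topic exchange, [fanout queues]).
    Queue names are illustrative only.
    """
    topic_bindings = {topic: topic}  # one queue shared by every server
    fanout_queues = []
    for server in servers:
        key = "%s.%s" % (topic, server)
        topic_bindings[key] = key  # a queue only this server consumes
        fanout_queues.append("%s_fanout_%s" % (topic, server))
    return topic_bindings, fanout_queues


def route(topic_bindings, fanout_queues, routing_key=None, fanout=False):
    """Return the queues that receive a message."""
    if fanout:
        return sorted(fanout_queues)  # every server gets its own copy
    # These binding keys contain no '*' or '#', so matching is plain equality.
    return sorted(q for q, key in topic_bindings.items() if key == routing_key)


if __name__ == "__main__":
    bindings, fanouts = declare_listeners("compute", ["host1", "host2"])
    print(route(bindings, fanouts, "compute.host1"))  # ['compute.host1']
    print(route(bindings, fanouts, "compute"))        # ['compute']
    print(route(bindings, fanouts, fanout=True))
```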
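Processing messages
When the dispatcher processes a message, it first sends the ack. It then uses the method, args, namespace, version, and other fields in `msg` (all familiar by now) to choose a suitable endpoint. What makes an endpoint suitable?
- the namespace and version in the endpoint's target attribute are compatible with the message;
- the endpoint has the corresponding method.
What follows is deserializing the arguments, invoking the endpoint's method, and serializing the result to return it to the caller.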
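Endpoint selection, as described by the two checks above, can be sketched as follows. `Target`, `dispatch`, and `ComputeManagerStub` are hypothetical stand-ins, not oslo.messaging's dispatcher. Raising `LookupError` when nothing matches is this sketch's own choice.

```python
class Target:
    """Hypothetical stand-in for the target attribute an endpoint carries."""

    def __init__(self, namespace=None, version="1.0"):
        self.namespace = namespace
        self.version = version


def _compatible(endpoint_version, requested):
    e_major, e_minor = (int(x) for x in endpoint_version.split(".")[:2])
    r_major, r_minor = (int(x) for x in requested.split(".")[:2])
    return e_major == r_major and r_minor <= e_minor


def dispatch(endpoints, message):
    """Invoke the first endpoint that passes both checks listed above."""
    method = message["method"]
    namespace = message.get("namespace")
    version = message.get("version", "1.0")
    for endpoint in endpoints:
        target = getattr(endpoint, "target", None) or Target()
        if target.namespace != namespace:
            continue  # check 1: namespace must match
        if not _compatible(target.version, version):
            continue  # check 1: endpoint must support the requested version
        if hasattr(endpoint, method):  # check 2: the method must exist
            return getattr(endpoint, method)(**message.get("args", {}))
    raise LookupError("no endpoint for %s (namespace=%s, version=%s)"
                      % (method, namespace, version))


class ComputeManagerStub:
    """Hypothetical endpoint standing in for nova-compute's manager."""
    target = Target(version="3.35")

    def reboot_instance(self, instance, reboot_type):
        return "rebooting %s (%s)" % (instance, reboot_type)


if __name__ == "__main__":
    msg = {"method": "reboot_instance", "version": "3.0",
           "args": {"instance": "vm1", "reboot_type": "SOFT"}}
    print(dispatch([ComputeManagerStub()], msg))  # rebooting vm1 (SOFT)
```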
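OK, one last thing we haven't covered: how the result gets back to the caller (for a call). From the above, we know that call senders listen on a global queue, and that `msg` carries enough hints (`msg_id` and `reply_q`). As the receiver, returning the result is therefore simple: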
```python
def _send_reply(self, conn, reply=None, failure=None,
                ending=False, log_failure=True):
    if failure:
        failure = rpc_common.serialize_remote_exception(failure,
                                                        log_failure)

    msg = {'result': reply, 'failure': failure}
    if ending:
        msg['ending'] = True

    rpc_amqp._add_unique_id(msg)

    # If a reply_q exists, add the msg_id to the reply and pass the
    # reply_q to direct_send() to use it as the response queue.
    # Otherwise use the msg_id for backward compatibility.
    if self.reply_q:
        msg['_msg_id'] = self.msg_id
        conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
    else:
        conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
```
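The reply side comes down to choosing a destination and building the body. `build_reply` is a hypothetical helper that follows the branches of `_send_reply` above. The failure is assumed to be already serialized, the queue names are illustrative, and the envelope has the same shape as the request envelope.

```python
import json
import uuid


def build_reply(msg_id, reply_q, result=None, failure=None, ending=False):
    """Return (destination, envelope) the way _send_reply() decides.

    `failure` is assumed to be already serialized (a string) here.
    """
    msg = {"result": result, "failure": failure}
    if ending:
        msg["ending"] = True
    msg["_unique_id"] = uuid.uuid4().hex
    if reply_q:
        # Shared reply queue: the caller's ReplyWaiter routes on _msg_id.
        msg["_msg_id"] = msg_id
        destination = reply_q
    else:
        # Older callers: the msg_id itself names the reply destination.
        destination = msg_id
    return destination, {"oslo.version": "2.0", "oslo.message": json.dumps(msg)}


if __name__ == "__main__":
    dest, env = build_reply("0f1e", "reply_5a7c", result="ok", ending=True)
    print(dest, json.loads(env["oslo.message"])["_msg_id"])  # reply_5a7c 0f1e
```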