1. 程式人生 > >[RabbitMQ+Python入門經典] 兔子和兔子窩

[RabbitMQ+Python入門經典] 兔子和兔子窩

作為一個工業級的訊息佇列伺服器,在其Python段當中推薦了,作為RabbitMQ+Python的入門手冊再合適不過了。不過,正如其標題Rabbit and Warrens(兔子和養兔場)一樣,這篇英文寫的相當俏皮,以至於對於我等非英文讀者來說不像一般的技術文件那麼好懂,所以,翻譯一下吧。翻譯過了,希望其他人可以少用一些時間。翻譯水平有限,不可能像原文一樣俏皮,部分地方可能就意譯了,希望以容易懂為準。想看看老外的幽默的,推薦去看原文,其實,也不是那麼難理解……

兔子和兔子窩

當時我們的動機很簡單:從生產環境的電子郵件處理流程當中分支出一個特定的離線分析流程。我們開始用的MySQL,將要處理的東西放在表裡面,另一個程式從中取。不過很快,這種設計的醜陋之處就顯現出來了

…… 你想要多個程式從一個隊列當中取資料來處理?沒問題,我們硬編碼程式的個數好了……什麼?還要能夠允許程式動態地增加和減少的時候動態進行壓力分配?

是的,當年我們想的簡單的東西(做一個分支處理)逐漸變成了一個棘手的問題。以前拿著錘子(MySQL)看所有東西都是釘子(表)的年代是多麼美好……

在搜尋了一下之後,我們走進了訊息佇列(message queue)的大門。不不,我們當然知道訊息佇列是什麼,我們可是以做電子郵件程式謀生的。我們實現過各種各樣的專業的,高速的記憶體佇列用來做電子郵件處理。我們不知道的是那一大類現成的、通用的訊息佇列(MQ)伺服器——無論是用什麼語言寫出的,不需要複雜的裝配的,可以自然的在網路上的應用程式之間傳送資料的一類程式。不用我們自己寫?看看再說。

讓大家看看你們的Queue……

過去的4年裡,人們寫了有好多好多的開源的MQ伺服器啊。其中大多數都是某公司例如LiveJournal寫出來用來解決特定問題的。它們的確不關心上面跑的是什麼型別的訊息,不過他們的設計思想通常是和建立者息息相關的(訊息的持久化,崩潰恢復等通常不在他們考慮範圍內)。不過,有三個專門設計用來做及其靈活的訊息佇列的程式值得關注:

Apache ActiveMQ 曝光率最高,不過看起來它有些問題,可能會造成丟訊息。不可接受,下一個。

ZeroMQ RabbitMQ 都支援一個開源的訊息協議,成為AMQPAMQP的一個優點是它是一個靈活和開放的協議,以便和另外兩個商業化的

Message Queue IBMTibco)競爭,很好。不過ZeroMQ不支援訊息持久化和崩潰恢復,不太好。剩下的只有RabbitMQ了。如果你不在意訊息持久化和崩潰恢復,試試ZeroMQ吧,延遲很低,而且支援靈活的拓撲。

剩下的只有這個吃胡蘿蔔的傢伙了……

當我讀到它是用Erlang寫的時候,RabbitMQ震了我一下。是愛立信開發的高度並行的語言,用來跑在電話交換機上。是的,那些要求69的線上時間的東西。在Erlang當中,充斥著大量輕量程序,它們之間用訊息傳遞來通訊。聽起來思路和我們用訊息佇列的思路是一樣的,不是麼?

而且,RabbitMQ支援持久化。是的,如果RabbitMQ死掉了,訊息並不會丟失,當佇列重啟,一切都會回來。而且,正如在DigiTar(注:原文作者的公司)做事情期望的那樣,它。除此之外,RabbitMQ的文件相當的……恐怖。如果你懂AMQP,這些文件還好,但是有多少人懂AMQP?這些文件就像MySQL的文件假設你已經懂了SQL一樣……不過沒關係啦。

好了,廢話少說。這裡是花了一週時間閱讀關於AMQP和關於它如何在RabbitMQ上工作的文件之後的一個總結,還有,怎麼在Python當中使用。

開始吧

AMQP當中有四個概念非常重要:虛擬主機(virtual host),交換機(exchange),佇列(queue)和繫結(binding)。一個虛擬主機持有一組交換機、佇列和繫結。為什麼需要多個虛擬主機呢?很簡單,RabbitMQ當中,使用者只能在虛擬主機的粒度進行許可權控制。因此,如果需要禁止A組訪問B組的交換機/佇列/繫結,必須為AB分別建立一個虛擬主機。每一個RabbitMQ伺服器都有一個預設的虛擬主機“/”。如果這就夠了,那現在就可以開始了。

交換機,佇列,還有繫結……天哪!

剛開始我思維的列車就是在這裡脫軌的…… 這些鬼東西怎麼結合起來的?

佇列(Queues)是你的訊息(messages)的終點,可以理解成裝訊息的容器。訊息就一直在裡面,直到有客戶端(也就是消費者,Consumer)連線到這個佇列並且將其取走為止。不過。你可以將一個佇列配置成這樣的:一旦訊息進入這個佇列,biu~,它就煙消雲散了。這個有點跑題了……

需要記住的是,佇列是由消費者(Consumer)通過程式建立的,不是通過配置檔案或者命令列工具。這沒什麼問題,如果一個消費者試圖建立一個已經存在的佇列,RabbitMQ就會起來拍拍他的腦袋,笑一笑,然後忽略這個請求。因此你可以將訊息佇列的配置寫在應用程式的程式碼裡面。這個概念不錯。

OK,你已經建立並且連線到了你的佇列,你的消費者程式正在百無聊賴的敲著手指等待訊息的到來,敲啊,敲啊…… 沒有訊息。發生了什麼?你當然需要先把一個訊息放進佇列才行。不過要做這個,你需要一個交換機(Exchange……

交換機可以理解成具有路由表的路由程式,僅此而已。每個訊息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字串。交換機當中有一系列的繫結(binding),即路由規則(routes),例如,指明具有路由鍵 “X” 的訊息要到名為timbuku的隊列當中去。先不討論這個,我們有點超前了。

你的消費者程式要負責建立你的交換機(複數)。啥?你是說你可以有多個交換機?是的,這個可以有,不過為啥?很簡單,每個交換機在自己獨立的程序當中執行,因此增加多個交換機就是增加多個程序,可以充分利用伺服器上的CPU核以便達到更高的效率。例如,在一個8核的伺服器上,可以建立5個交換機來用5個核,另外3個核留下來做訊息處理。類似的,在RabbitMQ的叢集當中,你可以用類似的思路來擴充套件交換機一邊獲取更高的吞吐量。

OK,你已經建立了一個交換機。但是他並不知道要把訊息送到哪個佇列。你需要路由規則,即繫結(binding)。一個繫結就是一個類似這樣的規則:將交換機“desert(沙漠)當中具有路由鍵阿里巴巴的訊息送到佇列“hideout(山洞)裡面去。換句話說,一個繫結就是一個基於路由鍵將交換機和佇列連線起來的路由規則。例如,具有路由鍵“audit”的訊息需要被送到兩個佇列,“log-forever”“alert-the-big-dude”。要做到這個,就需要建立兩個繫結,每個都連線一個交換機和一個佇列,兩者都是由“audit”路由鍵觸發。在這種情況下,交換機會複製一份訊息並且把它們分別傳送到兩個隊列當中。交換機不過就是一個由繫結構成的路由表。

現在複雜的東西來了:交換機有多種型別。他們都是做路由的,不過接受不同型別的繫結。為什麼不建立一種交換機來處理所有型別的路由規則呢?因為每種規則用來做匹配分子的CPU開銷是不同的。例如,一個“topic”型別的交換機試圖將訊息的路由鍵與類似dogs.*的模式進行匹配。匹配這種末端的萬用字元比直接將路由鍵與dogs比較(“direct”型別的交換機)要消耗更多的CPU。如果你不需要“topic”型別的交換機帶來的靈活性,你可以通過使用“direct”型別的交換機獲取更高的處理效率。那麼有哪些型別,他們又是怎麼處理的呢?

Fanout Exchange不處理路由鍵。你只需要簡單的將佇列繫結到交換機上。一個傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。Fanout交換機轉發訊息是最快的。

Direct Exchange處理路由鍵。需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個佇列繫結到該交換機上要求路由鍵 “dog”,則只有被標記為dog的訊息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog

Topic Exchange將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到audit.irs.corporate,但是audit.*只會匹配到audit.irs。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:

持久化這些小東西們

你花了大量的時間來建立佇列、交換機和繫結,然後,砰~伺服器程式掛了。你的佇列、交換機和繫結怎麼樣了?還有,放在佇列裡面但是尚未處理的訊息們呢?

放鬆~如果你是用預設引數構造的這一切的話,那麼,他們,都,biu~,灰飛煙滅了。是的,RabbitMQ重啟之後會乾淨的像個新生兒。你必須重做所有的一切,亡羊補牢,如何避免將來再度發生此類杯具?

佇列和交換機有一個建立時候指定的標誌durable,直譯叫做堅固的。durable的唯一含義就是具有這個標誌的佇列和交換機會在重啟之後重新建立,它不表示說在隊列當中的訊息會在重啟後恢復。那麼如何才能做到不只是佇列和交換機,還有訊息都是持久的呢?

但是首先一個問題是,你真的需要訊息是持久的嗎?對於一個需要在重啟之後回覆的訊息來說,它需要被寫入到磁碟上,而即使是最簡單的磁碟操作也是要消耗時間的。如果和訊息的內容相比,你更看重的是訊息處理的速度,那麼不要使用持久化的訊息。不過對於我們@DigiTar來說,持久化很重要。

當你將訊息釋出到交換機的時候,可以指定一個標誌“Delivery Mode”(投遞模式)。根據你使用的AMQP的庫不同,指定這個標誌的方法可能不太一樣(我們後面會討論如何用Python搞定)。簡單的說,就是將Delivery Mode設定成2,也就是持久的(persistent)即可。一般的AMQP庫都是將Delivery Mode設定成1,也就是非持久的。所以要持久化訊息的步驟如下:

1.將交換機設成 durable

2.將佇列設成 durable

3.將訊息的 Delivery Mode 設定成2

就這樣,不是很複雜,起碼沒有造火箭複雜,不過也有可能犯點小錯誤。

下面還要羅嗦一個東西……繫結(Bindings)怎麼辦?我們無法在建立繫結的時候設定成durable。沒問題,如果你綁定了一個durable的佇列和一個durable的交換機,RabbitMQ會自動保留這個繫結。類似的,如果刪除了某個佇列或交換機(無論是不是durable),依賴它的繫結都會自動刪除。

注意兩點:

·RabbitMQ 不允許你繫結一個非堅固(non-durable)的交換機和一個durable的佇列。反之亦然。要想成功必須佇列和交換機都是durable的。

·一旦建立了佇列和交換機,就不能修改其標誌了。例如,如果建立了一個non-durable的佇列,然後想把它改變成durable的,唯一的辦法就是刪除這個佇列然後重現建立。因此,最好仔細檢查建立的標誌。

開始喂蛇了~

【譯註】說喂蛇是因為Python的圖示是條蛇。

AMQP的一個空白地帶是如何在Python當中使用。對於其他語言有一大坨材料。

但是對Python老兄來說,你需要花點時間來挖掘一下。所以我寫了這個,這樣別的傢伙們就不需要經歷我這種抓狂的過程了。

首先,我們需要一個PythonAMQP庫。有兩個可選:

·使用框架的AMQP庫,因此允許非同步I/O

根據你的需求,py-amqplib或者txAMQP都是可以的。因為是基於Twisted的,txAMQP可以保證用非同步IO構建超高效能的AMQP程式。但是Twisted程式設計本身就是一個很大的主題……因此清晰起見,我們打算用 py-amqplib更新:請參見Esteve Fernandez關於txAMQP的使用和程式碼樣例的回覆

AMQP支援在一個TCP連線上啟用多個MQ通訊channel,每個channel都可以被應用作為通訊流。每個AMQP程式至少要有一個連線和一個channel

 

每個channel都被分配了一個整數標識,自動由Connection()類的.channel()方法維護。或者,你可以使用.channel(x)來指定channel標識,其中x是你想要使用的channel標識。通常情況下,推薦使用.channel()方法來自動分配channel標識,以便防止衝突。

現在我們已經有了一個可以用的連線和channel。現在,我們的程式碼將分成兩個應用,生產者(producer)和消費者(consumer)。我們先建立一個消費者程式,他會建立一個叫做“po_box”的佇列和一個叫“sorting_room”的交換機:

 

這段程式碼幹了啥?首先,它建立了一個名叫po_box的佇列,它是durable的(重啟之後會重新建立),並且最後一個消費者斷開的時候不會自動刪除(auto_delete=False)。在建立durable的佇列(或者交換機)的時候,將auto_delete設定成false是很重要的,否則佇列將會在最後一個消費者斷開的時候消失,與durable與否無關。如果將durableauto_delete都設定成True,只有尚有消費者活動的佇列可以在RabbitMQ意外崩潰的時候自動恢復。

(你可以注意到了另一個標誌,稱為“exclusive”。如果設定成True,只有建立這個佇列的消費者程式才允許連線到該佇列。這種佇列對於這個消費者程式是私有的)。

還有另一個交換機宣告,建立了一個名字叫“sorting_room”的交換機。auto_deletedurable的含義和佇列是一樣的。但是,.excange_declare() 還有另外一個引數叫做type,用來指定要建立的交換機的型別(如前面列出的): fanout, direct  topic.

到此為止,你已經有了一個可以接收訊息的佇列和一個可以傳送訊息的交換機。不過我們需要建立一個繫結,把它們連線起來。

chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,
routing_key=”jason”)

這個繫結的過程非常直接。任何送到交換機sorting_room的具有路由鍵jason的訊息都被路由到名為po_box的佇列。

現在,你有兩種方法從隊列當中取出訊息。第一個是呼叫chan.basic_get(),主動從隊列當中拉出下一個訊息(如果隊列當中沒有訊息,chan.basic_get()會返回None因此下面程式碼當中print msg.body 會在沒有訊息的時候崩掉):

 

但是如果你想要應用程式在訊息到達的時候立即得到通知怎麼辦?這種情況下不能使用chan.basic_get(),你需要用chan.basic_consume()註冊一個新訊息到達的回撥。

 

chan.wait()放在一個無限迴圈裡面,這個函式會等待在佇列上,直到下一個訊息到達佇列。chan.basic_cancel()用來登出該回調函式。引數consumer_tag當中指定的字串和chan.basic_consume()註冊的一直。在這個例子當中chan.basic_cancel()不會被呼叫到,因為上面是個無限迴圈…… 不過你需要知道這個呼叫,所以我把它放在了程式碼裡。

需要注意的另一個東西是no_ack引數。這個引數可以傳給chan.basic_get()chan.basic_consume(),預設是false。當從隊列當中取出一個訊息的時候,RabbitMQ需要應用顯式地回饋說已經獲取到了該訊息。如果一段時間內不回饋,RabbitMQ會將該訊息重新分配給另外一個繫結在該佇列上的消費者。另一種情況是消費者斷開連線,但是獲取到的訊息沒有回饋,則RabbitMQ同樣重新分配。如果將no_ack引數設定為true,則py-amqplib會為下一個AMQP請求新增一個no_ack屬性,告訴AMQP伺服器不需要等待回饋。但是,大多數時候,你也許想要自己手工傳送回饋,例如,需要在回饋之前將訊息存入資料庫。回饋通常是通過呼叫chan.basic_ack()方法,使用訊息的delivery_tag屬性作為引數。參見chan.basic_get()的例項程式碼。

不過沒有人傳送訊息的話,要消費者何用?所以需要一個生產者。下面的程式碼示例表明如何將一個簡單訊息傳送到交換區sorting_room,並且標記為路由鍵jason

 

你也許注意到我們設定訊息的delivery_mode屬性為2,因為佇列和交換機都設定為durable的,這個設定將保證訊息能夠持久化,也就是說,當它還沒有送達消費者之前如果RabbitMQ重啟則它能夠被恢復。

剩下的最後一件事情(生產者和消費者都需要呼叫的)是關閉channel和連線:

chan.close()

conn.close()

來真實地跑一下吧……

現在我們已經寫好了生產者和消費者,讓他們跑起來吧。假設你的RabbitMQlocalhost上安裝並且執行。

開啟一個終端,執行python ./amqp_consumer.py讓消費者執行,並且建立佇列、交換機和繫結。

然後在另一個終端執行python ./amqp_publisher.py “AMQP rocks.”。如果一切良好,你應該能夠在第一個終端看到輸出的訊息。

付諸使用吧

我知道這個教程是非常粗淺的關於AMQP/RabbitMQ和如何使用Python訪問的教程。希望這個可以說明所有的概念如何在Python當中被組合起來。如果你發現任何錯誤,請聯絡原作者【譯註:如果是翻譯問題請聯絡譯者】。同時,我很高興回答我知道的問題。【譯註:譯者也是一樣的】。接下來是,叢集化(clustering)!不過我需要先把它弄懂再說。

注:關於RabbitMQ的知識我主要來自這些來源,推薦閱讀: