ZeroMQ 中文指南 第三章 高階請求-應答模式【轉載】
作者資訊如下。
ZMQ 指南
作者: Pieter Hintjens [email protected], CEO iMatix Corporation.
翻譯: 張吉 [email protected], 安居客集團 好租網工程師
NOTE: 此翻譯涵蓋2011年10月份的ZMQ穩定版本,即2.1.0 stable release。但讀者仍然可以通過此文了解ZMQ的一些基本概念和哲學。
第三章 高階請求-應答模式
在第二章中我們通過開發一系列的小應用來熟悉ØMQ的基本使用方法,每個應用會引入一些新的特性。本章會沿用這種方式,來探索更多建立在ØMQ請求-應答模式之上的高階工作模式。
本章涉及的內容有:
- 在請求-應答模式中建立和使用訊息信封
- 使用REQ、REP、DEALER和ROUTER套接字
- 使用標識來手工指定應答目標
- 使用自定義離散路由模式
- 使用自定義最近最少使用路由模式
- 構建高層訊息封裝類
- 構建基本的請求應答代理
- 合理命名套接字
- 模擬client-worker叢集
- 構建可擴充套件的請求-應答叢集雲
- 使用管道套接字監控執行緒
Request-Reply Envelopes
在請求-應答模式中,信封裡儲存了應答目標的位置。這就是為什麼ØMQ網路雖然是無狀態的,但仍能完成請求-應答的過程。
在一般使用過程中,你並不需要知道請求-應答信封的工作原理。使用REQ、REP時,ØMQ會自動處理訊息信封。下一章講到的裝置(device),使用時也只需保證讀取和寫入所有的資訊即可。ØMQ使用多段訊息的方式來儲存信封,所以在複製訊息時也會複製信封。
然而,在使用高階請求-應答模式之前是需要了解信封這一機制的,以下是信封機制在ROUTER中的工作原理:
- 從ROUTER中讀取一條訊息時,ØMQ會包上一層信封,上面註明了訊息的來源。
- 向ROUTER寫入一條訊息時(包含信封),ØMQ會將信封拆開,並將訊息遞送給相應的物件。
如果將從ROUTER A中獲取的訊息(包含信封)寫入ROUTER B(即將訊息傳送給一個DEALER,該DEALER連線到了ROUTER),那麼在從ROUTER B中獲取該訊息時就會包含兩層信封。
信封機制的根本作用是讓ROUTER知道如何將訊息遞送給正確的應答目標,你需要做的就是在程式中保留好該信封。回顧一下REP套接字,它會將收到訊息的信封逐個拆開,將訊息本身傳送給應用程式。而在傳送時,又會在訊息外層包裹該信封,傳送給ROUTER,從而傳遞給正確的應答目標。
我們可以使用上述原理建立起一個ROUTER-DEALER裝置:
[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
...etc.
當你用REQ套接字去連線ROUTER套接字,併發送一條請求訊息,你會從ROUTER中獲得一條如下所示的訊息:
- 第三幀是應用程式傳送給REQ套接字的訊息;
- 第二幀的空資訊是REQ套接字在傳送訊息給ROUTER之前新增的;
- 第一幀即信封,是由ROUTER套接字新增的,記錄了訊息的來源。
如果我們在一條裝置鏈路上傳遞該訊息,最終會得到包含多層信封的訊息。最新的信封會在訊息的頂部。
以下將詳述我們在請求-應答模式中使用到的四種套接字型別:
DEALER是一種負載均衡,它會將訊息分發給已連線的節點,並使用公平佇列的機制處理接受到的訊息。DEALER的作用就像是PUSH和PULL的結合。
REQ傳送訊息時會在訊息頂部插入一個空幀,接受時會將空幀移去。其實REQ是建立在DEALER之上的,但REQ只有當訊息傳送並接受到迴應後才能繼續執行。
ROUTER在收到訊息時會在頂部新增一個信封,標記訊息來源。傳送時會通過該信封決定哪個節點可以獲取到該條訊息。
REP在收到訊息時會將第一個空幀之前的所有資訊儲存起來,將原始資訊傳送給應用程式。在傳送訊息時,REP會用剛才儲存的資訊包裹應答訊息。REP其實是建立在ROUTER之上的,但和REQ一樣,必須完成接受和傳送這兩個動作後才能繼續。
REP要求訊息中的信封由一個空幀結束,所以如果你沒有用REQ傳送訊息,則需要自己在訊息中新增這個空幀。
你肯定會問,ROUTER是怎麼標識訊息的來源的?答案當然是套接字的標識。我們之前講過,一個套接字可能是瞬時的,它所連線的套接字(如ROUTER)則會給它生成一個標識,與之相關聯。一個套接字也可以顯式地給自己定義一個標識,這樣其他套接字就可以直接使用了。
這是一個瞬時的套接字,ROUTER會自動生成一個UUID來標識訊息的來源。
這是一個持久的套接字,標識由訊息來源自己指定。
下面讓我們在例項中觀察上述兩種操作。下列程式會打印出ROUTER從兩個REP套接字中獲得的訊息,其中一個沒有指定標識,另一個指定了“Hello”作為標識。
identity.c
//
// 以下程式演示瞭如何在請求-應答模式中使用套接字標識。
// 需要注意的是s_開頭的函式是由zhelpers.h提供的。
// 我們沒有必要重複編寫那些程式碼。
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
void *sink = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (sink, "inproc://example");
// 第一個套接字由0MQ自動設定標識
void *anonymous = zmq_socket (context, ZMQ_REQ);
zmq_connect (anonymous, "inproc://example");
s_send (anonymous, "ROUTER uses a generated UUID");
s_dump (sink);
// 第二個由自己設定
void *identified = zmq_socket (context, ZMQ_REQ);
zmq_setsockopt (identified, ZMQ_IDENTITY, "Hello", 5);
zmq_connect (identified, "inproc://example");
s_send (identified, "ROUTER socket uses REQ's socket identity");
s_dump (sink);
zmq_close (sink);
zmq_close (anonymous);
zmq_close (identified);
zmq_term (context);
return 0;
}
執行結果:
----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER uses a generated UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER socket uses REQ's socket identity
自定義請求-應答路由
我們已經看到ROUTER套接字是如何使用信封將訊息傳送給正確的應答目標的,下面我們從一個角度來定義ROUTER:在傳送訊息時使用一定格式的信封提供正確的路由目標,ROUTER就能夠將該條訊息非同步地傳送給對應的節點。
所以說ROUTER的行為是完全可控的。在深入理解這一特性之前,讓我們先近距離觀察一下REQ和REP套接字,賦予他們一些鮮活的角色:
- REQ是一個“媽媽”套接字,不會耐心聽別人說話,但會不斷地丟擲問題尋求解答。REQ是嚴格同步的,它永遠位於訊息鏈路的請求端;
- REP則是一個“爸爸”套接字,只會回答問題,不會主動和別人對話。REP也是嚴格同步的,並一直位於應答端。
關於“媽媽”套接字,正如我們小時候所經歷的,只能等她向你開口時你們才能對話。媽媽不像爸爸那麼開明,也不會像DEALER套接字一樣接受模稜兩可的回答。所以,想和REQ套接字對話只有等它主動發出請求後才行,之後它就會一直等待你的回答,不管有多久。
“爸爸”套接字則給人一種強硬、冷漠的感覺,他只做一件事:無論你提出什麼問題,都會給出一個精確的回答。不要期望一個REP套接字會主動和你對話或是將你倆的交談傳達給別人,它不會這麼做的。
我們通常認為請求-應答模式一定是有來有往、有去有回的過程,但實際上這個過程是可以非同步進行的。我們只需獲得相應節點的地址,即可通過ROUTER套接字來非同步地傳送訊息。ROUTER是ZMQ中唯一一個可以定位訊息來源的套接字。
我們對請求-應答模式下的路由做一個小結:
- 對於瞬時的套接字,ROUTER會動態生成一個UUID來標識它,因此從ROUTER中獲取到的訊息裡會包含這個標識;
- 對於持久的套接字,可以自定義標識,ROUTER會如直接將該標識放入訊息之中;
- 具有顯式宣告標識的節點可以連線到其他型別的套接字;
- 節點可以通過配置檔案等機制提前獲知對方節點的標識,作出相應的處理。
我們至少有三種模式來實現和ROUTER的連線:
- ROUTER-DEALER
- ROUTER-REQ
- ROUTER-REP
每種模式下我們都可以完全掌控訊息的路由方式,但不同的模式會有不一樣的應用場景和訊息流,下一節開始我們會逐一解釋。
自定義路由也有一些注意事項:
- 自定義路由讓節點能夠控制訊息的去向,這一點有悖ØMQ的規則。使用自定義路由的唯一理由是ØMQ缺乏更多的路由演算法供我們選擇;
- 未來的ØMQ版本可能包含一些我們自定義的路由方式,這意味著我們現在設計的程式碼可能無法在新版本的ØMQ中執行,或者成為一種多餘;
- 內建的路由機制是可擴充套件的,且對裝置友好,但自定義路由就需要自己解決這些問題。
所以說自定義路由的成本是比較高的,更多情況下應當交由ØMQ來完成。不過既然我們已經講到這兒了,就繼續深入下去吧!
ROUTER-DEALER路由
ROUTER-DEALDER是一種最簡單的路由方式。將ROUTER和多個DEALER相連線,用一種合適的演算法來決定如何分發訊息給DEALER。DEALER可以是一個黑洞(只負責處理訊息,不給任何返回)、代理(將訊息轉發給其他節點)或是服務(會發送返回資訊)。
如果你要求DEALER能夠進行回覆,那就要保證只有一個ROUTER連線到DEALER,因為DEALER並不知道哪個特定的節點在聯絡它,如果有多個節點,它會做負載均衡,將訊息分發出去。但如果DEALER是一個黑洞,那就可以連線任何數量的節點。
ROUTER-DEALER路由可以用來做什麼呢?如果DEALER會將它完成任務的時間回覆給ROUTER,那ROUTER就可以知道這個DEALER的處理速度有多快了。因為ROUTER和DEALER都是非同步的套接字,所以我們要用zmq_poll()來處理這種情況。
下面例子中的兩個DEALER不會返回訊息給ROUTER,我們的路由採用加權隨機演算法:傳送兩倍多的資訊給其中的一個DEALER。
rtdealer.c
//
// 自定義ROUTER-DEALER路由
//
// 這個例項是單個程序,這樣方便啟動。
// 每個執行緒都有自己的ZMQ上下文,所以可以認為是多個程序在執行。
//
#include "zhelpers.h"
#include <pthread.h>
// 這裡定義了兩個worker,其程式碼是一樣的。
//
static void *
worker_task_a (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// 我們只接受到訊息的第二部分
char *request = s_recv (worker);
int finished = (strcmp (request, "END") == 0);
free (request);
if (finished) {
printf ("A received: %d\n", total);
break;
}
total++;
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
static void *
worker_task_b (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "B", 1);
zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// 我們只接受到訊息的第二部分
char *request = s_recv (worker);
int finished = (strcmp (request, "END") == 0);
free (request);
if (finished) {
printf ("B received: %d\n", total);
break;
}
total++;
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
pthread_t worker;
pthread_create (&worker, NULL, worker_task_a, NULL);
pthread_create (&worker, NULL, worker_task_b, NULL);
// 等待執行緒連線至套接字,否則我們傳送的訊息將不能被正確路由
sleep (1);
// 傳送10個任務,給A兩倍多的量
int task_nbr;
srandom ((unsigned) time (NULL));
for (task_nbr = 0; task_nbr < 10; task_nbr++) {
// 傳送訊息的兩個部分:第一部分是目標地址
if (randof (3) > 0)
s_sendmore (client, "A");
else
s_sendmore (client, "B");
// 然後是任務
s_send (client, "This is the workload");
}
s_sendmore (client, "A");
s_send (client, "END");
s_sendmore (client, "B");
s_send (client, "END");
zmq_close (client);
zmq_term (context);
return 0;
}
對上述程式碼的兩點說明:
- ROUTER並不知道DEALER何時會準備好,我們可以用訊號機制來解決,但為了不讓這個例子太過複雜,我們就用sleep(1)的方式來處理。如果沒有這句話,那ROUTER一開始發出的訊息將無法被路由,ØMQ會丟棄這些訊息。
- 需要注意的是,除了ROUTER會丟棄無法路由的訊息外,PUB套接字當沒有SUB連線它時也會丟棄傳送出去的訊息。其他套接字則會將無法傳送的訊息儲存起來,直到有節點來處理它們。
在將訊息路由給DEALER時,我們手工建立了這樣一個信封:
ROUTER套接字會移除第一幀,只將第二幀的內容傳遞給相應的DEALER。當DEALER傳送訊息給ROUTER時,只會傳送一幀,ROUTER會在外層包裹一個信封(新增第一幀),返回給我們。
如果你定義了一個非法的信封地址,ROUTER會直接丟棄該訊息,不作任何提示。對於這一點我們也無能為力,因為出現這種情況只有兩種可能,一是要送達的目標節點不復存在了,或是程式中錯誤地指定了目標地址。如何才能知道訊息會被正確地路由?唯一的方法是讓路由目標傳送一些反饋訊息給我們。後面幾章會講述這一點。
DEALER的工作方式就像是PUSH和PULL的結合。但是,我們不能用PULL或PUSH去構建請求-應答模式。
最近最少使用演算法路由(LRU模式)
我們之前講過REQ套接字永遠是對話的發起方,然後等待對方回答。這一特性可以讓我們能夠保持多個REQ套接字等待調配。換句話說,REQ套接字會告訴我們它已經準備好了。
你可以將ROUTER和多個REQ相連,請求-應答的過程如下:
- REQ傳送訊息給ROUTER
- ROUTER返回訊息給REQ
- REQ傳送訊息給ROUTER
- ROUTER返回訊息給REQ
- …
和DEALER相同,REQ只能和一個ROUTER連線,除非你想做類似多路冗餘路由這樣的事(我甚至不想在這裡解釋),其複雜度會超過你的想象並迫使你放棄的。
ROUTER-REQ模式可以用來做什麼?最常用的做法就是最近最少使用演算法(LRU)路由了,ROUTER發出的請求會讓等待最久的REQ來處理。請看示例:
//
// 自定義ROUTER-REQ路由
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10
static void *
worker_task(void *args) {
void *context = zmq_init(1);
void *worker = zmq_socket(context, ZMQ_REQ);
// s_set_id()函式會根據套接字生成一個可列印的字串,
// 並以此作為該套接字的標識。
s_set_id(worker);
zmq_connect(worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// 告訴ROUTER我已經準備好了
s_send(worker, "ready");
// 從ROUTER中獲取工作,直到收到結束的資訊
char *workload = s_recv(worker);
int finished = (strcmp(workload, "END") == 0);
free(workload);
if (finished) {
printf("Processed: %d tasks\n", total);
break;
}
total++;
// 隨機等待一段時間
s_sleep(randof(1000) + 1);
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void) {
void *context = zmq_init(1);
void *client = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(client, "ipc://routing.ipc");
srandom((unsigned) time(NULL));
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create(&worker, NULL, worker_task, NULL);
}
int task_nbr;
for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
// 最近最少使用的worker就在訊息佇列中
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "This is the workload");
free(address);
}
// 通知所有REQ套接字結束工作
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "END");
free(address);
}
zmq_close(client);
zmq_term(context);
return 0;
}
在這個示例中,實現LRU演算法並沒有用到特別的資料結構,因為ØMQ的訊息佇列機制已經提供了等價的實現。一個更為實際的LRU演算法應該將已準備好的worker收集起來,儲存在一個佇列中進行分配。以後我們會講到這個例子。
程式的執行結果會將每個worker的執行次數打印出來。由於REQ套接字會隨機等待一段時間,而我們也沒有做負載均衡,所以我們希望看到的是每個worker執行相近的工作量。這也是程式執行的結果。
Processed: 8 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 7 tasks
Processed: 9 tasks
Processed: 11 tasks
Processed: 14 tasks
Processed: 11 tasks
Processed: 11 tasks
Processed: 10 tasks
關於以上程式碼的幾點說明:
我們不需要像前一個例子一樣等待一段時間,因為REQ套接字會明確告訴ROUTER它已經準備好了。
我們使用了zhelpers.h提供的s_set_id()函式來為套接字生成一個可列印的字串標識,這是為了讓例子簡單一些。在現實環境中,REQ套接字都是匿名的,你需要直接呼叫zmq_recv()和zmq_send()來處理訊息,因為s_recv()和s_send()只能處理字串標識的套接字。
更糟的是,我們使用了隨機的標識,不要在現實環境中使用隨機標識的持久套接字,這樣做會將節點消耗殆盡。
如果你只是將上面的程式碼拷貝過來,沒有充分理解,那你就像是看到蜘蛛人從屋頂上飛下來,你也照著做了,後果自負吧。
在將訊息路由給REQ套接字時,需要注意一定的格式,即地址-空幀-訊息:
使用地址進行路由
在經典的請求-應答模式中,ROUTER一般不會和REP套接字通訊,而是由DEALER去和REP通訊。DEALER會將訊息隨機分發給多個REP,並獲得結果。ROUTER更適合和REQ套接字通訊。
我們應該記住,ØMQ的經典模型往往是執行得最好的,畢竟人走得多的路往往是條好路,如果不按常理出牌,那很有可能會跌入無敵深潭。下面我們就將ROUTER和REP進行連線,看看會發生什麼。
REP套接字有兩個特點:
- 它需要完成完整的請求-應答週期;
- 它可以接受任意大小的信封,並能完整地返回該信封。
在一般的請求-應答模式中,REP是匿名的,可以隨時替換。因為我們這裡在將自定義路由,就要做到將一條訊息傳送給REP A,而不是REP B。這樣才能保證網路的一端是你,另一端是特定的REP。
ØMQ的核心理念之一是周邊的節點應該儘可能的智慧,且數量眾多,而中介軟體則是固定和簡單的。這就意味著周邊節點可以向其他特定的節點發送訊息,比如可以連線到一個特定的REP。這裡我們先不討論如何在多個節點之間進行路由,只看最後一步中ROUTER如何和特定的REP通訊的。
這張圖描述了以下事件:
- client有一條訊息,將來會通過另一個ROUTER將該訊息傳送回去。這條資訊包含了兩個地址、一個空幀、以及訊息內容;
- client將該條訊息傳送給了ROUTER,並指定了REP的地址;
- ROUTER將該地址移去,並以此決定其下哪個REP可以獲得該訊息;
- REP收到該條包含地址、空幀、以及內容的訊息;
- REP將空幀之前的所有內容移去,交給worker去處理訊息;
- worker處理完成後將回復交給REP;
- REP將之前儲存好的信封包裹住該條回覆,併發送給ROUTER;
- ROUTER在該條回覆上又添加了一個註明REP的地址的幀。
這個過程看起來很複雜,但還是有必要取了解清楚的。只要記住,REP套接字會原封不動地將信封返回回去。
rtpapa.c
//
// 自定義ROUTER-REP路由
//
#include "zhelpers.h"
// 這裡使用一個程序來強調事件發生的順序性
int main (void)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
void *worker = zmq_socket (context, ZMQ_REP);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
// 等待worker連線
sleep (1);
// 傳送REP的標識、地址、空幀、以及訊息內容
s_sendmore (client, "A");
s_sendmore (client, "address 3");
s_sendmore (client, "address 2");
s_sendmore (client, "address 1");
s_sendmore (client, "");
s_send (client, "This is the workload");
// worker只會得到訊息內容
s_dump (worker);
// worker不需要處理信封
s_send (worker, "This is the reply");
// 看看ROUTER裡收到了什麼
s_dump (client);
zmq_close (client);
zmq_close (worker);
zmq_term (context);
return 0;
}
執行結果
----------------------------------------
[020] This is the workload
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] This is the reply
關於以上程式碼的幾點說明:
在現實環境中,ROUTER和REP套接字處於不同的節點。本例沒有啟用多程序,為的是讓事件的發生順序更為清楚。
zmq_connect()並不是瞬間完成的,REP和ROUTER連線的時候是會花費一些時間的。在現實環境中,ROUTER無從得知REP是否已經連線成功了,除非得到REP的某些迴應。本例中使用sleep(1)來處理這一問題,如果不這樣做,那REP將無法獲得訊息(自己嘗試一下吧)。
我們使用REP的套接字標識來進行路由,如果你不信,可以將訊息傳送給B,看看A能不能收到。
本例中的s_dump()等函式來自於zhelpers.h檔案,可以看到在進行套接字連線時程式碼都是一樣的,所以我們才能在ØMQ API的基礎上搭建上層的API。等今後我們討論到複雜應用程式的時候再詳細說明。
要將訊息路由給REP,我們需要建立它能辨別的信封:
請求-應答模式下的訊息代理
這一節我們將對如何使用ØMQ訊息信封做一個回顧,並嘗試編寫一個通用的訊息代理裝置。我們會建立一個佇列裝置來連線多個client和worker,裝置的路由演算法可以由我們自己決定。這裡我們選擇最近最少使用演算法,因為這和負載均衡一樣比較實用。
首先讓我們回顧一下經典的請求-應答模型,嘗試用它建立一個不斷增長的巨型服務網路。最基本的請求-應答模型是:
這個模型支援多個REP套接字,但如果我們想支援多個REQ套接字,就需要增加一箇中間件,它通常是ROUTER和DEALER的結合體,簡單將兩個套接字之間的資訊進行搬運,因此可以用現成的ZMQ_QUEUE裝置來實現:
+--------+ +--------+ +--------+
| Client | | Client | | Client |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
| | |
+-----------+-----------+
|
+---+----+
| ROUTER |
+--------+
| Device |
+--------+
| DEALER |
+---+----+
|
+-----------+-----------+
| | |
+---+----+ +---+----+ +---+----+
| REP | | REP | | REP |
+--------+ +--------+ +--------+
| Worker | | Worker | | Worker |
+--------+ +--------+ +--------+
Figure # - Stretched request-reply
這種結構的關鍵在於,ROUTER會將訊息來自哪個REQ記錄下來,生成一個信封。DEALER和REP套接字在傳輸訊息的過程中不會丟棄或更改信封的內容,這樣當訊息返回給ROUTER時,它就知道應該傳送給哪個REQ了。這個模型中的REP套接字是匿名的,並沒有特定的地址,所以只能提供同一種服務。
上述結構中,對REP的路由我們使用了DEADER自帶的負載均衡演算法。但是,我們想用LRU演算法來進行路由,這就要用到ROUTER-REP模式:
這個ROUTER-ROUTER的LRU佇列不能簡單地在兩個套接字間搬運訊息,以下程式碼會比較複雜,不過在請求-應答模式中複用性很高。
lruqueue.c
//
// 使用LRU演算法的裝置
// client和worker處於不同的執行緒中
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
// 出隊操作,使用一個可儲存任何型別的陣列實現
#define DEQUEUE(q) memmove (&(q)[0], &(q)[1], sizeof (q) - sizeof (q [0]))
// 使用REQ套接字實現基本的請求-應答模式
// 由於s_send()和s_recv()不能處理0MQ的二進位制套接字標識,
// 所以這裡會生成一個可列印的字串標識。
//
static void *
client_task (void *args)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_REQ);
s_set_id (client); // 設定可列印的標識
zmq_connect (client, "ipc://frontend.ipc");
// 傳送請求並獲取應答資訊
s_send (client, "HELLO");
char *reply = s_recv (client);
printf ("Client: %s\n", reply);
free (reply);
zmq_close (client);
zmq_term (context);
return NULL;
}
// worker使用REQ套接字實現LRU演算法
//
static void *
worker_task (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_REQ);
s_set_id (worker); // 設定可列印的標識
zmq_connect (worker, "ipc://backend.ipc");
// 告訴代理worker已經準備好
s_send (worker, "READY");
while (1) {
// 將訊息中空幀之前的所有內容(信封)儲存起來,
// 本例中空幀之前只有一幀,但可以有更多。
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);
// 獲取請求,併發送回應
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);
s_sendmore (worker, address);
s_sendmore (worker, "");
s_send (worker, "OK");
free (address);
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
int main (void)
{
// 準備0MQ上下文和套接字
void *context = zmq_init (1);
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "ipc://frontend.ipc");
zmq_bind (backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
pthread_t client;
pthread_create (&client, NULL, client_task, NULL);
}
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_task, NULL);
}
// LRU邏輯
// - 一直從backend中獲取訊息;當有超過一個worker空閒時才從frontend獲取訊息。
// - 當woker迴應時,會將該worker標記為已準備好,並轉發woker的迴應給client
// - 如果client傳送了請求,就將該請求轉發給下一個worker
// 存放可用worker的佇列
int available_workers = 0;
char *worker_queue [10];
while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll (items, available_workers? 2: 1, -1);
// 處理backend中worker的佇列
if (items [0].revents & ZMQ_POLLIN) {
// 將worker的地址入隊
char *worker_addr = s_recv (backend);
assert (available_workers < NBR_WORKERS);
worker_queue [available_workers++] = worker_addr;
// 跳過空幀
char *empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
// 第三幀是“READY”或是一個client的地址
char *client_addr = s_recv (backend);
// 如果是一個應答訊息,則轉發給client
if (strcmp (client_addr, "READY") != 0) {
empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
char *reply = s_recv (backend);
s_sendmore (frontend, client_addr);
s_sendmore (frontend, "");
s_send (frontend, reply);
free (reply);
if (--client_nbr == 0)
break; // 處理N條訊息後退出
}
free (client_addr);
}
if (items [1].revents & ZMQ_POLLIN) {
// 獲取下一個client的請求,交給空閒的worker處理
// client請求的訊息格式是:[client地址][空幀][請求內容]
char *client_addr = s_recv (frontend);
char *empty = s_recv (frontend);
assert (empty [0] == 0);
free (empty);
char *request = s_recv (frontend);
s_sendmore (backend, worker_queue [0]);
s_sendmore (backend, "");
s_sendmore (backend, client_addr);
s_sendmore (backend, "");
s_send (backend, request);
free (client_addr);
free (request);
// 將該worker的地址出隊
free (worker_queue [0]);
DEQUEUE (worker_queue);
available_workers--;
}
}
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
這段程式有兩個關鍵點:1、各個套接字是如何處理信封的;2、LRU演算法。我們先來看信封的格式。
我們知道REQ套接字在傳送訊息時會向頭部新增一個空幀,接收時又會自動移除。我們要做的就是在傳輸訊息時滿足REQ的要求,處理好空幀。另外還要注意,ROUTER會在所有收到的訊息前新增訊息來源的地址。
現在我們就將完整的請求-應答流程走一遍,我們將client套接字的標識設為“CLIENT”,worker的設為“WORKER”。以下是client傳送的訊息:
代理從ROUTER中獲取到的訊息格式如下:
代理會從LRU佇列中獲取一個空閒woker的地址,作為信封附加在訊息之上,傳送給ROUTER。注意要新增一個空幀。
REQ(worker)收到訊息時,會將信封和空幀移去:
可以看到,worker收到的訊息和client端ROUTER收到的訊息是一致的。worker需要將該訊息中的信封儲存起來,只對訊息內容做操作。
在返回的過程中:
- worker通過REQ傳輸給device訊息[client地址][空幀][應答內容];
- device從worker端的ROUTER中獲取到[worker地址][空幀][client地址][空幀][應答內容];
- device將worker地址儲存起來,併發送[client地址][空幀][應答內容]給client端的ROUTER;
- client從REQ中獲得到[應答內容]。
然後再看看LRU演算法,它要求client和worker都使用REQ套接字,並正確的儲存和返回訊息信封,具體如下:
建立一組poll,不斷地從backend(worker端的ROUTER)獲取訊息;只有當有空閒的worker時才從frontend(client端的ROUTER)獲取訊息;
迴圈執行poll
如果backend有訊息,只有兩種情況:1)READY訊息(該worker已準備好,等待分配);2)應答訊息(需要轉發給client)。兩種情況下我們都會儲存worker的地址,放入LRU佇列中,如果有應答內容,則轉發給相應的client。
如果frontend有訊息,我們從LRU佇列中取出下一個worker,將該請求傳送給它。這就需要傳送[worker地址][空幀][client地址][空幀][請求內容]到worker端的ROUTER。
我們可以對該演算法進行擴充套件,如在worker啟動時做一個自我測試,計算出自身的處理速度,並隨READY訊息傳送給代理,這樣代理在分配工作時就可以做相應的安排。
ØMQ上層API的封裝
使用ØMQ提供的API操作多段訊息時是很麻煩的,如以下程式碼:
while (1) {
// 將訊息中空幀之前的所有內容(信封)儲存起來,
// 本例中空幀之前只有一幀,但可以有更多。
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);
// 獲取請