ZeroMQ(java)之負載均衡
我們在實際的應用中最常遇到的場景如下:
A向B傳送請求,B向A返回結果。。。。
但是這種場景就會很容易變成這個樣子:
很多A向B傳送請求,所以B要不斷的處理這些請求,所以就會很容易想到對B進行擴充套件,由多個B來處理這些請求,那麼這裡就出現了另外一個問題:
B對請求處理的速度可能不同,那麼B之間他們的負載也是不同的,那麼應該如何對請求進行分發就成了一個比較重要的問題。。。也就變成了負載均衡的問題了。。。
其實最好的負載均衡解決方案也很簡單:
絕大多數的任務都是獨立的,這裡中間層可以將A傳送過來的請求先快取起來,然後B的行為就是主動的找中間層獲取請求處理,然後返回,再獲取。。。。也就是中間層只是做一個請求的快取。。。由B自己來掌控合適來處理請求,也就是當B已經處理完了任務之後,自己去主動獲取。。。而不是由中間層自己去主動分發。。。。
嗯,那麼在ZeroMQ中應該如何實現這種模式呢,恩其實還挺簡單的,如下圖:
這裡由兩個Router來作為中間層,具體的資料流程如下:
(1)中間層啟動,Worker連線Backend,向其傳送Request請求(ready),這個時候中間層就能夠知道哪一個worker現在是空閒的,將其儲存起來(放到worker佇列),可以處理請求
worker的執行流程就是send(傳送ready)--->recv(獲取請求),
(2)Client端向Fronted傳送請求,中間層將請求快取到一個任務佇列
(3)中間層從任務隊裡裡面取出一個任務,將其傳送給worker佇列中的一個worker,並將其從woker佇列中移除
(4)worker處理完以後,傳送執行結果,也就是send,中間層收到woker的資料 之後,將其傳送給相應的client,然後在講這個worker放到worker佇列中,表示當前這個worker可用。。。。
好了,前面就基本上介紹了整個結構用ZeroMQ應該是怎麼實現的,那麼接下來就直接來上程式碼吧:
-
package balance;
-
import java.util.LinkedList;
-
import org.zeromq.ZFrame;
-
import org.zeromq.ZMQ;
-
import org.zeromq.ZMsg;
-
public class Balance {
-
public static class Client {
-
public void start() {
-
new Thread(new Runnable(){
-
public void run() {
-
// TODO Auto-generated method stub
-
ZMQ.Context context = ZMQ.context(1);
-
ZMQ.Socket socket = context.socket(ZMQ.REQ);
-
socket.connect("ipc://front"); //連線router,想起傳送請求
-
for (int i = 0; i < 1000; i++) {
-
socket.send("hello".getBytes(), 0); //傳送hello請求
-
String bb = new String(socket.recv()); //獲取返回的資料
-
System.out.println(bb);
-
}
-
socket.close();
-
context.term();
-
}
-
}).start();
-
}
-
}
-
public static class Worker {
-
public void start() {
-
new Thread(new Runnable(){
-
public void run() {
-
// TODO Auto-generated method stub
-
ZMQ.Context context = ZMQ.context(1);
-
ZMQ.Socket socket = context.socket(ZMQ.REQ);
-
socket.connect("ipc://back"); //連線,用於獲取要處理的請求,併發送回去處理結果
-
socket.send("ready".getBytes()); //傳送ready,表示當前可用
-
while (!Thread.currentThread().isInterrupted()) {
-
ZMsg msg = ZMsg.recvMsg(socket); //獲取需要處理的請求,其實這裡msg最外面的標誌frame是router對分配給client的標誌frame
-
ZFrame request = msg.removeLast(); //最後一個frame其實儲存的就是實際的請求資料,這裡將其移除,待會用新的frame代替
-
ZFrame frame = new ZFrame("hello fjs".getBytes());
-
msg.addLast(frame); //將剛剛建立的frame放到msg的最後,worker將會收到
-
msg.send(socket); //將資料傳送回去
-
}
-
socket.close();
-
context.term();
-
}
-
}).start();
-
}
-
}
-
public static class Middle {
-
private LinkedList<ZFrame> workers;
-
private LinkedList<ZMsg> requests;
-
private ZMQ.Context context;
-
private ZMQ.Poller poller;
-
public Middle() {
-
this.workers = new LinkedList<ZFrame>();
-
this.requests = new LinkedList<ZMsg>();
-
this.context = ZMQ.context(1);
-
this.poller = new ZMQ.Poller(2);
-
}
-
public void start() {
-
ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER); //建立一個router,用於接收client傳送過來的請求,以及向client傳送處理結果
-
ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER); //建立一個router,用於向後面的worker傳送資料,然後接收處理的結果
-
fronted.bind("ipc://front"); //監聽,等待client的連線
-
backend.bind("ipc://back"); //監聽,等待worker連線
-
//建立pollItem
-
ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);
-
ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);
-
this.poller.register(fitem); //註冊pollItem
-
this.poller.register(bitem);
-
while (!Thread.currentThread().isInterrupted()) {
-
this.poller.poll();
-
if (fitem.isReadable()) { //表示前面有請求發過來了
-
ZMsg msg = ZMsg.recvMsg(fitem.getSocket()); //獲取client傳送過來的請求,這裡router會在實際請求上面套一個連線的標誌frame
-
this.requests.addLast(msg); //將其掛到請求佇列
-
}
-
if (bitem.isReadable()) { //這裡表示worker傳送資料過來了
-
ZMsg msg = ZMsg.recvMsg(bitem.getSocket()); //獲取msg,這裡也會在實際傳送的資料前面包裝一個連線的標誌frame
-
//這裡需要注意,這裡返回的是最外面的那個frame,另外它還會將後面的接著的空的標誌frame都去掉
-
ZFrame workerID = msg.unwrap(); //把外面那層包裝取下來,也就是router對連線的標誌frame
-
this.workers.addLast(workerID); //將當前的worker的標誌frame放到worker佇列裡面,表示這個worker可以用了
-
ZFrame readyOrAddress = msg.getFirst(); //這裡獲取標誌frame後面的資料,如果worker剛剛啟動,那麼應該是傳送過來的ready,
-
if (new String(readyOrAddress.getData()).equals("ready")) { //表示是worker剛剛啟動,發過來的ready
-
msg.destroy();
-
} else {
-
msg.send(fronted); //表示是worker處理完的返回結果,那麼返回給客戶端
-
}
-
}
-
while (this.workers.size() > 0 && this.requests.size() > 0) {
-
ZMsg request = this.requests.removeFirst();
-
ZFrame worker = this.workers.removeFirst();
-
request.wrap(worker); //在request前面包裝一層,把可以用的worker的標誌frame包裝上,這樣router就會發給相應的worker的連線
-
request.send(backend); //將這個包裝過的訊息傳送出去
-
}
-
}
-
fronted.close();
-
backend.close();
-
this.context.term();
-
}
-
}
-
public static void main(String args[]) {
-
Worker worker = new Worker();
-
worker.start();
-
Client client = new Client();
-
client.start();
-
Middle middle = new Middle();
-
middle.start();
-
}
-
}
其實根據前面已經提出來的實現原理來編寫程式碼還是比較順利的,中途也沒有遇到什麼問題。。。不過要理解這部分要比較瞭解ZeroMQ的資料格式才行