1. 程式人生 > >ZeroMQ(java)之負載均衡

ZeroMQ(java)之負載均衡

我們在實際的應用中最常遇到的場景如下:

A向B傳送請求,B向A返回結果。。。。

但是這種場景就會很容易變成這個樣子:

很多A向B傳送請求,所以B要不斷的處理這些請求,所以就會很容易想到對B進行擴充套件,由多個B來處理這些請求,那麼這裡就出現了另外一個問題:

B對請求處理的速度可能不同,那麼B之間他們的負載也是不同的,那麼應該如何對請求進行分發就成了一個比較重要的問題。。。也就變成了負載均衡的問題了。。。

其實最好的負載均衡解決方案也很簡單:

絕大多數的任務都是獨立的,這裡中間層可以將A傳送過來的請求先快取起來,然後B的行為就是主動的找中間層獲取請求處理,然後返回,再獲取。。。。也就是中間層只是做一個請求的快取。。。由B自己來掌控合適來處理請求,也就是當B已經處理完了任務之後,自己去主動獲取。。。而不是由中間層自己去主動分發。。。。

嗯,那麼在ZeroMQ中應該如何實現這種模式呢,恩其實還挺簡單的,如下圖:

 

這裡由兩個Router來作為中間層,具體的資料流程如下:

(1)中間層啟動,Worker連線Backend,向其傳送Request請求(ready),這個時候中間層就能夠知道哪一個worker現在是空閒的,將其儲存起來(放到worker佇列),可以處理請求

worker的執行流程就是send(傳送ready)--->recv(獲取請求),

(2)Client端向Fronted傳送請求,中間層將請求快取到一個任務佇列

(3)中間層從任務隊裡裡面取出一個任務,將其傳送給worker佇列中的一個worker,並將其從woker佇列中移除

(4)worker處理完以後,傳送執行結果,也就是send,中間層收到woker的資料 之後,將其傳送給相應的client,然後在講這個worker放到worker佇列中,表示當前這個worker可用。。。。

 

好了,前面就基本上介紹了整個結構用ZeroMQ應該是怎麼實現的,那麼接下來就直接來上程式碼吧:

 
  1. package balance;

  2.  
  3. import java.util.LinkedList;

  4.  
  5. import org.zeromq.ZFrame;

  6. import org.zeromq.ZMQ;

  7. import org.zeromq.ZMsg;

  8.  
  9. public class Balance {

  10.  
  11. public static class Client {

  12. public void start() {

  13. new Thread(new Runnable(){

  14.  
  15. public void run() {

  16. // TODO Auto-generated method stub

  17. ZMQ.Context context = ZMQ.context(1);

  18. ZMQ.Socket socket = context.socket(ZMQ.REQ);

  19.  
  20. socket.connect("ipc://front"); //連線router,想起傳送請求

  21.  
  22. for (int i = 0; i < 1000; i++) {

  23. socket.send("hello".getBytes(), 0); //傳送hello請求

  24. String bb = new String(socket.recv()); //獲取返回的資料

  25. System.out.println(bb);

  26. }

  27. socket.close();

  28. context.term();

  29. }

  30.  
  31. }).start();

  32. }

  33. }

  34.  
  35. public static class Worker {

  36. public void start() {

  37. new Thread(new Runnable(){

  38.  
  39. public void run() {

  40. // TODO Auto-generated method stub

  41. ZMQ.Context context = ZMQ.context(1);

  42. ZMQ.Socket socket = context.socket(ZMQ.REQ);

  43.  
  44. socket.connect("ipc://back"); //連線,用於獲取要處理的請求,併發送回去處理結果

  45.  
  46. socket.send("ready".getBytes()); //傳送ready,表示當前可用

  47.  
  48. while (!Thread.currentThread().isInterrupted()) {

  49. ZMsg msg = ZMsg.recvMsg(socket); //獲取需要處理的請求,其實這裡msg最外面的標誌frame是router對分配給client的標誌frame

  50. ZFrame request = msg.removeLast(); //最後一個frame其實儲存的就是實際的請求資料,這裡將其移除,待會用新的frame代替

  51. ZFrame frame = new ZFrame("hello fjs".getBytes());

  52. msg.addLast(frame); //將剛剛建立的frame放到msg的最後,worker將會收到

  53. msg.send(socket); //將資料傳送回去

  54.  
  55. }

  56. socket.close();

  57. context.term();

  58. }

  59.  
  60. }).start();

  61. }

  62. }

  63.  
  64. public static class Middle {

  65. private LinkedList<ZFrame> workers;

  66. private LinkedList<ZMsg> requests;

  67. private ZMQ.Context context;

  68. private ZMQ.Poller poller;

  69.  
  70. public Middle() {

  71. this.workers = new LinkedList<ZFrame>();

  72. this.requests = new LinkedList<ZMsg>();

  73. this.context = ZMQ.context(1);

  74. this.poller = new ZMQ.Poller(2);

  75. }

  76.  
  77. public void start() {

  78. ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER); //建立一個router,用於接收client傳送過來的請求,以及向client傳送處理結果

  79. ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER); //建立一個router,用於向後面的worker傳送資料,然後接收處理的結果

  80.  
  81. fronted.bind("ipc://front"); //監聽,等待client的連線

  82. backend.bind("ipc://back"); //監聽,等待worker連線

  83.  
  84. //建立pollItem

  85. ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);

  86. ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);

  87.  
  88. this.poller.register(fitem); //註冊pollItem

  89. this.poller.register(bitem);

  90.  
  91.  
  92. while (!Thread.currentThread().isInterrupted()) {

  93. this.poller.poll();

  94. if (fitem.isReadable()) { //表示前面有請求發過來了

  95. ZMsg msg = ZMsg.recvMsg(fitem.getSocket()); //獲取client傳送過來的請求,這裡router會在實際請求上面套一個連線的標誌frame

  96. this.requests.addLast(msg); //將其掛到請求佇列

  97. }

  98. if (bitem.isReadable()) { //這裡表示worker傳送資料過來了

  99. ZMsg msg = ZMsg.recvMsg(bitem.getSocket()); //獲取msg,這裡也會在實際傳送的資料前面包裝一個連線的標誌frame

  100. //這裡需要注意,這裡返回的是最外面的那個frame,另外它還會將後面的接著的空的標誌frame都去掉

  101. ZFrame workerID = msg.unwrap(); //把外面那層包裝取下來,也就是router對連線的標誌frame

  102. this.workers.addLast(workerID); //將當前的worker的標誌frame放到worker佇列裡面,表示這個worker可以用了

  103. ZFrame readyOrAddress = msg.getFirst(); //這裡獲取標誌frame後面的資料,如果worker剛剛啟動,那麼應該是傳送過來的ready,

  104.  
  105.  
  106. if (new String(readyOrAddress.getData()).equals("ready")) { //表示是worker剛剛啟動,發過來的ready

  107. msg.destroy();

  108. } else {

  109. msg.send(fronted); //表示是worker處理完的返回結果,那麼返回給客戶端

  110. }

  111. }

  112.  
  113. while (this.workers.size() > 0 && this.requests.size() > 0) {

  114. ZMsg request = this.requests.removeFirst();

  115. ZFrame worker = this.workers.removeFirst();

  116.  
  117. request.wrap(worker); //在request前面包裝一層,把可以用的worker的標誌frame包裝上,這樣router就會發給相應的worker的連線

  118. request.send(backend); //將這個包裝過的訊息傳送出去

  119. }

  120.  
  121. }

  122. fronted.close();

  123. backend.close();

  124. this.context.term();

  125. }

  126. }

  127.  
  128.  
  129. public static void main(String args[]) {

  130. Worker worker = new Worker();

  131. worker.start();

  132. Client client = new Client();

  133. client.start();

  134. Middle middle = new Middle();

  135. middle.start();

  136.  
  137. }

  138. }

 

其實根據前面已經提出來的實現原理來編寫程式碼還是比較順利的,中途也沒有遇到什麼問題。。。不過要理解這部分要比較瞭解ZeroMQ的資料格式才行