程式人生 > ZeroMQ研究 Majordomo Protocol, Java樣例實現

ZeroMQ研究 Majordomo Protocol, Java樣例實現

最近研究利用ZeroMQ實現多對多的雙向自由收發。在官方文件中發現了MDP(Majordomo Protocol)協議,經過驗證貌似可行。目前仍在開發中,先將驗證程式碼分享如下。

互動協議棧:

Worker端:
A READY command consists of a multipart message of 4 frames, formatted on the wire as follows:
   * Frame 0: Empty frame
   * Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
   * Frame 2: 0x01 (one byte, representing READY)
   * Frame 3: Service name (printable string)

A REQUEST command consists of a multipart message of 6 or more frames, formatted on the wire as follows:
   * Frame 0: Empty frame
   * Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
   * Frame 2: 0x02 (one byte, representing REQUEST)
   * Frame 3: Client address (envelope stack)
   * Frame 4: Empty (zero bytes, envelope delimiter)
   * Frames 5+: Request body (opaque binary)

A REPLY command consists of a multipart message of 6 or more frames, formatted on the wire as follows:
   * Frame 0: Empty frame
   * Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
   * Frame 2: 0x03 (one byte, representing REPLY)
   * Frame 3: Client address (envelope stack)
   * Frame 4: Empty (zero bytes, envelope delimiter)
   * Frames 5+: Reply body (opaque binary)

A HEARTBEAT command consists of a multipart message of 3 frames, formatted on the wire as follows:
   * Frame 0: Empty frame
   * Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
   * Frame 2: 0x04 (one byte, representing HEARTBEAT)

A DISCONNECT command consists of a multipart message of 3 frames, formatted on the wire as follows:
   * Frame 0: Empty frame
   * Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
   * Frame 2: 0x05 (one byte, representing DISCONNECT)
Client端:
A REQUEST command consists of a multipart message of 4 or more frames, formatted on the wire as follows:
   * Frame 0: Empty (zero bytes, invisible to REQ application)
   * Frame 1: "MDPC01" (six bytes, representing MDP/Client v0.1)
   * Frame 2: Service name (printable string)
   * Frames 3+: Request body (opaque binary)

A REPLY command consists of a multipart message of 4 or more frames, formatted on the wire as follows:
   * Frame 0: Empty (zero bytes, invisible to REQ application)
   * Frame 1: "MDPC01" (six bytes, representing MDP/Client v0.1)
   * Frame 2: Service name (printable string)
   * Frames 3+: Reply body (opaque binary)
下面是示例程式碼,基於官方的程式碼精簡改造,去掉了heartbeat機制,以便於理解功能。

Broker:
package com.coderli.zeromq.majordomoprotocol;


import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;


import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg
; import com.coderli.zeromq.JZMQBase; /** * ZeroMQ Majordomo Protocol協議驗證<br> * 用於實現多client、多worker實現雙向指定目標資料收發 <br> * 此為核心Broker模組 * */ public class Broker extends JZMQBase { private static class Service { // 服務名 public final String name; // 請求資訊佇列 Deque<ZMsg> requests; // 待用worker佇列 Deque<Worker> waiting; // List of waiting workers public Service(String name) { this. name = name; this. requests = new ArrayDeque<ZMsg>(); this. waiting = new ArrayDeque<Worker>(); } } private static class Worker { // worker的唯一標識 @SuppressWarnings( "unused") String identity; // Identity of worker // 目標worker地址 ZFrame address; // Address frame to route to // 包含的service名稱,如果存在 Service service; public Worker(String identity, ZFrame address) { this. address = address; this. identity = identity; } } private ZContext ctx; private ZMQ.Socket socket; private Map<String, Service> services; private Map<String, Worker> workers; private Deque<Worker> waiting; public static void main(String[] args) { Broker broker = new Broker(); broker.bind( BROKER_FRONT_END); broker.mediate(); } public Broker() { this. services = new HashMap<String, Service>(); this. workers = new HashMap<String, Worker>(); this. waiting = new ArrayDeque<Worker>(); this. ctx = new ZContext(); this. socket = ctx.createSocket(ZMQ. ROUTER); } public void mediate() { while (!Thread. currentThread().isInterrupted()) { ZMQ.Poller items = new ZMQ.Poller(1); items.register( socket, ZMQ.Poller. POLLIN); items.poll(); if (items.pollin(0)) { ZMsg msg = ZMsg. recvMsg(socket); if (msg == null) { System. out.println( "接收到的訊息為null。" ); break; // Interrupted } System. out.println( "I: received message:\n"); msg.dump(System. out); // 根據協議棧規則讀取資料,此處需要注意broker接受到的協議棧格式 ZFrame sender = msg.pop(); ZFrame empty = msg.pop(); ZFrame header = msg.pop(); if (MDP. C_CLIENT.frameEquals(header)) { processClient(sender, msg); } else if (MDP.W_WORKER.frameEquals(header)) processWorker(sender, msg); else { System. 
out.println( "E: invalid message:\n"); msg.dump(System. out); msg.destroy(); } sender.destroy(); empty.destroy(); header.destroy(); } } destroy(); } private void destroy() { Worker[] deleteList = workers.entrySet().toArray( new Worker[0]); for (Worker worker : deleteList) { deleteWorker(worker, true); } ctx.destroy(); } /** * 處理客戶端請求的,用於分發給指定的worker. */ private void processClient(ZFrame sender, ZMsg msg) { if (msg.size() < 2) { System. out.println( "訊息棧不完整,不能傳送" ); return; } ZFrame serviceFrame = msg.pop(); msg.wrap(sender); dispatch(requireService(serviceFrame), msg); serviceFrame.destroy(); } private void processWorker(ZFrame sender, ZMsg msg) { if (msg.size() < 1) { System. out.println( "回覆給客戶端的訊息不完整,不能傳送。" ); } ZFrame command = msg.pop(); boolean workerReady = workers.containsKey(sender.strhex()); Worker worker = requireWorker(sender); if (MDP. W_READY.frameEquals(command)) { if (workerReady) { System. out.println( "刪除worker:" + sender.strhex()); deleteWorker(worker, true); } else { ZFrame serviceFrame = msg.pop(); worker. service = requireService(serviceFrame); workerWaiting(worker); serviceFrame.destroy(); } } else if (MDP. W_REPLY.frameEquals(command)) { if (workerReady) { System. out.println( "開始給客戶端相應" ); ZFrame client = msg.unwrap(); msg.addFirst(worker. service. name); msg.addFirst(MDP. C_CLIENT.newFrame()); msg.wrap(client); msg.send( socket); workerWaiting(worker); } else { deleteWorker(worker, true); } } else { System. out.print( "不合法的訊息結構" ); msg.dump(System. out); } msg.destroy(); } private void deleteWorker(Worker worker, boolean disconnect) { System. out.println( "刪除worker"); if (disconnect) { sendToWorker(worker, MDP. W_DISCONNECT, null, null); } if (worker. service != null) worker. service. waiting.remove(worker); workers.remove(worker); worker. 
address.destroy(); } private Worker requireWorker(ZFrame address) { assert (address != null); String identity = address.strhex(); Worker worker = workers.get(identity); if (worker == null) { worker = new Worker(identity, address.duplicate()); workers.put(identity, worker); System. out.println( "註冊了新的worker:" + identity); } return worker; } private Service requireService(ZFrame serviceFrame) { assert (serviceFrame != null); String name = serviceFrame.toString(); Service service = services.get(name); if (service == null) { service = new Service(name); services.put(name, service); } return service; } private void bind(String endpoint) { socket.bind(endpoint); System. out.println( "Broker版定在埠: " + endpoint); } public synchronized void workerWaiting(Worker worker) { waiting.addLast(worker); worker. service. waiting.addLast(worker); dispatch(worker. service, null); } private void dispatch(Service service, ZMsg msg) { assert (service != null); if (msg != null) { service. requests.offerLast(msg); } while (!service. waiting.isEmpty() &amp;&amp; !service.requests.isEmpty()) { msg = service. requests.pop(); Worker worker = service. waiting.pop(); waiting.remove(worker); sendToWorker(worker, MDP. W_REQUEST, null, msg); msg.destroy(); } } public void sendToWorker(Worker worker, MDP command, String option, ZMsg msgp) { ZMsg msg = msgp == null ? new ZMsg() : msgp.duplicate(); if (option != null) msg.addFirst( new ZFrame(option)); msg.addFirst(command.newFrame()); msg.addFirst(MDP. W_WORKER.newFrame()); msg.wrap(worker. address.duplicate()); System. out.println( "給worker傳送命令: [" + command + "]。"); msg.dump(System. out); msg.send( socket); } }
ClientAPI:
package com.coderli.zeromq.majordomoprotocol;


import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;


/**
 * ZeroMQ Majordomo Protocol協議驗證<br>
 * 用於實現多client、多worker實現雙向指定目標資料收發 <br>
 * 此為Client端依賴的API。
 *
 */
public class ClientAPI {


     private String broker;
     private ZContext ctx;
     private ZMQ.Socket client;
     private long timeout = 2500;
     private int retries = 3;


     public long getTimeout() {
           return timeout;
     }


     public void setTimeout( long timeout) {
           this. timeout = timeout;
     }


     public int getRetries() {
           return retries;
     }


     public void setRetries( int retries) {
           this. retries = retries;
     }


     public ClientAPI(String broker) {
           this. broker = broker;
           ctx = new ZContext();
          reconnectToBroker();
     }


     void reconnectToBroker() {
           if ( client != null) {
               ctx.destroySocket( client);
          }
           client = ctx.createSocket(ZMQ. REQ);
           client.connect( broker);
          System. out.println( "連線到Broker:" + broker );
     }


     /**
      * 給broker傳送訊息
      *
      * @param service
      * @param request
      * @return
      */
     public ZMsg send(String service, ZMsg request) {


          request.push( new ZFrame(service));
          request.push(MDP. C_CLIENT.newFrame());
          System. out.println( "傳送訊息給worker:" + service);
          request.dump(System. out);
          ZMsg reply = null;


           int retriesLeft = retries;
           while (retriesLeft > 0 &amp;&amp; !Thread.currentThread().isInterrupted()) {
              request.duplicate().send( client);
              ZMQ.Poller items = new ZMQ.Poller(1);
              items.register( client, ZMQ.Poller. POLLIN);
               if (items.poll( timeout) == -1)
                    break; // 超時退出
               if (items.pollin(0)) {
                   ZMsg msg = ZMsg. recvMsg(client);
                   System. out.println( "接收到訊息。" );
                   msg.dump(System. out);
                   ZFrame header = msg.pop();
                   header.destroy();
                   ZFrame replyService = msg.pop();
                   replyService.destroy();
                   reply = msg;
                    break;
              } else {
                   items.unregister( client);
                    if (--retriesLeft == 0) {
                        System. out.println( "超過重試次數,錯誤。退出。" );
                         break;
                   }
                   System. out.println( "沒有收到迴應,重試。" );
                   reconnectToBroker();
              }
          }
          request.destroy();
           return reply;
     }


     public void destroy() {
           ctx.destroy();
     }
}
ClientOne:
package com.coderli.zeromq.majordomoprotocol;


import org.zeromq.ZMsg;


import com.coderli.zeromq.JZMQBase;


/**
 * ZeroMQ Majordomo Protocol (MDP) validation sample: client number one.
 * <p>
 * Sends requests through {@link ClientAPI}, alternating between the services
 * named "one" and "two" according to the request counter.
 */
public class ClientOne extends JZMQBase {

     public static void main(String[] args) throws InterruptedException {
          ClientAPI clientSession = new ClientAPI(BROKER_FRONT_END);
          try {
               int count;
               for (count = 0; count < 1; count++) {
                    ZMsg request = new ZMsg();
                    ZMsg reply = null;
                    // The request body is the send timestamp in nanoseconds.
                    long start = System.nanoTime();
                    request.addString(String.valueOf(start));
                    if (count % 2 == 1) {
                         reply = clientSession.send("one", request);
                    } else {
                         reply = clientSession.send("two", request);
                    }
                    if (reply != null) {
                         reply.destroy();
                    } else {
                         break; // Interrupt or failure
                    }
                    Thread.sleep(1000000L);
               }

               System.out.printf("%d requests/replies processed\n", count);
          } finally {
               // Release the ZeroMQ context even when the sleep is interrupted.
               clientSession.destroy();
          }
     }
}
WorkerAPI:
package com.coderli.zeromq.majordomoprotocol;


import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;


/**
 * ZeroMQ驗證 workerAPI封裝
 *
 * @author lihzh
 * @date 2014年1月15日 下午2:23:14
 */
public class WorkerAPI {


     private String broker;
     private ZContext ctx;
     private String service;
     private ZMQ.Socket worker;


     private long timeout = 2500;


     private ZFrame replyTo;


     /**
      * Creates a worker session for one service, opens the ZeroMQ context
      * and connects to the broker.
      *
      * @param broker  broker endpoint to connect to; must not be null
      * @param service service name sent to the broker with READY; must not be null
      */
     public WorkerAPI(String broker, String service) {
          assert broker != null;
          assert service != null;
          this.ctx = new ZContext();
          this.service = service;
          this.broker = broker;
          // Opens the socket and sends READY for this service.
          reconnectToBroker();
     }


     /**
      * Sends a command to the broker.
      * <p>
      * The outgoing message is a copy of {@code msg} (or an empty message when
      * {@code msg} is null) prefixed, in wire order, by an empty frame, the
      * worker protocol header, the command frame and, if given, the option
      * frame.
      *
      * @param command command frame to send
      * @param option  optional frame placed right after the command, or null
      * @param msg     message body, or null for none; the caller keeps ownership
      */
     void sendToBroker(MDP command, String option, ZMsg msg) {
          final ZMsg outgoing;
          if (msg == null) {
               outgoing = new ZMsg();
          } else {
               outgoing = msg.duplicate();
          }

          // Frames are prepended, so they are added in reverse wire order.
          if (option != null) {
               outgoing.addFirst(new ZFrame(option));
          }
          outgoing.addFirst(command.newFrame());
          outgoing.addFirst(MDP.W_WORKER.newFrame());
          outgoing.addFirst(new ZFrame(new byte[0]));
          outgoing.send(worker);
     }


     /**
      * Destroys the current broker socket, if there is one, then opens a new
      * DEALER socket, connects it to the broker and sends READY with this
      * worker's service name.
      */
     void reconnectToBroker() {
          ZMQ.Socket previous = worker;
          if (previous != null) {
               ctx.destroySocket(previous);
          }
          worker = ctx.createSocket(ZMQ.DEALER);
          worker.connect(broker);
          sendToBroker(MDP.W_READY, service, null);
     }


     /**
      * 接收資料
      *
      * @param reply
      * @return
      * @author lihzh
      * @date 2014年1月15日 下午2:24:23
      */
     public ZMsg receive(ZMsg reply)