ZeroMQ研究 Majordomo Protocol, Java樣例實現
阿新 • • 發佈:2018-12-23
最近研究利用zeromq實現多對多的雙向自由收發。在官方上發現了MDP協議,經過驗證貌似可行。正在開發中,將驗證程式碼分享如下。
互動協議棧:
Worker端:
Client端:
下面是示例程式碼,基於官方的程式碼精簡改造,去掉了heartbeat機制。便於理解功能。
Broker:
A READY command consists of a multipart message of 4 frames, formatted on the wire as follows:
* Frame 0: Empty frame
* Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
* Frame 2: 0x01 (one byte, representing READY)
* Frame 3: Service name (printable string)

A REQUEST command consists of a multipart message of 6 or more frames, formatted on the wire as follows:
* Frame 0: Empty frame
* Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
* Frame 2: 0x02 (one byte, representing REQUEST)
* Frame 3: Client address (envelope stack)
* Frame 4: Empty (zero bytes, envelope delimiter)
* Frames 5+: Request body (opaque binary)

A REPLY command consists of a multipart message of 6 or more frames, formatted on the wire as follows:
* Frame 0: Empty frame
* Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
* Frame 2: 0x03 (one byte, representing REPLY)
* Frame 3: Client address (envelope stack)
* Frame 4: Empty (zero bytes, envelope delimiter)
* Frames 5+: Reply body (opaque binary)

A HEARTBEAT command consists of a multipart message of 3 frames, formatted on the wire as follows:
* Frame 0: Empty frame
* Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
* Frame 2: 0x04 (one byte, representing HEARTBEAT)

A DISCONNECT command consists of a multipart message of 3 frames, formatted on the wire as follows:
* Frame 0: Empty frame
* Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
* Frame 2: 0x05 (one byte, representing DISCONNECT)
A REQUEST command consists of a multipart message of 4 or more frames, formatted on the wire as follows:
* Frame 0: Empty (zero bytes, invisible to REQ application)
* Frame 1: "MDPC01" (six bytes, representing MDP/Client v0.1)
* Frame 2: Service name (printable string)
* Frames 3+: Request body (opaque binary)

A REPLY command consists of a multipart message of 4 or more frames, formatted on the wire as follows:
* Frame 0: Empty (zero bytes, invisible to REQ application)
* Frame 1: "MDPC01" (six bytes, representing MDP/Client v0.1)
* Frame 2: Service name (printable string)
* Frames 3+: Reply body (opaque binary)
package com.coderli.zeromq.majordomoprotocol;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg ;
import com.coderli.zeromq.JZMQBase;
/**
 * ZeroMQ Majordomo Protocol (MDP) validation sample.<br>
 * Lets multiple clients and multiple workers exchange data in both directions
 * with an explicitly chosen target service.<br>
 * This class is the core broker module: it owns a single ROUTER socket on which
 * both clients and workers talk, queues client requests per service and hands
 * them to idle workers. Heartbeating is intentionally not implemented.
 */
public class Broker extends JZMQBase {

    /** A named service together with its pending requests and idle workers. */
    private static class Service {
        // Service name
        public final String name;
        // Queue of client requests not yet handed to a worker
        Deque<ZMsg> requests;
        // Queue of idle workers registered for this service
        Deque<Worker> waiting;

        public Service(String name) {
            this.name = name;
            this.requests = new ArrayDeque<ZMsg>();
            this.waiting = new ArrayDeque<Worker>();
        }
    }

    /** A worker known to the broker. */
    private static class Worker {
        // Unique identity of the worker (hex form of its address frame);
        // this is also the key under which it is stored in the workers map
        String identity;
        // Address frame used to route messages to this worker
        ZFrame address;
        // Service this worker registered for, or null if it has not sent READY
        Service service;

        public Worker(String identity, ZFrame address) {
            this.address = address;
            this.identity = identity;
        }
    }

    private ZContext ctx;
    private ZMQ.Socket socket;
    // Known services, keyed by service name
    private Map<String, Service> services;
    // Known workers, keyed by Worker.identity
    private Map<String, Worker> workers;
    // All idle workers, regardless of service
    private Deque<Worker> waiting;

    /**
     * Starts a broker bound to BROKER_FRONT_END (a constant not declared in this
     * class; presumably inherited from JZMQBase) and runs it until interrupted.
     */
    public static void main(String[] args) {
        Broker broker = new Broker();
        broker.bind(BROKER_FRONT_END);
        broker.mediate();
    }

    /** Creates the broker state and its (still unbound) ROUTER socket. */
    public Broker() {
        this.services = new HashMap<String, Service>();
        this.workers = new HashMap<String, Worker>();
        this.waiting = new ArrayDeque<Worker>();
        this.ctx = new ZContext();
        this.socket = ctx.createSocket(ZMQ.ROUTER);
    }

    /**
     * Main loop: receives messages from the socket and routes them to
     * {@link #processClient} or {@link #processWorker} depending on the protocol
     * header. Runs until the thread is interrupted or a receive returns null,
     * then releases all workers and the context.
     */
    public void mediate() {
        // The socket never changes, so the poller is set up once instead of per iteration.
        ZMQ.Poller items = new ZMQ.Poller(1);
        items.register(socket, ZMQ.Poller.POLLIN);
        while (!Thread.currentThread().isInterrupted()) {
            items.poll();
            if (!items.pollin(0)) {
                continue;
            }
            ZMsg msg = ZMsg.recvMsg(socket);
            if (msg == null) {
                System.out.println("接收到的訊息為null。");
                break; // Interrupted
            }
            System.out.println("I: received message:\n");
            msg.dump(System.out);
            // A routable message needs at least: sender identity, empty delimiter, header.
            // Without this check the pops below could yield null and crash the loop.
            if (msg.size() < 3) {
                System.out.println("E: invalid message:\n");
                msg.dump(System.out);
                msg.destroy();
                continue;
            }
            // Read the frames in the order the broker receives them on its ROUTER socket.
            ZFrame sender = msg.pop();
            ZFrame empty = msg.pop();
            ZFrame header = msg.pop();
            if (MDP.C_CLIENT.frameEquals(header)) {
                processClient(sender, msg);
            } else if (MDP.W_WORKER.frameEquals(header)) {
                processWorker(sender, msg);
            } else {
                System.out.println("E: invalid message:\n");
                msg.dump(System.out);
                msg.destroy();
            }
            sender.destroy();
            empty.destroy();
            header.destroy();
        }
        destroy();
    }

    /** Disconnects every known worker and destroys the ZeroMQ context. */
    private void destroy() {
        // Snapshot the values first: deleteWorker() removes entries from the map.
        Worker[] deleteList = workers.values().toArray(new Worker[0]);
        for (Worker worker : deleteList) {
            deleteWorker(worker, true);
        }
        ctx.destroy();
    }

    /**
     * Handles a client request and dispatches it to a worker of the requested service.
     *
     * @param sender address frame of the client; still owned (and destroyed) by the caller
     * @param msg    remaining frames (service name followed by the body); ownership
     *               passes to this method
     */
    private void processClient(ZFrame sender, ZMsg msg) {
        if (msg.size() < 2) {
            System.out.println("訊息棧不完整,不能傳送");
            msg.destroy();
            return;
        }
        ZFrame serviceFrame = msg.pop();
        // Wrap a copy: the caller destroys 'sender' after this call, while the
        // request may stay queued until a worker becomes available.
        msg.wrap(sender.duplicate());
        dispatch(requireService(serviceFrame), msg);
        serviceFrame.destroy();
    }

    /**
     * Handles a READY or REPLY command received from a worker.
     *
     * @param sender address frame of the worker; still owned (and destroyed) by the caller
     * @param msg    remaining frames, starting with the command frame; ownership
     *               passes to this method
     */
    private void processWorker(ZFrame sender, ZMsg msg) {
        if (msg.size() < 1) {
            System.out.println("回覆給客戶端的訊息不完整,不能傳送。");
            msg.destroy();
            return;
        }
        ZFrame command = msg.pop();
        boolean workerReady = workers.containsKey(sender.strhex());
        Worker worker = requireWorker(sender);
        if (MDP.W_READY.frameEquals(command)) {
            if (workerReady) {
                // READY from an already registered worker is a protocol violation.
                System.out.println("刪除worker:" + sender.strhex());
                deleteWorker(worker, true);
            } else if (msg.size() < 1) {
                // READY must carry the service name; without it the worker cannot be attached.
                System.out.print("不合法的訊息結構");
                msg.dump(System.out);
                deleteWorker(worker, true);
            } else {
                ZFrame serviceFrame = msg.pop();
                worker.service = requireService(serviceFrame);
                workerWaiting(worker);
                serviceFrame.destroy();
            }
        } else if (MDP.W_REPLY.frameEquals(command)) {
            // A valid REPLY comes from a worker attached to a service and still
            // carries the client envelope (client address + empty delimiter).
            if (workerReady && worker.service != null && msg.size() >= 2) {
                System.out.println("開始給客戶端相應");
                ZFrame client = msg.unwrap();
                msg.addFirst(worker.service.name);
                msg.addFirst(MDP.C_CLIENT.newFrame());
                msg.wrap(client);
                msg.send(socket);
                workerWaiting(worker);
            } else {
                deleteWorker(worker, true);
            }
        } else {
            System.out.print("不合法的訊息結構");
            msg.dump(System.out);
        }
        command.destroy();
        msg.destroy();
    }

    /**
     * Removes a worker from every broker structure and releases its address frame.
     *
     * @param worker     worker to remove
     * @param disconnect whether to send the worker a DISCONNECT command first
     */
    private void deleteWorker(Worker worker, boolean disconnect) {
        System.out.println("刪除worker");
        if (disconnect) {
            sendToWorker(worker, MDP.W_DISCONNECT, null, null);
        }
        if (worker.service != null) {
            worker.service.waiting.remove(worker);
        }
        waiting.remove(worker);
        // The map is keyed by identity string, not by the Worker object.
        workers.remove(worker.identity);
        worker.address.destroy();
    }

    /**
     * Looks up the worker for the given address frame, registering a new one if unknown.
     *
     * @param address address frame of the worker; it is duplicated, not retained
     * @return the existing or newly created worker
     */
    private Worker requireWorker(ZFrame address) {
        assert (address != null);
        String identity = address.strhex();
        Worker worker = workers.get(identity);
        if (worker == null) {
            worker = new Worker(identity, address.duplicate());
            workers.put(identity, worker);
            System.out.println("註冊了新的worker:" + identity);
        }
        return worker;
    }

    /**
     * Looks up the service named by the given frame, creating it if unknown.
     *
     * @param serviceFrame frame whose string form is the service name
     * @return the existing or newly created service
     */
    private Service requireService(ZFrame serviceFrame) {
        assert (serviceFrame != null);
        String name = serviceFrame.toString();
        Service service = services.get(name);
        if (service == null) {
            service = new Service(name);
            services.put(name, service);
        }
        return service;
    }

    /** Binds the broker socket to the given endpoint. */
    private void bind(String endpoint) {
        socket.bind(endpoint);
        System.out.println("Broker版定在埠: " + endpoint);
    }

    /**
     * Marks a worker as idle and immediately tries to hand it a queued request.
     * The worker must already be attached to a service.
     */
    public synchronized void workerWaiting(Worker worker) {
        waiting.addLast(worker);
        worker.service.waiting.addLast(worker);
        dispatch(worker.service, null);
    }

    /**
     * Queues the request (if any) on the service and sends queued requests to
     * idle workers for as long as both are available.
     *
     * @param service service to dispatch for
     * @param msg     new request to enqueue, or null to only drain the queue
     */
    private void dispatch(Service service, ZMsg msg) {
        assert (service != null);
        if (msg != null) {
            service.requests.offerLast(msg);
        }
        while (!service.waiting.isEmpty() && !service.requests.isEmpty()) {
            msg = service.requests.pop();
            Worker worker = service.waiting.pop();
            waiting.remove(worker);
            sendToWorker(worker, MDP.W_REQUEST, null, msg);
            // sendToWorker() works on a duplicate, so the queued original is released here.
            msg.destroy();
        }
    }

    /**
     * Sends a command to a worker.
     *
     * @param worker  target worker
     * @param command MDP command to send
     * @param option  optional frame placed right after the command, or null
     * @param msgp    optional message body, or null; it is duplicated and left untouched
     */
    public void sendToWorker(Worker worker, MDP command, String option,
            ZMsg msgp) {
        ZMsg msg = msgp == null ? new ZMsg() : msgp.duplicate();
        // Build the protocol envelope from the inside out.
        if (option != null) {
            msg.addFirst(new ZFrame(option));
        }
        msg.addFirst(command.newFrame());
        msg.addFirst(MDP.W_WORKER.newFrame());
        // Routing envelope: worker address followed by an empty delimiter.
        msg.wrap(worker.address.duplicate());
        System.out.println("給worker傳送命令: [" + command + "]。");
        msg.dump(System.out);
        msg.send(socket);
    }
}
ClientAPI:
package com.coderli.zeromq.majordomoprotocol;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
/**
 * ZeroMQ Majordomo Protocol (MDP) validation sample.<br>
 * Lets multiple clients and multiple workers exchange data in both directions
 * with an explicitly chosen target service.<br>
 * This is the API that client programs use to talk to the broker over a REQ socket.
 */
public class ClientAPI {
    private String broker;
    private ZContext ctx;
    private ZMQ.Socket client;
    // Timeout value passed to Poller.poll() while waiting for a reply
    private long timeout = 2500;
    // Number of send attempts before send() gives up
    private int retries = 3;

    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    /**
     * Creates a client session and connects it to the broker.
     *
     * @param broker endpoint of the broker to connect to
     */
    public ClientAPI(String broker) {
        this.broker = broker;
        ctx = new ZContext();
        reconnectToBroker();
    }

    /** Drops the current socket (if any) and connects a fresh REQ socket to the broker. */
    void reconnectToBroker() {
        if (client != null) {
            ctx.destroySocket(client);
        }
        client = ctx.createSocket(ZMQ.REQ);
        client.connect(broker);
        System.out.println("連線到Broker:" + broker);
    }

    /**
     * Sends a request to the broker and waits for the reply.
     * The request message is modified (protocol frames are pushed onto it) and
     * destroyed before this method returns.
     *
     * @param service name of the target service
     * @param request request body
     * @return the reply body with the protocol header and service frames removed,
     *         or null if no valid reply was received (retries exhausted, interrupt,
     *         poll error or malformed reply)
     */
    public ZMsg send(String service, ZMsg request) {
        // Prefix the body with the frames the broker expects: header, then service name.
        request.push(new ZFrame(service));
        request.push(MDP.C_CLIENT.newFrame());
        System.out.println("傳送訊息給worker:" + service);
        request.dump(System.out);
        ZMsg reply = null;
        int retriesLeft = retries;
        while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {
            // Send a copy so the original is still available for a retry.
            request.duplicate().send(client);
            // The poller is rebuilt each round because a retry replaces the socket.
            ZMQ.Poller items = new ZMQ.Poller(1);
            items.register(client, ZMQ.Poller.POLLIN);
            if (items.poll(timeout) == -1) {
                break; // poll failed or was interrupted (a plain timeout is handled below)
            }
            if (items.pollin(0)) {
                ZMsg msg = ZMsg.recvMsg(client);
                if (msg == null) {
                    break; // receive was interrupted; nothing to inspect
                }
                System.out.println("接收到訊息。");
                msg.dump(System.out);
                // A reply must carry at least header + service name + one body frame.
                if (msg.size() < 3) {
                    System.out.println("E: invalid message:\n");
                    msg.destroy();
                    break;
                }
                ZFrame header = msg.pop();
                header.destroy();
                ZFrame replyService = msg.pop();
                replyService.destroy();
                reply = msg;
                break;
            } else {
                items.unregister(client);
                if (--retriesLeft == 0) {
                    System.out.println("超過重試次數,錯誤。退出。");
                    break;
                }
                System.out.println("沒有收到迴應,重試。");
                // A REQ socket that got no reply cannot send again; start over with a new one.
                reconnectToBroker();
            }
        }
        request.destroy();
        return reply;
    }

    /** Destroys the ZeroMQ context together with its sockets. */
    public void destroy() {
        ctx.destroy();
    }
}
ClientOne:
package com.coderli.zeromq.majordomoprotocol;
import org.zeromq.ZMsg;
import com.coderli.zeromq.JZMQBase;
/**
 * ZeroMQ Majordomo Protocol (MDP) validation sample.<br>
 * Lets multiple clients and multiple workers exchange data in both directions
 * with an explicitly chosen target service.<br>
 * This is client number one; it addresses its requests to the services named
 * "one" and "two".
 */
public class ClientOne extends JZMQBase {

    /**
     * Sends requests whose body is the current {@code System.nanoTime()} value,
     * alternating the target service by request index (odd goes to "one", even
     * to "two"), and stops at the first missing reply. The loop bound is
     * currently a single request.
     */
    public static void main(String[] args) throws InterruptedException {
        ClientAPI clientSession = new ClientAPI(BROKER_FRONT_END);
        int count = 0;
        while (count < 1) {
            ZMsg request = new ZMsg();
            long start = System.nanoTime();
            request.addString(String.valueOf(start));

            String target = (count % 2 == 1) ? "one" : "two";
            ZMsg reply = clientSession.send(target, request);
            if (reply == null) {
                break; // Interrupt or failure
            }
            reply.destroy();

            // Pause for 1,000,000 ms before the next round.
            Thread.sleep(1000000L);
            count++;
        }
        System.out.printf("%d requests/replies processed\n", count);
        clientSession.destroy();
    }
}
WorkerAPI:
package com.coderli.zeromq.majordomoprotocol;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
/**
* ZeroMQ驗證 workerAPI封裝
*
* @author lihzh
* @date 2014年1月15日 下午2:23:14
*/
public class WorkerAPI {
private String broker;
private ZContext ctx;
private String service;
private ZMQ.Socket worker;
private long timeout = 2500;
private ZFrame replyTo;
public WorkerAPI(String broker, String service) {
assert (broker != null);
assert (service != null);
this. broker = broker;
this. service = service;
ctx = new ZContext();
reconnectToBroker();
}
/**
* 給Broker傳送訊息
*
* @param command
* @param option
* @param msg
*/
void sendToBroker(MDP command, String option, ZMsg msg) {
msg = msg != null ? msg.duplicate() : new ZMsg();
if (option != null)
msg.addFirst( new ZFrame(option));
msg.addFirst(command.newFrame());
msg.addFirst(MDP. W_WORKER.newFrame());
msg.addFirst( new ZFrame( new byte[0]));
msg.send( worker);
}
void reconnectToBroker() {
if ( worker != null) {
ctx.destroySocket( worker);
}
worker = ctx.createSocket(ZMQ. DEALER);
worker.connect( broker);
sendToBroker(MDP. W_READY, service, null);
}
/**
* 接收資料
*
* @param reply
* @return
* @author lihzh
* @date 2014年1月15日 下午2:24:23
*/
public ZMsg receive(ZMsg reply)