ZeroMQ(java)之Publish/Subscribe模式
阿新 • • 發佈:2019-02-05
前面的文章介紹了比較簡單的Request/Subscribe模式, 這篇文章介紹更為經典的Publish/Subscribe通訊模式用來ZeroMQ的實現,其通訊方式如下圖:
客戶端(subscriber)向伺服器(publisher)訂閱訊息,然後伺服器可以將訊息推送到所有訂閱了訊息的客戶端,這裡也可以理解為廣播吧。。。。
好了,閒話不多說了,直接上用ZeroMQ實現這種通訊模式的程式碼吧:
(1)服務端(publisher):
package pubsub; import org.zeromq.ZMQ; public class Publisher { public static void main(String args[]) { ZMQ.Context context = ZMQ.context(1); //創建立包含一個I/O執行緒的context ZMQ.Socket publisher = context.socket(ZMQ.PUB); //建立一個publisher型別的socket,他可以向所有訂閱的subscriber廣播資料 publisher.bind("tcp://*:5555"); //將當前publisher繫結到5555埠上,可以接受subscriber的訂閱 while (!Thread.currentThread ().isInterrupted ()) { String message = "fjs hello"; //最開始可以理解為pub的channel,subscribe需要訂閱fjs這個channel才能接收到訊息 publisher.send(message.getBytes()); } publisher.close(); context.term(); } }
程式碼很簡單吧,這裡publisher來充當服務端,所有的subscriber都需要建立於publisher的連線。,,。,
(2)客戶端(subscriber)程式碼:
package pubsub; import org.zeromq.ZMQ; public class Subscriber { public static void main(String args[]) { for (int j = 0; j < 100; j++) { new Thread(new Runnable(){ public void run() { // TODO Auto-generated method stub ZMQ.Context context = ZMQ.context(1); //建立1個I/O執行緒的上下文 ZMQ.Socket subscriber = context.socket(ZMQ.SUB); //建立一個sub型別,也就是subscriber型別的socket subscriber.connect("tcp://127.0.0.1:5555"); //與在5555埠監聽的publisher建立連線 subscriber.subscribe("fjs".getBytes()); //訂閱fjs這個channel for (int i = 0; i < 100; i++) { byte[] message = subscriber.recv(); //接收publisher傳送過來的訊息 System.out.println("receive : " + new String(message)); } subscriber.close(); context.term(); } }).start(); } } }
這裡需要注意訂閱的channel問題,如果這裡錯了的話,subscriber是不會受到publisher傳送過來的資料的
好了,到這裡publish/subscribe的實現就算ok了