1. 程式人生 > >ZeroMQ(java)之Publish/Subscribe模式

ZeroMQ(java)之Publish/Subscribe模式

前面的文章介紹了比較簡單的Request/Subscribe模式, 這篇文章介紹更為經典的Publish/Subscribe通訊模式用來ZeroMQ的實現,其通訊方式如下圖:


客戶端(subscriber)向伺服器(publisher)訂閱訊息,然後伺服器可以將訊息推送到所有訂閱了訊息的客戶端,這裡也可以理解為廣播吧。。。。

好了,閒話不多說了,直接上用ZeroMQ實現這種通訊模式的程式碼吧:

(1)服務端(publisher):

package pubsub;


import org.zeromq.ZMQ;

public class Publisher {
	public static void main(String args[]) {
	
		ZMQ.Context context = ZMQ.context(1);  //創建立包含一個I/O執行緒的context
		ZMQ.Socket publisher = context.socket(ZMQ.PUB);   //建立一個publisher型別的socket,他可以向所有訂閱的subscriber廣播資料
		
		publisher.bind("tcp://*:5555");  //將當前publisher繫結到5555埠上,可以接受subscriber的訂閱
		
        while (!Thread.currentThread ().isInterrupted ()) {
            String message = "fjs hello";  //最開始可以理解為pub的channel,subscribe需要訂閱fjs這個channel才能接收到訊息
            publisher.send(message.getBytes());
        }

		publisher.close();
		context.term();
	}
}

程式碼很簡單吧,這裡publisher來充當服務端,所有的subscriber都需要建立於publisher的連線。,,。,

(2)客戶端(subscriber)程式碼:

package pubsub;

import org.zeromq.ZMQ;

public class Subscriber {
	public static void main(String args[]) {
		for (int j = 0; j < 100; j++) {
			new Thread(new Runnable(){

				public void run() {
					// TODO Auto-generated method stub
					ZMQ.Context context = ZMQ.context(1);  //建立1個I/O執行緒的上下文
					ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     //建立一個sub型別,也就是subscriber型別的socket
					subscriber.connect("tcp://127.0.0.1:5555");    //與在5555埠監聽的publisher建立連線
			        subscriber.subscribe("fjs".getBytes());     //訂閱fjs這個channel
					
					for (int i = 0; i < 100; i++) {
						byte[] message = subscriber.recv();  //接收publisher傳送過來的訊息
						System.out.println("receive : " + new String(message));
					}
					subscriber.close();
					context.term();
				}
				
			}).start();
		}
		
		
	}
}

這裡需要注意訂閱的channel問題,如果這裡錯了的話,subscriber是不會受到publisher傳送過來的資料的

好了,到這裡publish/subscribe的實現就算ok了