淺析ZeroMQ工作原理及其特點
ZeroMQ的研究與學習
- 簡介
- 工作模式
- 層級模型
- 實現原理
- 核心特點
- 與其他MQ的簡單比較
ZeroMQ的一百字概括
ZeroMQ看起來想一個可嵌入的網路庫,但其作用就像是一個併發框架。它為你提供了各種傳輸工具,如程序內,程序間,TCP和組播中進行原子訊息傳遞的套接字。你可以使用各種模式實現N對N的套接字連線,這些模式包括髮布訂閱,請求應答,扇出模式,管道模式。它的速度足夠快,因此可以充當叢集產品的結構,他的非同步IO模型提供了可擴充套件的多核應用程式,用非同步訊息來處理任務。它雖然是以C為原始碼進行開發,但是可以繫結多種語言。
1. 簡介
ZeroMQ號稱是“史上最快的訊息佇列”,基於c語言開發的,實時流處理sorm的task之間的通訊就是用的zeroMQ。 引用官方說法,“ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket程式設計更加簡單、簡潔和效能更高。是一個訊息處理佇列庫,可在多個執行緒、核心和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網路協議棧的一部分,之後進入Linux核心”。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的“傳統”BSD套接字之上的一 層封裝。ZMQ讓編寫高效能網路應用程式極為簡單和有趣。” 確實,它跟RabbitMQ,ActiveMQ之類有著相當本質的區別,ZeroMQ根本就不是一個訊息佇列伺服器,更像是一組底層網路通訊庫,對原有的Socket API加上一層封裝,是我們操作更簡便。使用時只需要引入相應的jar包即可。
2. 工作模式
ZeroMQ與其他MQ類似,也實現了3中最基本的工作模式:釋出-訂閱,請求-應答,管道
1.釋出-訂閱
“釋出-訂閱”模式下,“釋出者”繫結一個指定的地址,例如“192.168.10.1:5500”,“訂閱者”連線到該地址。該模式下訊息流是單向的,只允許從“釋出者”流向“訂閱者”。且“釋出者”只管發訊息,不理會是否存在“訂閱者”。上圖只是“釋出-訂閱”的最基本的模型,一個“釋出者”可以擁有多個訂閱者,同樣的,一個“訂閱者”也可訂閱多個釋出者。下面給出“釋出-訂閱”模型的樣例程式:
釋出者:
import org.zeromq.ZMQ;
public class Publisher {
public static void main(String args[]) {
ZMQ.Context context = ZMQ.context(1); //創建立包含一個I/O執行緒的context
ZMQ.Socket publisher = context.socket(ZMQ.PUB); //建立一個publisher型別的socket,他可以向所有訂閱的subscriber廣播資料
publisher.bind("tcp://*:5555"); //將當前publisher繫結到5555埠上,可以接受subscriber的訂閱
while (!Thread.currentThread ().isInterrupted ()) {
String message = "fjs hello"; //最開始可以理解為pub的channel,subscribe需要訂閱fjs這個channel才能接收到訊息
publisher.send(message.getBytes());
}
publisher.close();
context.term();
}
}
訂閱者
import org.zeromq.ZMQ;
public class Subscriber {
public static void main(String args[]) {
for (int j = 0; j < 100; j++) {
new Thread(new Runnable(){
public void run() {
// TODO Auto-generated method stub
ZMQ.Context context = ZMQ.context(1); //建立1個I/O執行緒的上下文
ZMQ.Socket subscriber = context.socket(ZMQ.SUB); //建立一個sub型別,也就是subscriber型別的socket
subscriber.connect("tcp://127.0.0.1:5555"); //與在5555埠監聽的publisher建立連線
subscriber.subscribe("fjs".getBytes()); //訂閱fjs這個channel
for (int i = 0; i < 100; i++) {
byte[] message = subscriber.recv(); //接收publisher傳送過來的訊息
System.out.println("receive : " + new String(message));
}
subscriber.close();
context.term();
}
}).start();
}
}
}
雖然我們知道“釋出者”在傳送訊息時是不關心“訂閱者”的存在於否,所以先啟動“釋出者”,再啟動“訂閱者”是很容易導致部分訊息丟失的。那麼可能會提出一個說法“我先啟動‘訂閱者’,再啟動‘釋出者’,就能解決這個問題了?” 對於ZeroMQ而言,這種做法也並不能保證100%的可靠性。在ZeroMQ領域中有一個叫做“慢木匠”的術語,就是說即使我是先啟動了“訂閱者”,再啟動“釋出者”,“訂閱者”總是會丟失第一批資料。因為在“訂閱者”與端點建立TCP連線時,會包含幾毫秒的握手時間,雖然時間短,但是是存在的。再加上ZeroMQ後臺IO是以一部方式執行的,所以若不在雙方之間施加同步策略,訊息丟失是不可避免的。
關於“釋出-訂閱”模式在ZeroMQ中的一些其他特點:
1.公平排隊,一個“訂閱者”連線到多個釋出者時,會均衡的從每個“釋出者”讀取訊息,不會出現一個“釋出者”淹沒其他“釋出者”的情況。
2.ZMQ3.0以上的版本,過濾規則發生在“釋出方”. ZMQ3.0以下的版本,過濾規則發生在“訂閱方”。其實也就是處理訊息的位置。
2.請求-應答
說到“請求-應答”模式,不得不說的就是它的訊息流動模型以及資料包裝模型。
訊息流動模型指的是該模式下,必須嚴格遵守“一問一答”的方式。
在原始碼中Req存在兩個重要的標誌位:
private boolean receiving_reply; //標誌位,如果是ture的話,表示request已經發送了,正在等待reponse
private boolean message_begins; //如果是true的話,那麼表示這裡還需要傳送第一個標誌的空msg
發出訊息後,若沒有收到回覆,再發出第二條訊息時就會丟擲異常。同樣的,對於Rep也是,在沒有接收到訊息前,不允許發出訊息。基於此構成“一問一答”的響應模式。
對於訊息傳送時的具體資料格式,引入兩個圖作為參照:
含有識別碼:
不含識別碼:
這種資料結構ZeroMQ稱之為“封包”。一個封包由0個或多個“幀”組成。對於“請求-應答”模式,一個元封包一般由2-3個幀組成,可以看出,差別就是第一幀的存在與否。
對於含有目標地址的封包,第一幀存放訊息接收端的身份識別碼,該碼在ZeroMQ內部維護的一個Map中作為key,Value是對應的地址。第二幀是一個分隔幀,沒有任何意義,僅僅起分隔作用。第三幀是傳送的資料。對於這類封包,通常第一幀,也就是識別碼需要我們手動指定。
相比於前者,不含識別碼的封包內的幀的含義還是一樣,只是它的識別碼直接有ZeroMQ預設生成,無需手動指定。
傳送時,根據識別碼在記憶體Map中對應的地址,將訊息投遞過去。
示例程式:
服務端
import org.zeromq.ZMQ;
public class Response {
public static void main (String[] args) {
ZMQ.Context context = ZMQ.context(1); //這個表示建立用於一個I/O執行緒的context
ZMQ.Socket socket = context.socket(ZMQ.REP); //建立一個response型別的socket,他可以接收request傳送過來的請求,其實可以將其簡單的理解為服務端
socket.bind ("tcp://*:5555"); //繫結埠
int i = 0;
int number = 0;
while (!Thread.currentThread().isInterrupted()) {
i++;
if (i == 10000) {
i = 0;
System.out.println(++number);
}
byte[] request = socket.recv(); //獲取request傳送過來的資料
//System.out.println("receive : " + new String(request));
String response = "world";
socket.send(response.getBytes()); //向request端傳送資料 ,必須要要request端返回資料,沒有返回就又recv,將會出錯,這裡可以理解為強制要求走完整個request/response流程
}
socket.close(); //先關閉socket
context.term(); //關閉當前的上下文
}
}
客戶端
import org.zeromq.ZMQ;
public class Request {
public static void main(String args[]) {
for (int j = 0; j < 5; j++) {
new Thread(new Runnable(){
public void run() {
// TODO Auto-generated method stub
ZMQ.Context context = ZMQ.context(1); //建立一個I/O執行緒的上下文
ZMQ.Socket socket = context.socket(ZMQ.REQ); //建立一個request型別的socket,這裡可以將其簡單的理解為客戶端,用於向response端傳送資料
socket.connect("tcp://127.0.0.1:5555"); //與response端建立連線
long now = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
String request = "hello";
socket.send(request.getBytes()); //向reponse端傳送資料
byte[] response = socket.recv(); //接收response傳送回來的資料 正在request/response模型中,send之後必須要recv之後才能繼續send,這可能是為了保證整個request/response的流程走完
// System.out.println("receive : " + new String(response));
}
long after = System.currentTimeMillis();
System.out.println((after - now) / 1000);
}
}).start();;
}
}
}
“請求-應答”模式中Req套接字是同步的,每次只跟一個節點交流,如果Req套接字連線了多個節點,請求會同時分發到每一個節點。
相應的Rep套接字也是同步的,每次只跟一個節點交流,如果連線了多個節點,則以公平的方式以此從每個節點讀取請求,但是最先響應的是最後讀取的請求。
在接下來的內部結構分析時,將以“請求-應答”模式為例。
3.管道模式
在說明“管道模式”前,需要明確的是在ZeroMQ中並沒有絕對的服務端與客戶端之分,所有的資料接收與傳送都是以連線為單位的,只區分ZeroMQ定義的型別。就像套接字繫結地址時,可以使用“bind”,也可以使用“connect”,只是通常我們將理解中的服務端“bind”到一個地址,而理解中的客戶端“connec”到該地址。
“管道模式”一般用於任務分發與結果收集,由一個任務發生器來產生任務,“公平”的派發到其管轄下的所有worker,完成後再由結果收集器來回收任務的執行結果。
任務發生器
import org.zeromq.ZMQ;
public class Push {
public static void main(String args[]) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket push = context.socket(ZMQ.PUSH);
push.bind("ipc://fjs");
for (int i = 0; i < 10000000; i++) {
push.send("hello".getBytes());
}
push.close();
context.term();
}
}
Worker
import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZMQ;
public class Pull {
public static void main(String args[]) {
final AtomicInteger number = new AtomicInteger(0);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable(){
private int here = 0;
public void run() {
// TODO Auto-generated method stub
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket pull = context.socket(ZMQ.PULL);
pull.connect("ipc://fjs");
//pull.connect("ipc://fjs");
while (true) {
String message = new String(pull.recv());
int now = number.incrementAndGet();
here++;
if (now % 1000000 == 0) {
System.out.println(now + " here is : " + here);
}
}
}
}).start();
}
}
}
結果收集器
import org.zeromq.ZMQ;
public class Pull {
public static void main(String args[]) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket pull = context.socket(ZMQ.PULL);
pull.bind("ipc://fjs");
int number = 0;
while (true) {
String message = new String(pull.recv());
number++;
if (number % 1000000 == 0) {
System.out.println(number);
}
}
}
}
整體流程比較好理解,Worker連線到任務發生器上,等待任務的產生,完成後將結果傳送至結果收集器。如果要以客戶端服務端的概念來區分,這裡的任務發生器與結果收集器是服務端,而worker是客戶端。
前面說到了這裡任務的派發是“公平的”,因為內部採用了LRU的演算法來找到最近最久未工作的閒置worker。但是公平在這裡是相對的,當任務發生器啟動後,第一個連線到它的worker會在一瞬間承受整個任務發生器產生的tasks。
3. 層級模型與互動邏輯
這是ZeroMQ的主要的層級模型,以“請求-應答”為例。
由上而下,最頂層的是ZObject與IPollEvent。
ZObject是所有ZeroMQ體系中類的父類,它存在的意義是傳送與接收命令(有別於訊息,命令是告訴ZeroMQ該做什麼,需要做什麼)。
IPollEvent則是一個介面,定義了若干操作,包括讀操作,寫操作,客戶端請求連線,服務端應答連線,超時操作等供5個操作,該操作的實現類包括Req,Rep等具體Socket。該介面的目的是定義終端間發生操作時的行為。
Ctx是一個上下文類,通常一個終端只需要建立一個上下文。
IOObject本身並沒有太多的屬性,主要是其內維護了一個IOThread。
IOThread是用於處理命令的一個類,內部持有一個MailBox例項與Poller例項。
MailBox是一個重要的類,它被用作處理命令,包括命令的傳送與接收,需要注意的是,這裡的命令其實是本地傳送,也就是自己跟自己發,而不是端點間傳送。
Pipe用於處理接收到或者需要傳送的資料,是實際儲存待處理資料的資料結構,其內部是用佇列的形式實現。
LB、FQ這兩者在官方給出的全名是“LoadBalance”,“FairQueue”。也就是負載均衡與公平排隊,分別用於處理要傳送的資料與要接收的資料。
SocketBase是例如Req,Rep,Pull等包裝後Socket的父類。其內含有一對Pipe(與SessionBase公用),用於在SocketBase與SessionBase之間傳遞訊息,具體傳遞過程在接下去或說明。
SessionBase是建立SocketChannel並與目標終端進行連線的地方,是與底層Poller最先進行互動的一層。具有超時重連,斷線重連等功能。
Poller是整個ZeroMQ的核心,它實現了命令的傳送與接收,資料的傳送與接收。由他來真正的傳送資料到其他終端,也是他處理來自其他終端的資料後交給SessionBase。
基於此層級模型的互動邏輯:
傳送訊息
Socket -> Session -> StreamEngine -> Poller
接收訊息
Poller -> StreamEngine -> Session -> Socket
4. 實現原理
這部分將說明從建立一個Socket開始到傳送或者接收資料的整個過程,ZeroMQ內部的處理流程。
不過我個人覺得先了解一些在底層的原理,對於整體的實現理解會有更好的幫助。
先看一下Poller的一些重要定義
private static class PollSet {
protected IPollEvents handler; //事件的回撥
protected SelectionKey key; //註冊之後的key
protected int ops; //註冊的事件
protected boolean cancelled; //是否已經取消
protected PollSet(IPollEvents handler) {
this.handler = handler;
key = null;
cancelled = false;
ops = 0;
}
}
final private Map<SelectableChannel, PollSet> fd_table; //記錄所有的註冊,key是channel
private boolean retired; //當前註冊的物件是否有更新,如果有更新的話,在執行select之前需要先更新註冊
volatile private boolean stopping; //如果是true的話,那麼執行執行緒將會停止
volatile private boolean stopped; //是否已經停止
private Thread worker; //worker執行緒
private Selector selector; //selector
final private String name; //名字
PollerSet是Poller的一個巢狀類,所有需要註冊到selector上的channel都會先構建這個物件,將其當做附件註冊到selector上。其中handler是事件回撥(一般是一個IOObject例項),key是selector註冊後取得的key,ops是註冊的事件型別
fd_table用於維護註冊的channel物件與其的PollSet物件之間的對映關係。
retired用於標識當前的註冊的channel什麼的是否有更新,若是需要更新,則可能會重新生成key。
接下來來看看如何在poller物件上面註冊channel吧,有幾個比較重要的方法:
//用於在當前的集合裡面新增需要註冊的channel,第一個引數是channel,第二個引數是事件回撥
public final void add_fd (SelectableChannel fd_, IPollEvents events_) {
fd_table.put(fd_, new PollSet(events_)); //直接把放到map裡面就好了
adjust_load (1); //增加load值,這裡所謂的負載其實就是在當前poller裡面註冊的channel的數量
}
//在key上面註冊事件,如果negate為true的話,那麼表示是取消事件
private final void register (SelectableChannel handle_, int ops, boolean negate) {
PollSet pollset = fd_table.get(handle_); //獲取pollset物件
if (negate) {
pollset.ops = pollset.ops &~ ops; //取反,相當於取消事件
} else {
pollset.ops = pollset.ops | ops; //註冊事件
}
if (pollset.key != null) { //如果有key了,那麼表示已經註冊到selector上面了,那麼只需要更新key就好了
pollset.key.interestOps(pollset.ops);
} else {
retired = true;
}
}
可見在Poller裡註冊一個事件主要分為兩步 1.放入map中 2.設定PollerSet的相應屬性
Poller本身作為一個執行緒,來看看它的run方法
public void run () {
int returnsImmediately = 0;
while (!stopping) {
long timeout = execute_timers (); //執行所有的超時,並且獲取下一個超時的時間
if (retired) { //這裡表示註冊的東西有更新
Iterator <Map.Entry <SelectableChannel,PollSet>> it = fd_table.entrySet ().iterator ();
while (it.hasNext ()) { //遍歷所有需要註冊的
Map.Entry <SelectableChannel,PollSet> entry = it.next ();
SelectableChannel ch = entry.getKey (); //獲取channel
PollSet pollset = entry.getValue (); //獲取pollset
if (pollset.key == null) { //這裡沒有key的話,表示當前channel並沒有註冊到selector上面去
try {
pollset.key = ch.register(selector, pollset.ops, pollset.handler); //註冊
} catch (ClosedChannelException e) {
}
}
if (pollset.cancelled || !ch.isOpen()) { //如果是取消註冊,那麼直接取消掉就可以了
if(pollset.key != null) {
pollset.key.cancel();
}
it.remove ();
}
}
retired = false;
}
// Wait for events.
int rc;
long start = System.currentTimeMillis (); //select之前的時間
try {
rc = selector.select (timeout);
} catch (IOException e) {
throw new ZError.IOException (e);
}
if (rc == 0) { //出錯啦,好像
// Guess JDK epoll bug
if (timeout == 0 ||
System.currentTimeMillis () - start < timeout / 2)
returnsImmediately ++;
else
returnsImmediately = 0;
if (returnsImmediately > 10) {
rebuildSelector (); //重建selector
returnsImmediately = 0;
}
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //所有select出來的key
while (it.hasNext()) { //遍歷
SelectionKey key = it.next();
IPollEvents evt = (IPollEvents) key.attachment();
it.remove();
try { //接下來就是判斷事件的型別執行相應的方法就好了
if (key.isReadable() ) { //有資料可以讀取了
evt.in_event();
} else if (key.isAcceptable()) { //有新的連線進來了
evt.accept_event();
} else if (key.isConnectable()) { //連線建立
evt.connect_event();
}
if (key.isWritable()) { //可寫
evt.out_event();
}
} catch (CancelledKeyException e) {
// channel might have been closed
}
}
}
stopped = true;
}
這部分還是好理解的,首先是檢查fd_table是否需要更新,其實就是有沒有新插入的channel或者有channel已經失效,由retired標誌位決定。如果需要更新,遍歷map中每個元素,檢查PollerSet裡的key,如果沒有,則在Selector上進行註冊。
然後呼叫selector.select(),若是有事件到來,根據其事件型別以及註冊事件時一併傳入的handle來決定執行何種操作。
簡單來說Poller就是一個輪詢器,我們在它的Selector上註冊相應的channel與事件。而Poller定期掃描來捕獲channel的狀態。同時我們也瞭解到一點,Poller才是真正的IO執行緒持有者。
粗淺的說明了Poller之後,再來看看MailBox
同樣,先是介紹一些重要的屬性
private final YPipe<Command> cpipe; //一個用來儲存command的佇列,內部以連結串列的形式實現
private final Signaler signaler; //其實也是一個實現了一個SocketChannel,但是不對外發送訊息,而是向Poller傳送空白訊息,以提醒command佇列中有命令需要處理
private final Lock sync; //只有一個執行緒從mailbox裡面收命令,但是會有很多執行緒向mialbox裡面傳送命令,用這個鎖來進行同步
public SelectableChannel get_fd () {
return signaler.get_fd (); //這裡其實獲取的是signal用到的pipe的讀channel
}
//向當前的mailbox傳送命令,其實就是寫到command佇列裡面去而已
public void send (final Command cmd_) {
boolean ok = false;
sync.lock ();
try {
cpipe.write (cmd_, false);
ok = cpipe.flush (); //pipeflush,這裡將會被selector感應到,從而可以執行相應的處理,在執行執行緒裡面執行命令
} finally {
sync.unlock ();
}
if (!ok) {
signaler.send (); //通過寫端寫資料,這樣子的話會被讀端收到
}
}
//收取命令,如果這裡無法立刻獲取命令的話,還可以有一個超時時間
public Command recv (long timeout_) {
Command cmd_ = null;
// Try to get the command straight away.
if (active) {
cmd_ = cpipe.read (); //從佇列裡面獲取命令
if (cmd_ != null) {
return cmd_;
}
// If there are no more commands available, switch into passive state.
active = false;
signaler.recv (); //這裡會從讀端不斷的讀資料
}
// Wait for signal from the command sender.
boolean rc = signaler.wait_event (timeout_);
if (!rc)
return null;
// We've got the signal. Now we can switch into active state.
active = true;
// Get a command.
cmd_ = cpipe.read ();
assert (cmd_ != null);
return cmd_;
}
MailBox,就像之前說過的,只是用於處理命令的一個東西。命令的讀寫都是對本地的一個佇列進行操作。需要注意的是在寫命令與讀命令之間需要有Signaler來充當訊號通知者。
Signaler內部有一組變數:
private Pipe.SinkChannel w; 資料寫入端
private Pipe.SourceChannel r; 資料讀取端
將SourceChannel 註冊到了poller內。 這樣,當命令寫入到佇列中,會觸發SinkChannel的write操作,通過SinkChannel向SourceChannel寫資料,此時會被poller內的selector感知到。
由於IOThred在向poller註冊時,傳入的回撥是“this”,也就是本身,在發生in_event(讀事件)時,實際呼叫的時IOThread的in_event。
然後IOThread中的in_event從MailBox中讀取資料,實質是從YPipe中讀取command。
對於Signaler的作用,只是用於提醒poller有命令,它向SinkChannel內寫入的資料其實是一個大小為1沒有意義的ByteBuffer。只是用於觸發在poller內註冊的SourceChannel的Readable事件。
需要明確的是,command都是針對於本地的。不會在兩臺不同的機器間傳送command,因為send_command並沒有走socketchannel,所以不可能通過網路傳送。
MailBox裡面的邏輯大致就是如此:
1.命令寫入YPipe
2.Signaler提醒Poller,啟用in_event
3.MailBox從YPipe讀取命令並執行
ok,一些基本的概念說的差不多了,接下來開始說明Socket的建立以及訊息的傳送過程。
//這是一個建立上下文,Socket,與目標端進行連線,傳送資料以及接收資料的客戶端程式碼
Context context = ZMQ.context(1);
Socket worker = context.socket(ZMQ.REQ);
worker.connect("tcp://localhost:5671");
worker.send ("Hi Boss");
String workload = worker.recvStr ();
Sysout.out.println(workload);
1.建立上下文
private final List<SocketBase> sockets;
private final Deque<Integer> empty_slots;
private volatile boolean starting;
private boolean terminating;
private final Lock slot_sync;
private Reaper reaper;
private final List<IOThread> io_threads;
private int slot_count;
private Mailbox[] slots;
private final Mailbox term_mailbox;
private final Lock endpoints_sync;
private static AtomicInteger max_socket_id = new AtomicInteger(0);
private int max_sockets;
private int io_thread_count;
private final Lock opt_sync;
public static final int term_tid = 0;
public static final int reaper_tid = 1;
上面給出了Ctx內的一些重要的成員變數,初始化過程中呼叫了init_ctx(),返回一個Ctx物件,此時僅僅只是對部分成員變數做了一個初始賦值,並沒有特殊操作。
2.建立Socket
//擷取部分程式碼,基本上能表示整個過程
if (starting) {
starting = false;
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = new Mailbox[slot_count];
slots [term_tid] = term_mailbox;
reaper = new Reaper (this, reaper_tid);
slots [reaper_tid] = reaper.get_mailbox ();
reaper.start ();
//以上部分建立的Reaper物件與兩個MailBox是作用於上下文銷燬的時候處理剩餘訊息以及釋放佔用資源。
//下面是需要關注的部分,ios是在建立Ctx時我們指定需要建立的IO執行緒數,通常情況1個就足夠了。根據我們指定的數量建立相應的IOThread,每個IOThread都有他子身的MailBox。
for (int i = 2; i != ios + 2; i++) {
//建立IOThread物件的時候會建立一個Poller,以及一個MailBox,同時,將MailBox對應的Signaler的SourceChannel註冊到Poller中以監聽有無command需要執行。
IOThread io_thread = new IOThread (this, i);
io_threads.add(io_thread);
slots [i] = io_thread.get_mailbox ();
//啟動Poller
io_thread.start ();
}
for (int i = (int) slot_count - 1;
i >= (int) ios + 2; i--) {
empty_slots.add (i);
slots [i] = null;
}
}
//以上為if部分,只會在Ctx已經建立好,第一次建立Socket會進入的分支,由於是第一次建立Socket,所以需要對一些Ctx成員進行初始化。而之後只需要建立每個Socket對應的IOThread以及必要屬性即可。
int slot = empty_slots.pollLast();
int sid = max_socket_id.incrementAndGet();
//這一步比較重要,先建立一個SocketBase,它是所有Socket的父類
s = SocketBase.create (type_, this, slot, sid);
if (s == null) {
empty_slots.addLast(slot);
return null;
}
sockets.add (s);
slots [slot] = s.get_mailbox ();
來看看SocketBase.create(type_,this,slot,sid)都做了些什麼。
//省略了部分操作,實際是先根據我們要建立的Socket型別呼叫建構函式,然後在建構函式中用super呼叫父類也就是SocketBase的建構函式...
//給出部分重要成員
private int tag;
private boolean ctx_terminated;
private boolean destroyed;
private final Mailbox mailbox;
private final List<Pipe> pipes;
private Poller poller;
private SelectableChannel handle
private SocketBase monitor_socket;
protected SocketBase (Ctx parent_, int tid_, int sid_)
{
//調了ZObject的建構函式,因為ZObject是所有類的父類
super (parent_, tid_);
tag = 0xbaddecaf;
ctx_terminated = false;
destroyed = false;
last_tsc = 0;
ticks = 0;
rcvmore = false;
monitor_socket = null;
monitor_events = 0;
options.socket_id = sid_;
endpoints = new MultiMap<String, Own>();
//這個pipes在後期會起到非常大的作用
pipes = new ArrayList<Pipe>();
//建立了一個MailBox
mailbox = new Mailbox("socket-" + sid_);
errno = new ValueReference<Integer>(0);
...
return s;
}
那到這裡為止,我們已經獲得了所需的Socket,但是需要注意的是現在只是獲得Socket,但是該Socket還沒有跟地址進行繫結或者連結。
現在說connect部分,這部分比較長,所以分開說明。
//這裡就是先去看看有沒有需要執行的command,有的話先執行。這樣做的目的應該是假如我們關閉了上下文,理論上來說是不再處理任何請求。但是關閉上下文也是一個動作,發出一個command,經過之前對MailBox的講解,我們知道處理一個command其實是先放到一個佇列中,等待Poller的訊號在從佇列中取出command然後執行。這樣如果Poller要處理較多事件時,可能會推遲command的執行,個人認為在ZeroMQ中,command的優先順序是大於訊息的。所以基本在執行大部分動作前會先去看看佇列中有沒有待執行的command。以避免command等待過久而不執行的尷尬。
boolean brc = process_commands (0, false);
if (!brc)
return false;
...
//這裡沒什麼需要特別說明的,我們知道終端url的形式是像tcp://192.168.1.1:8000 這樣的形式存在的。所以這裡做的只是獲取IP,埠以及協議。
String protocol = uri.getScheme();
String address = uri.getAuthority();
String path = uri.getPath();
if (address == null)
address = path;
check_protocol (protocol);
...
//建立一個與該Socket對應的Session,一個Socket可以繫結多個Session
SessionBase session = SessionBase.create (io_thread, true, this, options, paddr);
在SessionBase中才是發生SocketChannel對接的地方,下面來看看它做了些什麼。
//與SocketBase類似,也是進行了一些基本的初始化工作
private boolean connect;
private Pipe pipe;
private final Set<Pipe> terminating_pipes;
private boolean incomplete_in;
private boolean pending;
private SocketBase socket;
private IOThread io_thread;
private boolean identity_sent;
private boolean identity_received;
private final Address addr;
private IOObject io_object;
public SessionBase(IOThread io_thread_, boolean connect_,
SocketBase socket_, Options options_, Address addr_) {
super(io_thread_, options_);
io_object = new IOObject(io_thread_);
connect = connect_;
pipe = null;
incomplete_in = false;
pending = false;
engine = null;
socket = socket_;
io_thread = io_thread_;
has_linger_timer = false;
identity_sent = false;
identity_received = false;
addr = addr_;
terminating_pipes = new HashSet <Pipe> ();
}
繼續看connect的過程。
...
if (options.delay_attach_on_connect != 1 || icanhasall) {
//這個parents在回收資源的時候起作用,維護層級關係
ZObject[] parents = {this, session};
Pipe[] pipes = {null, null};
//這是一個高水位線陣列,由於Socket根據不同型別會存在傳送緩衝區,接收緩衝區,或者一個公用緩衝區。雖然ZeroMQ沒有持久化操作。但是比如Req套接字,如果在Rep建立連線前就傳送訊息,實質是不會發出去的,會先快取在本地傳送快取區。同時,接收緩衝區也一樣,如果收到訊息還沒來的及處理,就會一直對方在接收快取區中。高水位線的作用就是給緩衝區定義一個大小,防止撐爆記憶體。
int[] hwms = {options.sndhwm, options.rcvhwm};
boolean[] delays = {options.delay_on_disconnect, options.delay_on_close};
//OK,接下去的3步操作,直接為Socket與Session進行資料互動奠定了基礎。我們仔細看下。
Pipe.pipepair (parents, pipes, hwms, delays);
attach_pipe (pipes [0], icanhasall);
session.attach_pipe (pipes [1]);
}
public static void pipepair(ZObject[] parents_, Pipe[] pipes_, int[] hwms_, boolean[] delays_) {
//從以下程式碼可以看到一個數據結構,它建立了2個YPipe物件(實質就是連結串列),然後又建立了2個Pipe,一般的1個Pipe需要一個寫端一個讀端,在這裡這兩個Pipe公用了2個YPipe。
//也就是說,現有2個Pipe,分別是A,B;2個YPipe,分別是Y1,Y2 。A,B公用Y1,Y2,Y1作為A的讀端,作為B的寫端;Y2作為B的讀端,作為A的寫端。結構圖如下
YPipe<Msg> upipe1 = new YPipe<Msg>(Msg.class, Config.message_pipe_granularity.getValue());
YPipe<Msg> upipe2 = new YPipe<Msg>(Msg.class, Config.message_pipe_granularity.getValue());
pipes_ [0] = new Pipe(parents_ [0], upipe1, upipe2,
hwms_ [1], hwms_ [0], delays_ [0]);
pipes_ [1] = new Pipe(parents_ [1], upipe2, upipe1,
hwms_ [0], hwms_ [1], delays_ [1]);
//兩個Pipe相互持有對方的引用
pipes_ [0].set_peer (pipes_ [1]);
pipes_ [1].set_peer (pipes_ [0]);
}
繼續看剩下的兩個操作
//這兩部操作就比較好說,因為我們有2個Pipe,但最後是SocketBase持有一個Pipe,另一個由SessionBase持有。這樣,SessionBase才能通過此與SocketBase進行資料互動,而實際上,傳送資料或者接收資料就是通過這兩個Pipe來流動的。
attach_pipe (pipes [0], icanhasall);
session.attach_pipe (pipes [1]);
傳送時,Pipe1通過LB向YPipe1寫入要傳送的資料,併發送read_activated命令,傳入引數為Pipe2,SessionBase中其實持有的就是Pipe2,所以Pipe2從YPipe讀取資料後由StreamEngine傳送
接收時,StreamEngine將訊息寫會SessionBase的Pipe,也就是Pipe2,從Pipe2寫入資料,其實是寫入到YPipe2,然後通知SocketBase中的Pipe1,Pipe1從YPipe2讀取資料
其實我覺得比如直接把Pipe設計成佇列的形式,同樣是兩個Pipe,SocketBase與SessionBase同時持有雙方引用,也能做到一樣的功能,也許是因為這樣做的話雙方要維護的Pipe對的引用數會加倍,所以沒有采用這種做法。
繼續講connect的最後一步。
add_endpoint (addr_, session);
//呼叫了 launch_child (endpoint_);方法
protected void launch_child (Own object_)
{
//這步是設定層級關係,回收資源時用到
object_.set_owner (this);
//其實質就是傳送了一個plug命令,因為傳進來的object_是SessionBase,所以plug操作最後由SessionBase來完成。
send_plug (object_);
send_own (this, object_);
}
//SessionBase中的process_plug操作
protected void process_plug ()
{
io_object.set_handler(this);
if (connect)
start_connecting (false);
}
//看到這裡似乎要發現了連線操作將要執行的細節了。
private void start_connecting (boolean wait_)
{
assert (connect);
assert (io_thread != null);
// Create the connecter object.
if (addr.protocol().equals("tcp")) {
TcpConnecter connecter = new TcpConnecter (
io_thread, this, options, addr, wait_);
//alloc_assert (connecter);
//沒錯,又是 launch_child,不過這次傳進去的物件是上面建立的 TcpConnecter。同樣的一會來看看 TcpConnecter裡的process_plug操作。
launch_child (connecter);
return;
}
if (addr.protocol().equals("ipc")) {
IpcConnecter connecter = new IpcConnecter (
io_thread, this, options, addr, wait_);
//alloc_assert (connecter);
launch_child (connecter);
return;
}
assert (false);
}
簡要說明下TcpConnector的重要成員以及部分操作
private final static