1. 程式人生 > >淺析ZeroMQ工作原理及其特點

淺析ZeroMQ工作原理及其特點

ZeroMQ的研究與學習

  • 簡介
  • 工作模式
  • 層級模型
  • 實現原理
  • 核心特點
  • 與其他MQ的簡單比較

ZeroMQ-logo

ZeroMQ的一百字概括

ZeroMQ看起來想一個可嵌入的網路庫,但其作用就像是一個併發框架。它為你提供了各種傳輸工具,如程序內,程序間,TCP和組播中進行原子訊息傳遞的套接字。你可以使用各種模式實現N對N的套接字連線,這些模式包括髮布訂閱,請求應答,扇出模式,管道模式。它的速度足夠快,因此可以充當叢集產品的結構,他的非同步IO模型提供了可擴充套件的多核應用程式,用非同步訊息來處理任務。它雖然是以C為原始碼進行開發,但是可以繫結多種語言。

1. 簡介

ZeroMQ號稱是“史上最快的訊息佇列”,基於c語言開發的,實時流處理sorm的task之間的通訊就是用的zeroMQ。
引用官方說法,“ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket程式設計更加簡單、簡潔和效能更高。是一個訊息處理佇列庫,可在多個執行緒、核心和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網路協議棧的一部分,之後進入Linux核心”。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的“傳統”BSD套接字之上的一 層封裝。ZMQ讓編寫高效能網路應用程式極為簡單和有趣。” 確實,它跟RabbitMQ,ActiveMQ之類有著相當本質的區別,ZeroMQ根本就不是一個訊息佇列伺服器,更像是一組底層網路通訊庫,對原有的Socket API加上一層封裝,是我們操作更簡便。使用時只需要引入相應的jar包即可。

2. 工作模式

ZeroMQ與其他MQ類似,也實現了3中最基本的工作模式:釋出-訂閱,請求-應答,管道

1.釋出-訂閱
pub-sub

“釋出-訂閱”模式下,“釋出者”繫結一個指定的地址,例如“192.168.10.1:5500”,“訂閱者”連線到該地址。該模式下訊息流是單向的,只允許從“釋出者”流向“訂閱者”。且“釋出者”只管發訊息,不理會是否存在“訂閱者”。上圖只是“釋出-訂閱”的最基本的模型,一個“釋出者”可以擁有多個訂閱者,同樣的,一個“訂閱者”也可訂閱多個釋出者。下面給出“釋出-訂閱”模型的樣例程式:

釋出者:

 import org.zeromq.ZMQ;  

public
class Publisher { public static void main(String args[]) { ZMQ.Context context = ZMQ.context(1); //創建立包含一個I/O執行緒的context ZMQ.Socket publisher = context.socket(ZMQ.PUB); //建立一個publisher型別的socket,他可以向所有訂閱的subscriber廣播資料 publisher.bind("tcp://*:5555"); //將當前publisher繫結到5555埠上,可以接受subscriber的訂閱 while (!Thread.currentThread ().isInterrupted ()) { String message = "fjs hello"; //最開始可以理解為pub的channel,subscribe需要訂閱fjs這個channel才能接收到訊息 publisher.send(message.getBytes()); } publisher.close(); context.term(); } }

訂閱者

 import org.zeromq.ZMQ;  

public class Subscriber {  
    public static void main(String args[]) {  
        for (int j = 0; j < 100; j++) {  
            new Thread(new Runnable(){  

                public void run() {  
                    // TODO Auto-generated method stub  
                    ZMQ.Context context = ZMQ.context(1);  //建立1個I/O執行緒的上下文  
                    ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     //建立一個sub型別,也就是subscriber型別的socket  
                    subscriber.connect("tcp://127.0.0.1:5555");    //與在5555埠監聽的publisher建立連線  
                    subscriber.subscribe("fjs".getBytes());     //訂閱fjs這個channel  

                    for (int i = 0; i < 100; i++) {  
                        byte[] message = subscriber.recv();  //接收publisher傳送過來的訊息  
                        System.out.println("receive : " + new String(message));  
                    }  
                    subscriber.close();  
                    context.term();  
                }  

            }).start();  
        }  


    }  
}  

雖然我們知道“釋出者”在傳送訊息時是不關心“訂閱者”的存在於否,所以先啟動“釋出者”,再啟動“訂閱者”是很容易導致部分訊息丟失的。那麼可能會提出一個說法“我先啟動‘訂閱者’,再啟動‘釋出者’,就能解決這個問題了?” 對於ZeroMQ而言,這種做法也並不能保證100%的可靠性。在ZeroMQ領域中有一個叫做“慢木匠”的術語,就是說即使我是先啟動了“訂閱者”,再啟動“釋出者”,“訂閱者”總是會丟失第一批資料。因為在“訂閱者”與端點建立TCP連線時,會包含幾毫秒的握手時間,雖然時間短,但是是存在的。再加上ZeroMQ後臺IO是以一部方式執行的,所以若不在雙方之間施加同步策略,訊息丟失是不可避免的。
關於“釋出-訂閱”模式在ZeroMQ中的一些其他特點:
1.公平排隊,一個“訂閱者”連線到多個釋出者時,會均衡的從每個“釋出者”讀取訊息,不會出現一個“釋出者”淹沒其他“釋出者”的情況。
2.ZMQ3.0以上的版本,過濾規則發生在“釋出方”. ZMQ3.0以下的版本,過濾規則發生在“訂閱方”。其實也就是處理訊息的位置。

2.請求-應答
req-res
說到“請求-應答”模式,不得不說的就是它的訊息流動模型以及資料包裝模型。
訊息流動模型指的是該模式下,必須嚴格遵守“一問一答”的方式。
在原始碼中Req存在兩個重要的標誌位:

private boolean receiving_reply;  //標誌位,如果是ture的話,表示request已經發送了,正在等待reponse  
private boolean message_begins;   //如果是true的話,那麼表示這裡還需要傳送第一個標誌的空msg 

發出訊息後,若沒有收到回覆,再發出第二條訊息時就會丟擲異常。同樣的,對於Rep也是,在沒有接收到訊息前,不允許發出訊息。基於此構成“一問一答”的響應模式。

對於訊息傳送時的具體資料格式,引入兩個圖作為參照:
含有識別碼:
has-address
不含識別碼:
no-address

這種資料結構ZeroMQ稱之為“封包”。一個封包由0個或多個“幀”組成。對於“請求-應答”模式,一個元封包一般由2-3個幀組成,可以看出,差別就是第一幀的存在與否。
對於含有目標地址的封包,第一幀存放訊息接收端的身份識別碼,該碼在ZeroMQ內部維護的一個Map中作為key,Value是對應的地址。第二幀是一個分隔幀,沒有任何意義,僅僅起分隔作用。第三幀是傳送的資料。對於這類封包,通常第一幀,也就是識別碼需要我們手動指定。
相比於前者,不含識別碼的封包內的幀的含義還是一樣,只是它的識別碼直接有ZeroMQ預設生成,無需手動指定。
傳送時,根據識別碼在記憶體Map中對應的地址,將訊息投遞過去。
示例程式:
服務端

 import org.zeromq.ZMQ;  

public class Response {  
    public static void main (String[] args) {  
        ZMQ.Context context = ZMQ.context(1);  //這個表示建立用於一個I/O執行緒的context  

        ZMQ.Socket socket = context.socket(ZMQ.REP);  //建立一個response型別的socket,他可以接收request傳送過來的請求,其實可以將其簡單的理解為服務端  
        socket.bind ("tcp://*:5555");    //繫結埠  
        int i = 0;  
        int number = 0;  
        while (!Thread.currentThread().isInterrupted()) {  
            i++;  
            if (i == 10000) {  
                i = 0;  
                System.out.println(++number);  
            }  
            byte[] request = socket.recv();  //獲取request傳送過來的資料  
            //System.out.println("receive : " + new String(request));  
            String response = "world";  
            socket.send(response.getBytes());  //向request端傳送資料  ,必須要要request端返回資料,沒有返回就又recv,將會出錯,這裡可以理解為強制要求走完整個request/response流程  
        }  
        socket.close();  //先關閉socket  
        context.term();  //關閉當前的上下文  


    }  
}  

客戶端

 import org.zeromq.ZMQ;  

public class Request {  
    public static void main(String args[]) {  
        for (int j = 0;  j < 5; j++) {  
            new Thread(new Runnable(){  

                public void run() {  
                    // TODO Auto-generated method stub  
                    ZMQ.Context context = ZMQ.context(1);  //建立一個I/O執行緒的上下文  
                    ZMQ.Socket socket = context.socket(ZMQ.REQ);   //建立一個request型別的socket,這裡可以將其簡單的理解為客戶端,用於向response端傳送資料  

                    socket.connect("tcp://127.0.0.1:5555");   //與response端建立連線  
                    long now = System.currentTimeMillis();  
                    for (int i = 0; i < 100000; i++) {  
                        String request = "hello";  
                        socket.send(request.getBytes());   //向reponse端傳送資料  
                        byte[] response = socket.recv();   //接收response傳送回來的資料  正在request/response模型中,send之後必須要recv之後才能繼續send,這可能是為了保證整個request/response的流程走完  
                    //  System.out.println("receive : " + new String(response));  
                    }  
                    long after = System.currentTimeMillis();  

                    System.out.println((after - now) / 1000);  
                }  

            }).start();;  
        }  

    }  
}  

“請求-應答”模式中Req套接字是同步的,每次只跟一個節點交流,如果Req套接字連線了多個節點,請求會同時分發到每一個節點。
相應的Rep套接字也是同步的,每次只跟一個節點交流,如果連線了多個節點,則以公平的方式以此從每個節點讀取請求,但是最先響應的是最後讀取的請求。
在接下來的內部結構分析時,將以“請求-應答”模式為例。

3.管道模式
channel

在說明“管道模式”前,需要明確的是在ZeroMQ中並沒有絕對的服務端與客戶端之分,所有的資料接收與傳送都是以連線為單位的,只區分ZeroMQ定義的型別。就像套接字繫結地址時,可以使用“bind”,也可以使用“connect”,只是通常我們將理解中的服務端“bind”到一個地址,而理解中的客戶端“connec”到該地址。

“管道模式”一般用於任務分發與結果收集,由一個任務發生器來產生任務,“公平”的派發到其管轄下的所有worker,完成後再由結果收集器來回收任務的執行結果。

任務發生器

import org.zeromq.ZMQ;  

public class Push {  
    public static void main(String args[]) {  

        ZMQ.Context context = ZMQ.context(1);  
        ZMQ.Socket push  = context.socket(ZMQ.PUSH);  
        push.bind("ipc://fjs");  

        for (int i = 0; i < 10000000; i++) {  
            push.send("hello".getBytes());  
        }  
        push.close();  
        context.term();  

    }  
}  

Worker

import java.util.concurrent.atomic.AtomicInteger;  

import org.zeromq.ZMQ;  

public class Pull {  
    public static void main(String args[]) {  
        final AtomicInteger number = new AtomicInteger(0);  
        for (int i = 0; i < 5; i++) {  
            new Thread(new Runnable(){  
                private int here = 0;  
                public void run() {  
                    // TODO Auto-generated method stub  
                    ZMQ.Context context = ZMQ.context(1);  
                    ZMQ.Socket pull = context.socket(ZMQ.PULL);  
                    pull.connect("ipc://fjs");  
                    //pull.connect("ipc://fjs");  
                    while (true) {  
                        String message = new String(pull.recv());  
                        int now = number.incrementAndGet();  
                        here++;  
                        if (now % 1000000 == 0) {  
                            System.out.println(now + "  here is : " + here);  
                        }  
                    }  
                }  

            }).start();  

        }  
    }  
}  

結果收集器

import org.zeromq.ZMQ;  

public class Pull {  
    public static void main(String args[]) {  
        ZMQ.Context context = ZMQ.context(1);  
        ZMQ.Socket pull = context.socket(ZMQ.PULL);  

        pull.bind("ipc://fjs");  

        int number = 0;  
        while (true) {  
            String message = new String(pull.recv());  
            number++;  
            if (number % 1000000 == 0) {  
                System.out.println(number);  
            }  
        }  
    }  
}

整體流程比較好理解,Worker連線到任務發生器上,等待任務的產生,完成後將結果傳送至結果收集器。如果要以客戶端服務端的概念來區分,這裡的任務發生器與結果收集器是服務端,而worker是客戶端。
前面說到了這裡任務的派發是“公平的”,因為內部採用了LRU的演算法來找到最近最久未工作的閒置worker。但是公平在這裡是相對的,當任務發生器啟動後,第一個連線到它的worker會在一瞬間承受整個任務發生器產生的tasks。

3. 層級模型與互動邏輯

層級模型

這是ZeroMQ的主要的層級模型,以“請求-應答”為例。
由上而下,最頂層的是ZObject與IPollEvent。
ZObject是所有ZeroMQ體系中類的父類,它存在的意義是傳送與接收命令(有別於訊息,命令是告訴ZeroMQ該做什麼,需要做什麼)。
IPollEvent則是一個介面,定義了若干操作,包括讀操作,寫操作,客戶端請求連線,服務端應答連線,超時操作等供5個操作,該操作的實現類包括Req,Rep等具體Socket。該介面的目的是定義終端間發生操作時的行為。
Ctx是一個上下文類,通常一個終端只需要建立一個上下文。
IOObject本身並沒有太多的屬性,主要是其內維護了一個IOThread
IOThread是用於處理命令的一個類,內部持有一個MailBox例項與Poller例項。
MailBox是一個重要的類,它被用作處理命令,包括命令的傳送與接收,需要注意的是,這裡的命令其實是本地傳送,也就是自己跟自己發,而不是端點間傳送。
Pipe用於處理接收到或者需要傳送的資料,是實際儲存待處理資料的資料結構,其內部是用佇列的形式實現。
LBFQ這兩者在官方給出的全名是“LoadBalance”,“FairQueue”。也就是負載均衡與公平排隊,分別用於處理要傳送的資料與要接收的資料。
SocketBase是例如Req,Rep,Pull等包裝後Socket的父類。其內含有一對Pipe(與SessionBase公用),用於在SocketBase與SessionBase之間傳遞訊息,具體傳遞過程在接下去或說明。
SessionBase是建立SocketChannel並與目標終端進行連線的地方,是與底層Poller最先進行互動的一層。具有超時重連,斷線重連等功能。
Poller是整個ZeroMQ的核心,它實現了命令的傳送與接收,資料的傳送與接收。由他來真正的傳送資料到其他終端,也是他處理來自其他終端的資料後交給SessionBase。

基於此層級模型的互動邏輯:

傳送訊息
Socket -> Session -> StreamEngine -> Poller
接收訊息
Poller -> StreamEngine -> Session -> Socket

4. 實現原理

這部分將說明從建立一個Socket開始到傳送或者接收資料的整個過程,ZeroMQ內部的處理流程。
不過我個人覺得先了解一些在底層的原理,對於整體的實現理解會有更好的幫助。

先看一下Poller的一些重要定義

 private static class PollSet {  
       protected IPollEvents handler;   //事件的回撥  
       protected SelectionKey key;   //註冊之後的key  
       protected int ops;    //註冊的事件  
       protected boolean cancelled;   //是否已經取消  

       protected PollSet(IPollEvents handler) {  
           this.handler = handler;  
           key = null;  
           cancelled = false;  
           ops = 0;  
       }  
   }  
   final private Map<SelectableChannel, PollSet> fd_table;   //記錄所有的註冊,key是channel  

   private boolean retired;    //當前註冊的物件是否有更新,如果有更新的話,在執行select之前需要先更新註冊  

   volatile private boolean stopping;    //如果是true的話,那麼執行執行緒將會停止  
   volatile private boolean stopped;   //是否已經停止  

   private Thread worker;   //worker執行緒  
   private Selector selector;   //selector  
   final private String name;   //名字  

PollerSet是Poller的一個巢狀類,所有需要註冊到selector上的channel都會先構建這個物件,將其當做附件註冊到selector上。其中handler是事件回撥(一般是一個IOObject例項),key是selector註冊後取得的key,ops是註冊的事件型別

fd_table用於維護註冊的channel物件與其的PollSet物件之間的對映關係。

retired用於標識當前的註冊的channel什麼的是否有更新,若是需要更新,則可能會重新生成key。

接下來來看看如何在poller物件上面註冊channel吧,有幾個比較重要的方法:


    //用於在當前的集合裡面新增需要註冊的channel,第一個引數是channel,第二個引數是事件回撥  
    public final void add_fd (SelectableChannel fd_, IPollEvents events_) {  
        fd_table.put(fd_, new PollSet(events_));  //直接把放到map裡面就好了  
        adjust_load (1);  //增加load值,這裡所謂的負載其實就是在當前poller裡面註冊的channel的數量  
    }  
    //在key上面註冊事件,如果negate為true的話,那麼表示是取消事件  
    private final void register (SelectableChannel handle_, int ops, boolean negate) {  
        PollSet pollset = fd_table.get(handle_);  //獲取pollset物件  

        if (negate)  {  
            pollset.ops = pollset.ops &~ ops;  //取反,相當於取消事件  
        } else {  
            pollset.ops = pollset.ops | ops;  //註冊事件  
        }  

        if (pollset.key != null) {  //如果有key了,那麼表示已經註冊到selector上面了,那麼只需要更新key就好了  
            pollset.key.interestOps(pollset.ops);    
        } else {  
            retired = true;  

        }  
    }  

可見在Poller裡註冊一個事件主要分為兩步 1.放入map中 2.設定PollerSet的相應屬性

Poller本身作為一個執行緒,來看看它的run方法

 public void run () {  
    int returnsImmediately = 0;  

    while (!stopping) {  
        long timeout = execute_timers ();  //執行所有的超時,並且獲取下一個超時的時間  

        if (retired) {  //這裡表示註冊的東西有更新  

            Iterator <Map.Entry <SelectableChannel,PollSet>> it = fd_table.entrySet ().iterator ();  
            while (it.hasNext ()) {  //遍歷所有需要註冊的  
                Map.Entry <SelectableChannel,PollSet> entry = it.next ();  
                SelectableChannel ch = entry.getKey ();  //獲取channel  
                PollSet pollset = entry.getValue ();   //獲取pollset  
                if (pollset.key == null) {  //這裡沒有key的話,表示當前channel並沒有註冊到selector上面去  
                    try {  
                        pollset.key = ch.register(selector, pollset.ops, pollset.handler);   //註冊 
                    } catch (ClosedChannelException e) {  
                    }  
                }   


                if (pollset.cancelled || !ch.isOpen()) {  //如果是取消註冊,那麼直接取消掉就可以了  
                    if(pollset.key != null) {  
                        pollset.key.cancel();  
                    }  
                    it.remove ();  
                }  
            }  
            retired = false;  

        }  

        //  Wait for events.  
        int rc;  
        long start = System.currentTimeMillis ();  //select之前的時間  
        try {  
            rc = selector.select (timeout);  
        } catch (IOException e) {  
            throw new ZError.IOException (e);  
        }  

        if (rc == 0) {   //出錯啦,好像  
            //  Guess JDK epoll bug  
            if (timeout == 0 ||  
                    System.currentTimeMillis () - start < timeout / 2)  
                returnsImmediately ++;  
            else  
                returnsImmediately = 0;  

            if (returnsImmediately > 10) {  
                rebuildSelector ();   //重建selector  
                returnsImmediately = 0;  
            }  
            continue;  
        }  


        Iterator<SelectionKey> it = selector.selectedKeys().iterator();  //所有select出來的key  
        while (it.hasNext()) {  //遍歷  
            SelectionKey key = it.next();  
            IPollEvents evt = (IPollEvents) key.attachment();  
            it.remove();  

            try {  //接下來就是判斷事件的型別執行相應的方法就好了  
                if (key.isReadable() ) {  //有資料可以讀取了   
                    evt.in_event();  
                } else if (key.isAcceptable()) {  //有新的連線進來了  
                    evt.accept_event();  
                } else if (key.isConnectable()) {  //連線建立  
                    evt.connect_event();  
                }   
                if (key.isWritable()) {  //可寫  
                    evt.out_event();  
                }   
            } catch (CancelledKeyException e) {  
                // channel might have been closed  
            }  

        }  

    }  

    stopped = true;  

}  

這部分還是好理解的,首先是檢查fd_table是否需要更新,其實就是有沒有新插入的channel或者有channel已經失效,由retired標誌位決定。如果需要更新,遍歷map中每個元素,檢查PollerSet裡的key,如果沒有,則在Selector上進行註冊。
然後呼叫selector.select(),若是有事件到來,根據其事件型別以及註冊事件時一併傳入的handle來決定執行何種操作。
簡單來說Poller就是一個輪詢器,我們在它的Selector上註冊相應的channel與事件。而Poller定期掃描來捕獲channel的狀態。同時我們也瞭解到一點,Poller才是真正的IO執行緒持有者。

粗淺的說明了Poller之後,再來看看MailBox

同樣,先是介紹一些重要的屬性


    private final YPipe<Command> cpipe;   //一個用來儲存command的佇列,內部以連結串列的形式實現 

    private final Signaler signaler;   //其實也是一個實現了一個SocketChannel,但是不對外發送訊息,而是向Poller傳送空白訊息,以提醒command佇列中有命令需要處理  

    private final Lock sync;  //只有一個執行緒從mailbox裡面收命令,但是會有很多執行緒向mialbox裡面傳送命令,用這個鎖來進行同步  

    public SelectableChannel get_fd () {  
        return signaler.get_fd ();   //這裡其實獲取的是signal用到的pipe的讀channel  
    }  

    //向當前的mailbox傳送命令,其實就是寫到command佇列裡面去而已  
    public void send (final Command cmd_) {     
        boolean ok = false;  
        sync.lock ();  
        try {  
            cpipe.write (cmd_, false);  
            ok = cpipe.flush ();  //pipeflush,這裡將會被selector感應到,從而可以執行相應的處理,在執行執行緒裡面執行命令  
        } finally {  
            sync.unlock ();  
        }  

        if (!ok) {  
            signaler.send (); //通過寫端寫資料,這樣子的話會被讀端收到  
        }  
    }  

    //收取命令,如果這裡無法立刻獲取命令的話,還可以有一個超時時間  
    public Command recv (long timeout_)  {  
        Command cmd_ = null;  
        //  Try to get the command straight away.  
        if (active) {  
            cmd_ = cpipe.read ();  //從佇列裡面獲取命令  
            if (cmd_ != null) {  

                return cmd_;  
            }  
            //  If there are no more commands available, switch into passive state.  
            active = false;  
            signaler.recv ();  //這裡會從讀端不斷的讀資料  
        }  


        //  Wait for signal from the command sender.  
        boolean rc = signaler.wait_event (timeout_);  
        if (!rc)  
            return null;  

        //  We've got the signal. Now we can switch into active state.  
        active = true;  

        //  Get a command.  
        cmd_ = cpipe.read ();  
        assert (cmd_ != null);  

        return cmd_;  
    }  

MailBox,就像之前說過的,只是用於處理命令的一個東西。命令的讀寫都是對本地的一個佇列進行操作。需要注意的是在寫命令與讀命令之間需要有Signaler來充當訊號通知者。
Signaler內部有一組變數:

private Pipe.SinkChannel w;     資料寫入端
private Pipe.SourceChannel r;   資料讀取端

將SourceChannel 註冊到了poller內。 這樣,當命令寫入到佇列中,會觸發SinkChannel的write操作,通過SinkChannel向SourceChannel寫資料,此時會被poller內的selector感知到。
由於IOThred在向poller註冊時,傳入的回撥是“this”,也就是本身,在發生in_event(讀事件)時,實際呼叫的時IOThread的in_event。
然後IOThread中的in_event從MailBox中讀取資料,實質是從YPipe中讀取command。
對於Signaler的作用,只是用於提醒poller有命令,它向SinkChannel內寫入的資料其實是一個大小為1沒有意義的ByteBuffer。只是用於觸發在poller內註冊的SourceChannel的Readable事件。
需要明確的是,command都是針對於本地的。不會在兩臺不同的機器間傳送command,因為send_command並沒有走socketchannel,所以不可能通過網路傳送。

command
MailBox裡面的邏輯大致就是如此:
1.命令寫入YPipe
2.Signaler提醒Poller,啟用in_event
3.MailBox從YPipe讀取命令並執行

ok,一些基本的概念說的差不多了,接下來開始說明Socket的建立以及訊息的傳送過程。

//這是一個建立上下文,Socket,與目標端進行連線,傳送資料以及接收資料的客戶端程式碼
Context context = ZMQ.context(1);
Socket worker = context.socket(ZMQ.REQ);
worker.connect("tcp://localhost:5671");
worker.send ("Hi Boss");
String workload = worker.recvStr ();
Sysout.out.println(workload);
1.建立上下文
    private final List<SocketBase> sockets;
    private final Deque<Integer> empty_slots;
    private volatile boolean starting;
    private boolean terminating;
    private final Lock slot_sync;
    private Reaper reaper;
    private final List<IOThread> io_threads;
    private int slot_count;
    private Mailbox[] slots;
    private final Mailbox term_mailbox;
    private final Lock endpoints_sync;
    private static AtomicInteger max_socket_id = new AtomicInteger(0);
    private int max_sockets;
    private int io_thread_count;
    private final Lock opt_sync;
    public static final int term_tid = 0;
    public static final int reaper_tid = 1;

上面給出了Ctx內的一些重要的成員變數,初始化過程中呼叫了init_ctx(),返回一個Ctx物件,此時僅僅只是對部分成員變數做了一個初始賦值,並沒有特殊操作。

2.建立Socket
//擷取部分程式碼,基本上能表示整個過程
if (starting) {

                starting = false;
                opt_sync.lock ();
                int mazmq = max_sockets;
                int ios = io_thread_count;
                opt_sync.unlock ();
                slot_count = mazmq + ios + 2;
                slots = new Mailbox[slot_count];
                slots [term_tid] = term_mailbox;
                reaper = new Reaper (this, reaper_tid);
                slots [reaper_tid] = reaper.get_mailbox ();
                reaper.start ();
                //以上部分建立的Reaper物件與兩個MailBox是作用於上下文銷燬的時候處理剩餘訊息以及釋放佔用資源。
                //下面是需要關注的部分,ios是在建立Ctx時我們指定需要建立的IO執行緒數,通常情況1個就足夠了。根據我們指定的數量建立相應的IOThread,每個IOThread都有他子身的MailBox。
                for (int i = 2; i != ios + 2; i++) {
                //建立IOThread物件的時候會建立一個Poller,以及一個MailBox,同時,將MailBox對應的Signaler的SourceChannel註冊到Poller中以監聽有無command需要執行。
                    IOThread io_thread = new IOThread (this, i);
                    io_threads.add(io_thread);
                    slots [i] = io_thread.get_mailbox ();
                    //啟動Poller
                    io_thread.start ();
                }
                for (int i = (int) slot_count - 1;
                      i >= (int) ios + 2; i--) {
                    empty_slots.add (i);
                    slots [i] = null;
                }

            }
            //以上為if部分,只會在Ctx已經建立好,第一次建立Socket會進入的分支,由於是第一次建立Socket,所以需要對一些Ctx成員進行初始化。而之後只需要建立每個Socket對應的IOThread以及必要屬性即可。
            int slot = empty_slots.pollLast();
            int sid = max_socket_id.incrementAndGet();
            //這一步比較重要,先建立一個SocketBase,它是所有Socket的父類
            s = SocketBase.create (type_, this, slot, sid);
            if (s == null) {
                empty_slots.addLast(slot);
                return null;
            }
            sockets.add (s);
            slots [slot] = s.get_mailbox ();

來看看SocketBase.create(type_,this,slot,sid)都做了些什麼。

//省略了部分操作,實際是先根據我們要建立的Socket型別呼叫建構函式,然後在建構函式中用super呼叫父類也就是SocketBase的建構函式...
//給出部分重要成員
private int tag;
    private boolean ctx_terminated;
    private boolean destroyed;
    private final Mailbox mailbox;
    private final List<Pipe> pipes;
    private Poller poller;
    private SelectableChannel handle
    private SocketBase monitor_socket;

protected SocketBase (Ctx parent_, int tid_, int sid_) 
    {
        //調了ZObject的建構函式,因為ZObject是所有類的父類
        super (parent_, tid_);
        tag = 0xbaddecaf;
        ctx_terminated = false;
        destroyed = false;
        last_tsc = 0;
        ticks = 0;
        rcvmore = false;
        monitor_socket = null;
        monitor_events = 0;

        options.socket_id = sid_;

        endpoints = new MultiMap<String, Own>();
        //這個pipes在後期會起到非常大的作用
        pipes = new ArrayList<Pipe>();
        //建立了一個MailBox
        mailbox = new Mailbox("socket-" + sid_);

        errno = new ValueReference<Integer>(0);
        ...
        return s;
    }

那到這裡為止,我們已經獲得了所需的Socket,但是需要注意的是現在只是獲得Socket,但是該Socket還沒有跟地址進行繫結或者連結。

現在說connect部分,這部分比較長,所以分開說明。

//這裡就是先去看看有沒有需要執行的command,有的話先執行。這樣做的目的應該是假如我們關閉了上下文,理論上來說是不再處理任何請求。但是關閉上下文也是一個動作,發出一個command,經過之前對MailBox的講解,我們知道處理一個command其實是先放到一個佇列中,等待Poller的訊號在從佇列中取出command然後執行。這樣如果Poller要處理較多事件時,可能會推遲command的執行,個人認為在ZeroMQ中,command的優先順序是大於訊息的。所以基本在執行大部分動作前會先去看看佇列中有沒有待執行的command。以避免command等待過久而不執行的尷尬。
 boolean brc = process_commands (0, false);
        if (!brc)
            return false;

...

//這裡沒什麼需要特別說明的,我們知道終端url的形式是像tcp://192.168.1.1:8000 這樣的形式存在的。所以這裡做的只是獲取IP,埠以及協議。
String protocol = uri.getScheme();
        String address = uri.getAuthority();
        String path = uri.getPath();
        if (address == null)
            address = path;

        check_protocol (protocol);

...

//建立一個與該Socket對應的Session,一個Socket可以繫結多個Session
SessionBase session = SessionBase.create (io_thread, true, this, options, paddr);

在SessionBase中才是發生SocketChannel對接的地方,下面來看看它做了些什麼。

//與SocketBase類似,也是進行了一些基本的初始化工作
    private boolean connect;
    private Pipe pipe;
    private final Set<Pipe> terminating_pipes;
    private boolean incomplete_in;
    private boolean pending;
    private SocketBase socket;
    private IOThread io_thread;
    private boolean identity_sent;
    private boolean identity_received;
    private final Address addr;

    private IOObject io_object;
public SessionBase(IOThread io_thread_, boolean connect_,
            SocketBase socket_, Options options_, Address addr_) {
        super(io_thread_, options_);
        io_object = new IOObject(io_thread_);

        connect = connect_;
        pipe = null;
        incomplete_in = false;
        pending = false;
        engine = null;
        socket = socket_;
        io_thread = io_thread_;
        has_linger_timer = false;
        identity_sent = false;
        identity_received = false;
        addr = addr_;

        terminating_pipes = new HashSet <Pipe> ();
    }

繼續看connect的過程。

...
if (options.delay_attach_on_connect != 1 || icanhasall) {
           //這個parents在回收資源的時候起作用,維護層級關係
            ZObject[] parents = {this, session};
            Pipe[] pipes = {null, null};
            //這是一個高水位線陣列,由於Socket根據不同型別會存在傳送緩衝區,接收緩衝區,或者一個公用緩衝區。雖然ZeroMQ沒有持久化操作。但是比如Req套接字,如果在Rep建立連線前就傳送訊息,實質是不會發出去的,會先快取在本地傳送快取區。同時,接收緩衝區也一樣,如果收到訊息還沒來的及處理,就會一直對方在接收快取區中。高水位線的作用就是給緩衝區定義一個大小,防止撐爆記憶體。
            int[] hwms = {options.sndhwm, options.rcvhwm};
            boolean[] delays = {options.delay_on_disconnect, options.delay_on_close};
            //OK,接下去的3步操作,直接為Socket與Session進行資料互動奠定了基礎。我們仔細看下。
            Pipe.pipepair (parents, pipes, hwms, delays);

            attach_pipe (pipes [0], icanhasall);

            session.attach_pipe (pipes [1]);
        }
public static void pipepair(ZObject[] parents_, Pipe[] pipes_, int[] hwms_, boolean[] delays_) {
       //從以下程式碼可以看到一個數據結構,它建立了2個YPipe物件(實質就是連結串列),然後又建立了2個Pipe,一般的1個Pipe需要一個寫端一個讀端,在這裡這兩個Pipe公用了2個YPipe。
       //也就是說,現有2個Pipe,分別是A,B;2個YPipe,分別是Y1,Y2 。A,B公用Y1,Y2,Y1作為A的讀端,作為B的寫端;Y2作為B的讀端,作為A的寫端。結構圖如下
        YPipe<Msg> upipe1 = new YPipe<Msg>(Msg.class, Config.message_pipe_granularity.getValue());
        YPipe<Msg> upipe2 = new YPipe<Msg>(Msg.class, Config.message_pipe_granularity.getValue());

        pipes_ [0] = new Pipe(parents_ [0], upipe1, upipe2,
            hwms_ [1], hwms_ [0], delays_ [0]);
        pipes_ [1] = new Pipe(parents_ [1], upipe2, upipe1,
            hwms_ [0], hwms_ [1], delays_ [1]);

         //兩個Pipe相互持有對方的引用       
        pipes_ [0].set_peer (pipes_ [1]);
        pipes_ [1].set_peer (pipes_ [0]);

    }

pipe

繼續看剩下的兩個操作

//這兩部操作就比較好說,因為我們有2個Pipe,但最後是SocketBase持有一個Pipe,另一個由SessionBase持有。這樣,SessionBase才能通過此與SocketBase進行資料互動,而實際上,傳送資料或者接收資料就是通過這兩個Pipe來流動的。

 attach_pipe (pipes [0], icanhasall);

 session.attach_pipe (pipes [1]);

傳送時,Pipe1通過LB向YPipe1寫入要傳送的資料,併發送read_activated命令,傳入引數為Pipe2,SessionBase中其實持有的就是Pipe2,所以Pipe2從YPipe讀取資料後由StreamEngine傳送
接收時,StreamEngine將訊息寫會SessionBase的Pipe,也就是Pipe2,從Pipe2寫入資料,其實是寫入到YPipe2,然後通知SocketBase中的Pipe1,Pipe1從YPipe2讀取資料
其實我覺得比如直接把Pipe設計成佇列的形式,同樣是兩個Pipe,SocketBase與SessionBase同時持有雙方引用,也能做到一樣的功能,也許是因為這樣做的話雙方要維護的Pipe對的引用數會加倍,所以沒有采用這種做法。

繼續講connect的最後一步。

 add_endpoint (addr_, session);

 //呼叫了 launch_child (endpoint_);方法
 protected void launch_child (Own object_)
    {
        //這步是設定層級關係,回收資源時用到
        object_.set_owner (this);
        //其實質就是傳送了一個plug命令,因為傳進來的object_是SessionBase,所以plug操作最後由SessionBase來完成。
        send_plug (object_);

        send_own (this, object_);
    }

    //SessionBase中的process_plug操作
     protected void process_plug ()
    {
        io_object.set_handler(this);
        if (connect)
            start_connecting (false);
    }
    //看到這裡似乎要發現了連線操作將要執行的細節了。

     private void start_connecting (boolean wait_)
    {
        assert (connect);
        assert (io_thread != null);

        //  Create the connecter object.

        if (addr.protocol().equals("tcp")) {
            TcpConnecter connecter = new TcpConnecter (
                io_thread, this, options, addr, wait_);
            //alloc_assert (connecter);
            //沒錯,又是 launch_child,不過這次傳進去的物件是上面建立的 TcpConnecter。同樣的一會來看看 TcpConnecter裡的process_plug操作。
            launch_child (connecter);
            return;
        }

        if (addr.protocol().equals("ipc")) {
            IpcConnecter connecter = new IpcConnecter (
                io_thread, this, options, addr, wait_);
            //alloc_assert (connecter);
            launch_child (connecter);
            return;
        }

        assert (false);
    }

簡要說明下TcpConnector的重要成員以及部分操作

    private final static