1. 程式人生 > >ZeroMQ原始碼分析之Context

ZeroMQ原始碼分析之Context

在庫中使用全域性變數並不理想.一個庫也許會被程式載入很多次,但即便如此,也只會存在唯一一個全域性變數集.

Figure24.1: ØMQ being used by different libraries

24.1中兩個不同且獨立的庫都使用了ZeroMQ,然後應用程式使用了這兩個庫.

當這種情況出現時,兩個ZeroMQ例項都訪問了相同的變數,會導致競爭條件,奇怪的錯誤和未定義行為.

為了防止這種問題的發生,ZeroMQ庫並沒有全域性變數.取而代之的是,由使用該庫的使用者負責顯式建立全域性狀態.我們把這個包含全域性狀態的物件稱為context.從使用者的角度看,這個context看起來像是一個工作執行緒池

,而從ZeroMQ的角度看,它只不過是一個儲存我們需要的一些全域性狀態的一個物件.在上面這幅圖中,libA會擁有自己的context,libB也同樣擁有自己的context.不會出現其中一個破壞或覆蓋另一個的情況.

在程式中使用ZeroMQ,需要先建立context,程式碼如下:

void *context =zmq_ctx_new();

zmq_ctx_new()實現如下:

zmq::ctx_t *ctx =new(std::nothrow) zmq::ctx_t;
return ctx;

我們接著去ctx.cpp檔案中看一下ctx_t類的建構函式中做了什麼工作:

zmq::ctx_t::ctx_t () :
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    io_thread_count (ZMQ_IO_THREADS_DFLT),
    ipv6 (false)
{
#ifdef HAVE_FORK
    pid = getpid();
#endif
}

果然如上文中提到的,context只是一個儲存我們需要的一些全域性狀態的一個物件,它在建構函式中初始化了一些全域性狀態:

// Used to checkwhether the object is a context.
uint32_t tag;

// If true,zmq_init has been called but no socket has been created
// yet. Launchingof I/O threads is delayed.
bool starting;
// If true,zmq_term was already called.
bool terminating;

// The reaperthread.
zmq::reaper_t*reaper;

// Array ofpointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
mailbox_t **slots;

// Maximum numberof sockets that can be opened at the same time.
int max_sockets;

// Number of I/Othreads to launch.
int io_thread_count;

// Is IPv6 enabledon this context?
bool ipv6;

// the process thatcreated this context. Used to detect forking.
pid_t pid;

接著來看一下context中的全域性狀態是如何影響整個ZeroMQ呼叫過程的.

我們開始建立第一個socket,在應用程式程式碼中有:

void *responder =zmq_socket(context, ZMQ_REP);

zmq.cpp中對應的呼叫實現函式為:

void *zmq_socket (void *ctx_, int type_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return NULL;
    }
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
    return (void *) s;
}

可見建立socket是使用context中的建立函式來建立的,我們去ctx.cpp中看對應的建立socket實現函式(附上中文註釋):

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();
    if (unlikely (starting)) {    //  初始化mailboxes佇列.多出來的兩個slots 是為
                                  // zmq_ctx_term執行緒andreaper 執行緒準備的. 

        starting = false;
        //  初始化mailboxes佇列.多出來的兩個slots 是為
        // zmq_ctx_term執行緒andreaper 執行緒準備的. 
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
        slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
        alloc_assert (slots);

        //  Initialise the infrastructure for zmq_ctx_term thread.
        slots [term_tid] = &term_mailbox;

        //  建立reaper執行緒. 
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  建立I/O執行緒物件並啟動它們. 
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }
 
        //  為slot佇列中沒有使用的部分,建立一個空slots列表指向它們
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

    
         //  一旦zmq_ctx_term()被呼叫,我們就不能建立新的sockets了.
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }

    // 如果達到了最大sockets數的限制,返回錯誤.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
    }

    // 在空的slot列表中選擇一個slot給新建socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
 
    // 產生一個新的唯一ID給socket.
    int sid = ((int) max_socket_id.add (1)) + 1;

    // 建立這個socket並且註冊它的mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }
    sockets.push_back (s);
    slots [slot] = s->get_mailbox ();

    slot_sync.unlock ();
    return s;
}

接下來分析呼叫ZeroMQ程式中出現的如下兩行程式碼:

zmq_close(responder);
zmq_ctx_destroy(context);

zmq.cpp:

int zmq_close (void *s_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    ((zmq::socket_base_t*) s_)->close ();
    return 0;
}

可見關閉socket是由socket自己來做的,無需context插手.

接下來看zmq.cpp:

int zmq_ctx_destroy (void *ctx_)
{
    return zmq_ctx_term (ctx_);
}
int zmq_ctx_term (void *ctx_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }

    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS
        //  On Windows, uninitialise socket layer.
        rc = WSACleanup ();
        wsa_assert (rc != SOCKET_ERROR);
#endif

#if defined ZMQ_HAVE_OPENPGM
        //  Shut down the OpenPGM library.
        if (pgm_shutdown () != TRUE)
            zmq_assert (false);
#endif
    }

    errno = en;
    return rc;
}

我們繼續去ctx.cpp中看:

int zmq::ctx_t::terminate ()
{
    // Connect up any pending inproc connections, otherwise we will hang
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        s->bind (p->first.c_str ());
        s->close ();
    }

    slot_sync.lock ();
    if (!starting) {

#ifdef HAVE_FORK
        if (pid != getpid())
        {
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            {
                sockets[i]->get_mailbox()->forked();
            }

            term_mailbox.forked();
        }
#endif
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
        terminating = true;

        //  First attempt to terminate the context.
        if (!restarted) {

            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
        slot_sync.unlock();

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc == 0);
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
    slot_sync.unlock ();

    //  Deallocate the resources.
    delete this;

    return 0;
}

相關推薦

ZeroMQ原始碼分析Context

在庫中使用全域性變數並不理想.一個庫也許會被程式載入很多次,但即便如此,也只會存在唯一一個全域性變數集. Figure24.1: ØMQ being used by different libraries 圖24.1中兩個不同且獨立的庫都使用了ZeroMQ庫,然後應用

jQuery原始碼分析jQuery(selector,context)詳解

首先我們給出下面的HTML程式碼: <div id="parent" class="parent"> <div class="child"> child1 </div> <div class="child">

Spark原始碼分析Spark Shell(上)

https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec

Netty 原始碼分析拆包器的奧祕

為什麼要粘包拆包 為什麼要粘包 首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資

Android原始碼分析為什麼在onCreate() 和 onResume() 獲取不到 View 的寬高

轉載自:https://www.jianshu.com/p/d7ab114ac1f7 先來看一段很熟悉的程式碼,可能在最開始接觸安卓的時候,大部分人都寫過的一段程式碼;即嘗試在 onCreate() 和 onResume() 方法中去獲取某個 View 的寬高資訊: 但是列印輸出後,我們會發

netty原始碼分析服務端啟動

ServerBootstrap與Bootstrap分別是netty中服務端與客戶端的引導類,主要負責服務端與客戶端初始化、配置及啟動引導等工作,接下來我們就通過netty原始碼中的示例對ServerBootstrap與Bootstrap的原始碼進行一個簡單的分析。首先我們知道這兩個類都繼承自AbstractB

SNMP原始碼分析(一)配置檔案部分

snmpd.conf想必不陌生。在程序啟動過程中會去讀取配置檔案中各個配置。其中幾個引數需要先知道是幹什麼的:   token:配置檔案的每行的開頭,例如 group MyROGroup v1 readSec 這行token的引數是group。  

【kubernetes/k8s原始碼分析】kubelet原始碼分析cdvisor原始碼分析

  資料流 UnsecuredDependencies -> run   1. cadvisor.New初始化 if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadv

【kubernetes/k8s原始碼分析】kubelet原始碼分析容器網路初始化原始碼分析

一. 網路基礎   1.1 網路名稱空間的操作 建立網路名稱空間: ip netns add 名稱空間內執行命令: ip netns exec 進入名稱空間: ip netns exec bash   1.2 bridge-nf-c

【kubernetes/k8s原始碼分析】kubelet原始碼分析資源上報

0. 資料流   路徑: pkg/kubelet/kubelet.go   Run函式() ->   syncNodeStatus ()  ->   registerWithAPIServer() ->

【kubernetes/k8s原始碼分析】kubelet原始碼分析啟動容器

主要是呼叫runtime,這裡預設為docker 0. 資料流 NewMainKubelet(cmd/kubelet/app/server.go) -> NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime

Android系統原始碼分析-ContentProvider

距離上一次寫部落格已經半年多了,這半年發生了很多事情,也有了很多感觸,最主要是改變了忙碌了工作,更加重視身體的健康,為此也把工作地點從深圳這個一線城市換到了珠海,工作相對沒有那麼累,身體感覺也好了很多。所以在工作完成之餘,也有了更多的時間來自我學習和提高,後續會用更多時間來寫更多實用的東西,幫助我們理解

Vue 原始碼分析proxy代理

Vue 原始碼分析之proxy代理 當我們在使用Vue進行資料設定時,通常初始化格式為: let data = { age: 12, name: 'yang' } // 例項化Vue物件 let vm = new Vue({ data })

Qt原始碼分析事件分發器QEventDispatcherWin32

分析Qt原始碼一則想自己在開發學習中有積累,同時自己也一直有一種理念,使用她那麼就更深入的認識她。 如果有分析不正確的,還煩請各位看官指正。 事件分發器建立 在QCoreApplication建構函式中 if (!QCoreApplicationPrivate

lodash原始碼分析isArguments

lodash原始碼分析之isArguments 有人命中註定要過平庸的生活,默默無聞,因為他們經歷了痛苦或不幸;有人卻故意這樣做,那是因為他們得到的幸福超過了他們的承受能力。 ——卡爾維諾《煙雲》 本文為讀 lodash 原始碼的第二十一篇,後續文章會更新到這個倉庫中,歡迎 star:poc

Netty原始碼分析LengthFieldBasedFrameDecoder

拆包的原理 關於拆包原理的上一篇博文 netty原始碼分析之拆包器的奧祕 中已詳細闡述,這裡簡單總結下:netty的拆包過程和自己寫手工拆包並沒有什麼不同,都是將位元組累加到一個容器裡面,判斷當前累加的位元組資料是否達到了一個包的大小,達到一個包大小就拆開,進而傳遞到上層業務解碼handler 之所以ne

illuminate/routing 原始碼分析註冊路由

我們知道,在 Laravel 世界裡,外界傳進來一個 Request 時,會被 Kernel 處理並返回給外界一個 Response。Kernel 在處理 Request 時,會呼叫 illuminate/routing 包提供的路由功能,來根據當前的 Request,轉發到對應的執行邏輯(執行邏輯的形式可以

Uboot啟動過程原始碼分析第二階段

UBoot的最終目標是啟動核心 1.從Flash中讀出核心 2.啟動核心 通過呼叫lib_arm/board.c中的start_armboot函式進入uboot第二階段 第二階段總結圖 typedef struct global_data { bd_t *bd; unsigned

Uboot啟動過程原始碼分析第一階段(硬體相關)

從上一個部落格知道uboot的入口點在 cpu/arm920t/start.s 開啟cpu/arm920t/start.s 跳轉到reset reset: /* * set the cpu to SVC32 mode// CUP設定為管理模式 */ mrs r0,cps

Java集合原始碼分析LikedList

一、LinkedList結構     LinkedList是一種可以在任何位置進行高效地插入和移除操作的有序序列,它是基於雙向連結串列實現的。   LinkedList 是一個繼承於AbstractSequentialList的雙向連結串列。它也可以被當作堆疊、佇列或雙端佇列進行操作。