ZeroMQ原始碼分析之Context
在庫中使用全域性變數並不理想.一個庫也許會被程式載入很多次,但即便如此,也只會存在唯一一個全域性變數集.
Figure24.1: ØMQ being used by different libraries
圖24.1中兩個不同且獨立的庫都使用了ZeroMQ庫,然後應用程式使用了這兩個庫.
當這種情況出現時,兩個ZeroMQ例項都訪問了相同的變數,會導致競爭條件,奇怪的錯誤和未定義行為.
為了防止這種問題的發生,ZeroMQ庫並沒有全域性變數.取而代之的是,由使用該庫的使用者負責顯式建立全域性狀態.我們把這個包含全域性狀態的物件稱為context.從使用者的角度看,這個context看起來像是一個工作執行緒池
在程式中使用ZeroMQ時,需要先建立context,程式碼如下:
void *context =zmq_ctx_new();
zmq_ctx_new()實現如下:
zmq::ctx_t *ctx =new(std::nothrow) zmq::ctx_t;
return ctx;
我們接著去ctx.cpp檔案中看一下ctx_t類的建構函式中做了什麼工作:
zmq::ctx_t::ctx_t () : tag (ZMQ_CTX_TAG_VALUE_GOOD), starting (true), terminating (false), reaper (NULL), slot_count (0), slots (NULL), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), io_thread_count (ZMQ_IO_THREADS_DFLT), ipv6 (false) { #ifdef HAVE_FORK pid = getpid(); #endif }
果然如上文中提到的,context只是一個儲存我們需要的一些全域性狀態的一個物件,它在建構函式中初始化了一些全域性狀態:
// Used to checkwhether the object is a context. uint32_t tag; // If true,zmq_init has been called but no socket has been created // yet. Launchingof I/O threads is delayed. bool starting; // If true,zmq_term was already called. bool terminating; // The reaperthread. zmq::reaper_t*reaper; // Array ofpointers to mailboxes for both application and I/O threads. uint32_t slot_count; mailbox_t **slots; // Maximum numberof sockets that can be opened at the same time. int max_sockets; // Number of I/Othreads to launch. int io_thread_count; // Is IPv6 enabledon this context? bool ipv6; // the process thatcreated this context. Used to detect forking. pid_t pid;
接著來看一下context中的全域性狀態是如何影響整個ZeroMQ呼叫過程的.
我們開始建立第一個socket,在應用程式程式碼中有:
void *responder =zmq_socket(context, ZMQ_REP);
在zmq.cpp中對應的呼叫實現函式為:
void *zmq_socket (void *ctx_, int type_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return NULL;
}
zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
zmq::socket_base_t *s = ctx->create_socket (type_);
return (void *) s;
}
可見建立socket是使用context中的建立函式來建立的,我們去ctx.cpp中看對應的建立socket實現函式(附上中文註釋):
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
slot_sync.lock ();
if (unlikely (starting)) { // 初始化mailboxes佇列.多出來的兩個slots 是為
// zmq_ctx_term執行緒andreaper 執行緒準備的.
starting = false;
// 初始化mailboxes佇列.多出來的兩個slots 是為
// zmq_ctx_term執行緒andreaper 執行緒準備的.
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
// Initialise the infrastructure for zmq_ctx_term thread.
slots [term_tid] = &term_mailbox;
// 建立reaper執行緒.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();
// 建立I/O執行緒物件並啟動它們.
for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
// 為slot佇列中沒有使用的部分,建立一個空slots列表指向它們
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
}
// 一旦zmq_ctx_term()被呼叫,我們就不能建立新的sockets了.
if (terminating) {
slot_sync.unlock ();
errno = ETERM;
return NULL;
}
// 如果達到了最大sockets數的限制,返回錯誤.
if (empty_slots.empty ()) {
slot_sync.unlock ();
errno = EMFILE;
return NULL;
}
// 在空的slot列表中選擇一個slot給新建socket.
uint32_t slot = empty_slots.back ();
empty_slots.pop_back ();
// 產生一個新的唯一ID給socket.
int sid = ((int) max_socket_id.add (1)) + 1;
// 建立這個socket並且註冊它的mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
if (!s) {
empty_slots.push_back (slot);
slot_sync.unlock ();
return NULL;
}
sockets.push_back (s);
slots [slot] = s->get_mailbox ();
slot_sync.unlock ();
return s;
}
接下來分析呼叫ZeroMQ程式中出現的如下兩行程式碼:
zmq_close(responder);
zmq_ctx_destroy(context);
在zmq.cpp中:
int zmq_close (void *s_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
((zmq::socket_base_t*) s_)->close ();
return 0;
}
可見關閉socket是由socket自己來做的,無需context插手.
接下來看zmq.cpp:
int zmq_ctx_destroy (void *ctx_)
{
return zmq_ctx_term (ctx_);
}
int zmq_ctx_term (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno;
// Shut down only if termination was not interrupted by a signal.
if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
zmq_assert (false);
#endif
}
errno = en;
return rc;
}
我們繼續去ctx.cpp中看:
int zmq::ctx_t::terminate ()
{
// Connect up any pending inproc connections, otherwise we will hang
pending_connections_t copy = pending_connections;
for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
s->bind (p->first.c_str ());
s->close ();
}
slot_sync.lock ();
if (!starting) {
#ifdef HAVE_FORK
if (pid != getpid())
{
// we are a forked child process. Close all file descriptors
// inherited from the parent.
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
{
sockets[i]->get_mailbox()->forked();
}
term_mailbox.forked();
}
#endif
// Check whether termination was already underway, but interrupted and now
// restarted.
bool restarted = terminating;
terminating = true;
// First attempt to terminate the context.
if (!restarted) {
// First send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper
// thread to stop.
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
if (sockets.empty ())
reaper->stop ();
}
slot_sync.unlock();
// Wait till reaper thread closes all the sockets.
command_t cmd;
int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
zmq_assert (cmd.type == command_t::done);
slot_sync.lock ();
zmq_assert (sockets.empty ());
}
slot_sync.unlock ();
// Deallocate the resources.
delete this;
return 0;
}
相關推薦
ZeroMQ原始碼分析之Context
在庫中使用全域性變數並不理想.一個庫也許會被程式載入很多次,但即便如此,也只會存在唯一一個全域性變數集. Figure24.1: ØMQ being used by different libraries 圖24.1中兩個不同且獨立的庫都使用了ZeroMQ庫,然後應用
jQuery原始碼分析之jQuery(selector,context)詳解
首先我們給出下面的HTML程式碼: <div id="parent" class="parent"> <div class="child"> child1 </div> <div class="child">
Spark原始碼分析之Spark Shell(上)
https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec
Netty 原始碼分析之拆包器的奧祕
為什麼要粘包拆包 為什麼要粘包 首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資
Android原始碼分析之為什麼在onCreate() 和 onResume() 獲取不到 View 的寬高
轉載自:https://www.jianshu.com/p/d7ab114ac1f7 先來看一段很熟悉的程式碼,可能在最開始接觸安卓的時候,大部分人都寫過的一段程式碼;即嘗試在 onCreate() 和 onResume() 方法中去獲取某個 View 的寬高資訊: 但是列印輸出後,我們會發
netty原始碼分析之服務端啟動
ServerBootstrap與Bootstrap分別是netty中服務端與客戶端的引導類,主要負責服務端與客戶端初始化、配置及啟動引導等工作,接下來我們就通過netty原始碼中的示例對ServerBootstrap與Bootstrap的原始碼進行一個簡單的分析。首先我們知道這兩個類都繼承自AbstractB
SNMP原始碼分析之(一)配置檔案部分
snmpd.conf想必不陌生。在程序啟動過程中會去讀取配置檔案中各個配置。其中幾個引數需要先知道是幹什麼的: token:配置檔案的每行的開頭,例如 group MyROGroup v1 readSec 這行token的引數是group。
【kubernetes/k8s原始碼分析】kubelet原始碼分析之cdvisor原始碼分析
資料流 UnsecuredDependencies -> run 1. cadvisor.New初始化 if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadv
【kubernetes/k8s原始碼分析】kubelet原始碼分析之容器網路初始化原始碼分析
一. 網路基礎 1.1 網路名稱空間的操作 建立網路名稱空間: ip netns add 名稱空間內執行命令: ip netns exec 進入名稱空間: ip netns exec bash 1.2 bridge-nf-c
【kubernetes/k8s原始碼分析】kubelet原始碼分析之資源上報
0. 資料流 路徑: pkg/kubelet/kubelet.go Run函式() -> syncNodeStatus () -> registerWithAPIServer() ->
【kubernetes/k8s原始碼分析】kubelet原始碼分析之啟動容器
主要是呼叫runtime,這裡預設為docker 0. 資料流 NewMainKubelet(cmd/kubelet/app/server.go) -> NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime
Android系統原始碼分析之-ContentProvider
距離上一次寫部落格已經半年多了,這半年發生了很多事情,也有了很多感觸,最主要是改變了忙碌了工作,更加重視身體的健康,為此也把工作地點從深圳這個一線城市換到了珠海,工作相對沒有那麼累,身體感覺也好了很多。所以在工作完成之餘,也有了更多的時間來自我學習和提高,後續會用更多時間來寫更多實用的東西,幫助我們理解
Vue 原始碼分析之proxy代理
Vue 原始碼分析之proxy代理 當我們在使用Vue進行資料設定時,通常初始化格式為: let data = { age: 12, name: 'yang' } // 例項化Vue物件 let vm = new Vue({ data })
Qt原始碼分析之事件分發器QEventDispatcherWin32
分析Qt原始碼一則想自己在開發學習中有積累,同時自己也一直有一種理念,使用她那麼就更深入的認識她。 如果有分析不正確的,還煩請各位看官指正。 事件分發器建立 在QCoreApplication建構函式中 if (!QCoreApplicationPrivate
lodash原始碼分析之isArguments
lodash原始碼分析之isArguments 有人命中註定要過平庸的生活,默默無聞,因為他們經歷了痛苦或不幸;有人卻故意這樣做,那是因為他們得到的幸福超過了他們的承受能力。 ——卡爾維諾《煙雲》 本文為讀 lodash 原始碼的第二十一篇,後續文章會更新到這個倉庫中,歡迎 star:poc
Netty原始碼分析之LengthFieldBasedFrameDecoder
拆包的原理 關於拆包原理的上一篇博文 netty原始碼分析之拆包器的奧祕 中已詳細闡述,這裡簡單總結下:netty的拆包過程和自己寫手工拆包並沒有什麼不同,都是將位元組累加到一個容器裡面,判斷當前累加的位元組資料是否達到了一個包的大小,達到一個包大小就拆開,進而傳遞到上層業務解碼handler 之所以ne
illuminate/routing 原始碼分析之註冊路由
我們知道,在 Laravel 世界裡,外界傳進來一個 Request 時,會被 Kernel 處理並返回給外界一個 Response。Kernel 在處理 Request 時,會呼叫 illuminate/routing 包提供的路由功能,來根據當前的 Request,轉發到對應的執行邏輯(執行邏輯的形式可以
Uboot啟動過程原始碼分析之第二階段
UBoot的最終目標是啟動核心 1.從Flash中讀出核心 2.啟動核心 通過呼叫lib_arm/board.c中的start_armboot函式進入uboot第二階段 第二階段總結圖 typedef struct global_data { bd_t *bd; unsigned
Uboot啟動過程原始碼分析之第一階段(硬體相關)
從上一個部落格知道uboot的入口點在 cpu/arm920t/start.s 開啟cpu/arm920t/start.s 跳轉到reset reset: /* * set the cpu to SVC32 mode// CUP設定為管理模式 */ mrs r0,cps
Java集合原始碼分析之LikedList
一、LinkedList結構 LinkedList是一種可以在任何位置進行高效地插入和移除操作的有序序列,它是基於雙向連結串列實現的。 LinkedList 是一個繼承於AbstractSequentialList的雙向連結串列。它也可以被當作堆疊、佇列或雙端佇列進行操作。