Libevent原始碼分析(七)--- IOCP
event_base中有一個iocp變數,event_base的初始化函式中會呼叫event_base_start_iocp開啟iocp功能,event_base_start_iocp又會呼叫event_iocp_port_launch來初始化IOCP:
/* Excerpt from the event_base initialization code: IOCP is started only when
 * the caller's config sets EVENT_BASE_FLAG_STARTUP_IOCP, and the config's
 * n_cpus_hint is forwarded as the thread-count hint.  The return value of
 * event_base_start_iocp is not checked here. */
#ifdef WIN32
if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
event_base_start_iocp(base, cfg->n_cpus_hint);
#endif
/*
 * Enable IOCP support on base by launching an I/O completion port.
 *
 * Idempotent: if base already has a port, nothing is done and 0 is returned.
 * n_cpus is a hint forwarded to event_iocp_port_launch.
 *
 * Returns 0 on success; -1 if the port could not be launched (a warning is
 * logged) or if this is not a Windows build.
 */
int
event_base_start_iocp(struct event_base *base, int n_cpus)
{
#ifdef WIN32
	if (base->iocp)
		return 0;
	base->iocp = event_iocp_port_launch(n_cpus);
	if (!base->iocp) {
		event_warnx("%s: Couldn't launch IOCP", __func__);
		return -1;
	}
	return 0;
#else
	/* IOCP only exists on Windows. */
	(void)base;
	(void)n_cpus;
	return -1;
#endif
}
iocp指向一個event_iocp_port結構體:
/* State of one I/O completion port and the pool of worker threads that
 * service it. */
struct event_iocp_port {
	/** The port itself */
	HANDLE port;
	/* A lock to cover internal structures. */
	CRITICAL_SECTION lock;
	/** Number of threads ever open on the port. */
	short n_threads;
	/** True iff we're shutting down all the threads on this port */
	short shutdown;
	/** How often the threads on this port check for shutdown and other
	 * conditions (used as the GetQueuedCompletionStatus timeout, in
	 * milliseconds; <= 0 means wait forever). */
	long ms;
	/* The threads that are waiting for events (array of n_threads). */
	HANDLE *threads;
	/** Number of threads currently open on this port. */
	short n_live_threads;
	/** A semaphore to signal when we are done shutting down.  This is the
	 * single handle returned by CreateSemaphore, so it is a HANDLE, not a
	 * pointer to one. */
	HANDLE shutdownSemaphore;
};
其中port是iocp的埠(完成埠控制代碼),shutdown、lock和shutdownSemaphore用於關閉iocp,lock同時保護結構體內部的共享欄位。ms是GetQueuedCompletionStatus的等待時間(毫秒),threads是執行緒控制代碼陣列,n_threads代表啟動過的執行緒總數,n_live_threads代表當前還沒有退出的執行緒數量。
event_iocp_port的初始化在event_iocp_port_launch函式中進行:
/*
 * Create an I/O completion port and start its worker thread pool.
 *
 * n_cpus is a hint for the number of CPUs; values <= 0 fall back to
 * N_CPUS_DEFAULT.  The port is created with a concurrency value of n_cpus,
 * and 2 * n_cpus worker threads running loop() are started.
 *
 * Returns the new port, or NULL on failure.  On failure everything acquired
 * so far is released, including any worker threads already started.
 */
struct event_iocp_port *event_iocp_port_launch(int n_cpus)
{
	struct event_iocp_port *port;
	int i;
	int lock_ready = 0;

	if (!extension_fns_initialized)
		init_extension_functions(&the_extension_fns);

	if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
		return NULL;

	if (n_cpus <= 0)
		n_cpus = N_CPUS_DEFAULT;
	port->n_threads = n_cpus * 2;
	port->threads = mm_calloc(port->n_threads, sizeof(HANDLE));
	if (!port->threads)
		goto err;

	port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
	    n_cpus);
	/* A non-positive ms makes loop() wait with INFINITE. */
	port->ms = -1;
	if (!port->port)
		goto err;

	port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
	if (!port->shutdownSemaphore)
		goto err;

	/* loop() enters port->lock as soon as GetQueuedCompletionStatus
	 * returns, so the lock must exist before the first worker starts. */
	InitializeCriticalSectionAndSpinCount(&port->lock, 1000);
	lock_ready = 1;

	for (i = 0; i < port->n_threads; ++i) {
		ev_uintptr_t th = _beginthread(loop, 0, port);
		if (th == (ev_uintptr_t)-1)
			goto err;
		port->threads[i] = (HANDLE)th;
		/* Workers decrement n_live_threads under the lock when they
		 * exit, so the increment must be locked too. */
		EnterCriticalSection(&port->lock);
		++port->n_live_threads;
		LeaveCriticalSection(&port->lock);
	}

	return port;
err:
	if (lock_ready) {
		short live;

		/* Workers that were already started hold a pointer to *port.
		 * Shut them down with the same protocol loop() honours (set
		 * shutdown, wake every thread) before freeing anything. */
		EnterCriticalSection(&port->lock);
		port->shutdown = 1;
		live = port->n_live_threads;
		LeaveCriticalSection(&port->lock);
		if (live > 0) {
			for (i = 0; i < live; ++i)
				PostQueuedCompletionStatus(port->port, 0,
				    NOTIFICATION_KEY, NULL);
			WaitForSingleObject(port->shutdownSemaphore, INFINITE);
			/* The last worker signals the semaphore while still
			 * holding the lock; wait until it has let go. */
			EnterCriticalSection(&port->lock);
			LeaveCriticalSection(&port->lock);
		}
		DeleteCriticalSection(&port->lock);
	}
	if (port->port)
		CloseHandle(port->port);
	if (port->threads)
		mm_free(port->threads);
	if (port->shutdownSemaphore)
		CloseHandle(port->shutdownSemaphore);
	mm_free(port);
	return NULL;
}
event_iocp_port_launch首先在需要時初始化Winsock的擴充套件函式(init_extension_functions),然後建立iocp的埠和用於關閉iocp的訊號量。iocp還需要一個執行緒池,執行緒池大小為cpu數量的2倍(n_cpus小於等於0時使用預設值N_CPUS_DEFAULT),每個執行緒都執行loop函式。
/*
 * Prepare an event_overlapped for use: every byte (including the embedded
 * OVERLAPPED) is cleared, then cb is installed as the completion callback.
 */
void
event_overlapped_init(struct event_overlapped *o, iocp_callback cb)
{
	memset(o, 0, sizeof *o);
	o->cb = cb;
}
/*
 * Dispatch one dequeued completion.  o is the OVERLAPPED embedded in an
 * event_overlapped, so step back to the containing struct and hand the
 * completion key, byte count and success flag to its callback.
 */
static void
handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok)
{
	struct event_overlapped *owner;

	owner = EVUTIL_UPCAST(o, struct event_overlapped, overlapped);
	owner->cb(owner, completion_key, nBytes, ok);
}
/*
 * Worker-thread body; _port is the owning struct event_iocp_port.
 *
 * Each iteration blocks in GetQueuedCompletionStatus for up to port->ms
 * milliseconds (INFINITE when ms <= 0), then:
 *   - if port->shutdown is set: decrement n_live_threads (the thread that
 *     brings it to zero releases shutdownSemaphore) and return;
 *   - if an OVERLAPPED was dequeued with a key other than NOTIFICATION_KEY:
 *     dispatch it through handle_entry;
 *   - if no OVERLAPPED was dequeued: leave the loop, log a warning and do
 *     the same n_live_threads / semaphore bookkeeping before returning.
 * A NOTIFICATION_KEY packet that does carry an OVERLAPPED is ignored.
 *
 * NOTE(review): with a finite ms, a timeout also yields a NULL OVERLAPPED
 * and would end the thread through the warning path; today ms is always
 * INFINITE, so this does not arise.
 */
static void
loop(void *_port)
{
struct event_iocp_port *port = _port;
long ms = port->ms;
HANDLE p = port->port;
/* Non-positive timeout means "wait forever". */
if (ms <= 0)
ms = INFINITE;
while (1) {
OVERLAPPED *overlapped=NULL;
ULONG_PTR key=0;
DWORD bytes=0;
/* ok is FALSE if the dequeue itself failed or the dequeued I/O failed;
 * overlapped stays NULL when no packet was dequeued. */
int ok = GetQueuedCompletionStatus(p, &bytes, &key,
&overlapped, ms);
/* Shutdown is checked before anything else, so any wake-up (such as a
 * posted NOTIFICATION_KEY packet) lets the thread exit once the flag
 * is set. */
EnterCriticalSection(&port->lock);
if (port->shutdown) {
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1,
NULL);
LeaveCriticalSection(&port->lock);
return;
}
LeaveCriticalSection(&port->lock);
/* Real completion: run the callback attached to its event_overlapped. */
if (key != NOTIFICATION_KEY && overlapped)
handle_entry(overlapped, key, bytes, ok);
else if (!overlapped)
break;
}
/* Reached only when GetQueuedCompletionStatus returned without a packet
 * while not shutting down. */
event_warnx("GetQueuedCompletionStatus exited with no event.");
EnterCriticalSection(&port->lock);
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
LeaveCriticalSection(&port->lock);
}
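loop是執行緒池執行的函式,它透過呼叫GetQueuedCompletionStatus監聽iocp中的事件。ms在初始化中被設定為-1,因此loop會以INFINITE無限等待。正常情況下,GetQueuedCompletionStatus返回的key是套接字關聯到埠時指定的完成鍵(例如bufferevent_async_new中傳入的1),同時會帶回對應的OVERLAPPED。如果key是NOTIFICATION_KEY(即-1),說明有人呼叫了event_iocp_notify_all,它透過PostQueuedCompletionStatus讓GetQueuedCompletionStatus立刻返回,目的是喚醒執行緒。執行緒被喚醒後會先檢查port->shutdown:如果已經設定了關閉標誌,就遞減n_live_threads並退出,最後一個退出的執行緒會釋放shutdownSemaphore,從而結束iocp。對於正常的完成事件,loop會呼叫handle_entry來呼叫對應的回撥。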
event_overlapped的定義如下:
/*
 * An OVERLAPPED paired with the callback to run when it completes.  The
 * address of the embedded OVERLAPPED is what is handed to the Win32 I/O
 * call; handle_entry recovers this wrapper from it with EVUTIL_UPCAST.
 */
struct event_overlapped {
/* Passed to the overlapped I/O call (e.g. WSASend). */
OVERLAPPED overlapped;
/* Invoked by handle_entry when the completion is dequeued. */
iocp_callback cb;
};
該結構體的第一個變數OVERLAPPED就是投入讀寫事件時傳入的OVERLAPPED,根據GetQueuedCompletionStatus的返回可以獲得對應的OVERLAPPED,從而可以根據EVUTIL_UPCAST獲得event_overlapped結構體,呼叫對應的回撥。
看完iocp的工作流程,接下來看bufferevent_async是如何使用iocp的。想要使用bufferevent_async,首先要呼叫bufferevent_async_new:
/*
 * Create a bufferevent whose socket I/O is driven by base's IOCP port.
 *
 * fd may be a socket or -1 (in which case one can be supplied later).  A
 * valid fd is associated with the port using completion key 1.
 * BEV_OPT_THREADSAFE is always added to options, since completion callbacks
 * run on the IOCP worker threads.
 *
 * Returns NULL if base has no IOCP port, if fd cannot be associated with
 * the port, or if any allocation or initialization step fails.
 */
struct bufferevent *
bufferevent_async_new(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
		DWORD err = GetLastError();
		/* We may have already associated this fd with a port.
		 * Let's hope it's this port, and that the error code
		 * for doing this never changes. */
		if (err != ERROR_INVALID_PARAMETER)
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
		options)<0)
		goto err;

	/* evbuffer_add_cb returns NULL when it cannot allocate the callback
	 * entry.  Without the output callback, data added to the output
	 * buffer would never trigger a write, so fail construction instead
	 * of returning a half-working bufferevent. */
	if (!evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev) ||
	    !evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev))
		goto err;

	event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init(&bev_a->read_overlapped, read_complete);
	event_overlapped_init(&bev_a->write_overlapped, write_complete);

	/* Without a socket there is nothing to do I/O on yet. */
	bev_a->ok = fd >= 0;
	if (bev_a->ok)
		_bufferevent_init_generic_timeout_cbs(bev);

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}
引數中的fd可以是有效的套接字,也可以是-1(之後在be_async_ctrl中指定),event_iocp_port_associate用於把套接字關聯到iocp的埠上。evbuffer_overlapped_new建立一個evbuffer_overlapped結構體並且返回一個evbuffer:
/*
 * An evbuffer extended with the state needed for overlapped (IOCP) I/O.
 * Code that only knows about struct evbuffer gets a pointer to the embedded
 * `buffer`; evbuffer_overlapped_new sets buffer.is_overlapped on it.
 */
struct evbuffer_overlapped {
/* The generic evbuffer that callers see. */
struct evbuffer buffer;
/** The socket that we're doing overlapped IO on. */
evutil_socket_t fd;
/** pending I/O type */
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
/** The first pinned chain in the buffer. */
struct evbuffer_chain *first_pinned;
/** How many chains are pinned; how many of the fields in buffers
* are we using. */
int n_buffers;
/* WSABUF descriptors handed to WSASend; each points directly into the
 * memory of a pinned chain rather than holding a copy of the data. */
WSABUF buffers[MAX_WSABUFS];
};
/*
 * Allocate an empty evbuffer usable for overlapped I/O on fd.
 *
 * The returned pointer is the generic evbuffer embedded in a freshly
 * zeroed evbuffer_overlapped; it starts with a reference count of 1 and
 * is flagged as overlapped.  Returns NULL if allocation fails.
 */
struct evbuffer *
evbuffer_overlapped_new(evutil_socket_t fd)
{
	struct evbuffer_overlapped *ovl =
	    mm_calloc(1, sizeof(struct evbuffer_overlapped));
	struct evbuffer *b;

	if (ovl == NULL)
		return NULL;

	b = &ovl->buffer;
	TAILQ_INIT(&b->callbacks);
	b->refcnt = 1;
	b->last_with_datap = &b->first;
	b->is_overlapped = 1;
	ovl->fd = fd;

	return b;
}
evbuffer_overlapped結構體可以由evbuffer通過upcast_evbuffer函式得到。libevent大量使用這種方法隱藏複雜實現,只給開發者簡單的通用的結構,內部則通過型別轉換獲得真正的結構體。evbuffer_overlapped結構體中read_in_progress和write_in_progress用來標記讀寫事件是否已經投遞。buffers是投遞讀寫時傳給WSASend等函式的WSABUF陣列,它並不複製資料,而是直接指向evbuffer中被固定(pin)住的chain記憶體,n_buffers記錄實際使用的個數。
繼續看bufferevent_async_new函式,be_async_inbuf_callback和be_async_outbuf_callback函式被設定為讀寫evbuffer的callback。connect_complete,read_complete和write_complete分別被設定為connect_overlapped,read_overlapped和write_overlapped的回撥。接下來就看一下這些回撥:
/*
 * evbuffer callback attached to the bufferevent's output buffer.
 *
 * Runs on every change to the output buffer; when the change added bytes,
 * a write may now be possible, so bev_async_consider_writing is called.
 * The bufferevent is referenced and locked for the duration.
 */
static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *owner = arg;
	struct bufferevent_async *async_bev = upcast(owner);

	(void)buf;

	_bufferevent_incref_and_lock(owner);
	if (cbinfo->n_added != 0)
		bev_async_consider_writing(async_bev);
	_bufferevent_decref_and_unlock(owner);
}
be_async_outbuf_callback是output這個evbuffer的回撥,每當output的內容發生變化時都會呼叫此函式。只有當這次變化新增了資料(cbinfo->n_added不為0)時,它才會呼叫bev_async_consider_writing嘗試投遞寫事件:
/*
 * Launch an overlapped write from bev->output if that is appropriate.
 *
 * Nothing happens while a write is already in flight or a connect is
 * pending.  If the bufferevent is not ok, writing is not enabled, the
 * output buffer is empty, or writes are suspended, the virtual write event
 * is removed instead.  Otherwise up to min(output length, write limit)
 * bytes are handed to evbuffer_launch_write.  On success the amount is
 * recorded in write_in_progress, charged to the write bucket, and the
 * virtual write event is added; on failure the bufferevent is marked not
 * ok and BEV_EVENT_ERROR is reported.
 */
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
size_t at_most;
int limit;
struct bufferevent *bev = &beva->bev.bev;
/* Don't write if there's a write in progress, or we do not
* want to write, or when there's nothing left to write. */
if (beva->write_in_progress || beva->bev.connecting)
return;
if (!beva->ok || !(bev->enabled&EV_WRITE) ||
!evbuffer_get_length(bev->output)) {
bev_async_del_write(beva);
return;
}
at_most = evbuffer_get_length(bev->output);
/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX. That's true for now. XXXX */
limit = (int)_bufferevent_get_write_max(&beva->bev);
/* Only a non-negative limit clamps at_most; a negative value leaves it
 * unchanged. */
if (at_most >= (size_t)limit && limit >= 0)
at_most = limit;
if (beva->bev.write_suspended) {
bev_async_del_write(beva);
return;
}
/* XXXX doesn't respect low-water mark very well. */
/* Reference held for the pending write: released right here if the
 * launch fails, otherwise by write_complete when the write finishes. */
bufferevent_incref(bev);
if (evbuffer_launch_write(bev->output, at_most,
&beva->write_overlapped)) {
bufferevent_decref(bev);
beva->ok = 0;
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
/* Remember the requested amount so write_complete can refund the
 * bucket for any bytes that were not actually written. */
beva->write_in_progress = at_most;
_bufferevent_decrement_write_buckets(&beva->bev, at_most);
bev_async_add_write(beva);
}
}
bev_async_consider_writing會判斷當前是否已經有資料在投遞,如果沒有則設定一個合理的at_most值然後呼叫evbuffer_launch_write:
/*
 * Launch an overlapped WSASend from the front of buf, using ol->overlapped
 * as the OVERLAPPED.
 *
 * at_most caps the number of bytes sent; a negative value, or one larger
 * than the buffered length, means "everything buffered".  At most
 * MAX_WSABUFS chains are described per call, so fewer bytes than requested
 * may be launched.
 *
 * While the write is outstanding the front of buf is frozen, the chains
 * being sent are pinned with EVBUFFER_MEM_PINNED_W, write_in_progress is
 * set, and buf holds an extra reference.  All of that is undone here if
 * WSASend fails immediately with anything other than WSA_IO_PENDING.
 * Must not be called while an overlapped read is in progress (asserted).
 *
 * Returns 0 if the write was launched or buf was empty; -1 if buf is NULL
 * or not an overlapped evbuffer, the front is frozen, a write is already
 * in progress, or WSASend failed.
 */
int
evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
    struct event_overlapped *ol)
{
	struct evbuffer_overlapped *buf_o;
	int r = -1;
	int i;
	struct evbuffer_chain *chain;
	DWORD bytesSent;

	/* Validate before upcasting: a plain evbuffer is not embedded in an
	 * evbuffer_overlapped, so using buf_o for it would dereference a NULL
	 * or bogus pointer below. */
	if (!buf || !buf->is_overlapped) {
		/* No buffer, or it isn't overlapped */
		return -1;
	}
	buf_o = upcast_evbuffer(buf);

	EVBUFFER_LOCK(buf);
	EVUTIL_ASSERT(!buf_o->read_in_progress);
	if (buf->freeze_start || buf_o->write_in_progress)
		goto done;
	if (!buf->total_len) {
		/* Nothing to write */
		r = 0;
		goto done;
	} else if (at_most < 0 || (size_t)at_most > buf->total_len) {
		at_most = buf->total_len;
	}
	/* Freeze the front so the data being sent cannot be drained. */
	evbuffer_freeze(buf, 1);

	buf_o->first_pinned = NULL;
	buf_o->n_buffers = 0;
	memset(buf_o->buffers, 0, sizeof(buf_o->buffers));

	chain = buf_o->first_pinned = buf->first;

	for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
		WSABUF *b = &buf_o->buffers[i];
		b->buf = (char*)( chain->buffer + chain->misalign );
		_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W);

		if ((size_t)at_most > chain->off) {
			/* XXXX Cast is safe for now, since win32 has no
			   mmaped chains.  But later, we need to have this
			   add more WSAbufs if chain->off is greater than
			   ULONG_MAX */
			b->len = (unsigned long)chain->off;
			at_most -= chain->off;
		} else {
			b->len = (unsigned long)at_most;
			/* Count this final buffer before leaving the loop. */
			++i;
			break;
		}
	}

	buf_o->n_buffers = i;
	/* Reference held on behalf of the pending write. */
	_evbuffer_incref(buf);
	if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
		&ol->overlapped, NULL)) {
		int error = WSAGetLastError();
		if (error != WSA_IO_PENDING) {
			/* An actual error. */
			pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
			evbuffer_unfreeze(buf, 1);
			evbuffer_free(buf); /* decref */
			goto done;
		}
	}

	buf_o->write_in_progress = 1;
	r = 0;
done:
	EVBUFFER_UNLOCK(buf);
	return r;
}
evbuffer_launch_write將evbuffer中的資料和WSABUF做一個對映,之後呼叫WSASend進行傳送。此函式的引數ol是bufferevent_async的write_overlapped結構體,WSASend函式中傳入的就是write_overlapped結構體中的overlapped變數:
/* Map a dequeued OVERLAPPED back to the event_overlapped that embeds it and
 * invoke that wrapper's callback with the completion details. */
static void
handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok)
{
	struct event_overlapped *const entry =
	    EVUTIL_UPCAST(o, struct event_overlapped, overlapped);

	entry->cb(entry, completion_key, nBytes, ok);
}
這樣當handle_entry處理回撥時就會找到write_overlapped結構體,從而找到write_complete回撥:
/*
 * Completion callback for write_overlapped, run when an overlapped write
 * from the output buffer finishes.
 *
 * Commits the nbytes actually written, refunds the write bucket for any
 * launched bytes that were not written, and records the WSA error on
 * failure.  Then, if the bufferevent is still ok:
 *   - a failed write marks it not ok and reports WRITING|ERROR;
 *   - a zero-byte write marks it not ok and reports WRITING|EOF;
 *   - otherwise the write timeout is reset, the write callback runs when
 *     the output is at or below the low watermark, and another write is
 *     considered.
 * Finally drops the reference taken when the write was launched.
 */
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	ev_ssize_t shortfall;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	shortfall = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	/* Give back bucket credit for bytes launched but not written. */
	if (shortfall != 0)
		_bufferevent_decrement_write_buckets(&bev_a->bev, -shortfall);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (!ok) {
			bev_a->ok = 0;
			_bufferevent_run_eventcb(bev,
			    BEV_EVENT_WRITING | BEV_EVENT_ERROR);
		} else if (nbytes == 0) {
			bev_a->ok = 0;
			_bufferevent_run_eventcb(bev,
			    BEV_EVENT_WRITING | BEV_EVENT_EOF);
		} else {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			if (evbuffer_get_length(bev->output) <=
			    bev->wm_write.low)
				_bufferevent_run_writecb(bev);
			bev_async_consider_writing(bev_a);
		}
	}

	_bufferevent_decref_and_unlock(bev);
}
write_complete首先呼叫evbuffer_commit_write來處理evbuffer中的資料,然後判斷是否需要呼叫bufferevent的寫事件回撥或者事件回撥;同時,如果evbuffer中還有資料,則會再次投遞寫事件。
讀事件和連線事件的流程和寫事件的流程比較類似,這裡不再分析。需要注意的是,當使用IOCP模式時,bufferevent的讀寫事件不再由event_base管理,而是直接使用iocp。當投遞讀寫事件或者連線事件時會呼叫event_base_add_virtual,當事件完成(或不再需要)時則會呼叫event_base_del_virtual。這些事件對event_base來說是虛擬的,因為它們是由iocp負責管理的,但是event_base在統計事件數量時同樣會將它們計算在內。
/* Remove the virtual write event from the base, if one is registered. */
static void
bev_async_del_write(struct bufferevent_async *beva)
{
	if (!beva->write_added)
		return;
	beva->write_added = 0;
	event_base_del_virtual(beva->bev.bev.ev_base);
}
/* Remove the virtual read event from the base, if one is registered. */
static void
bev_async_del_read(struct bufferevent_async *beva)
{
	if (!beva->read_added)
		return;
	beva->read_added = 0;
	event_base_del_virtual(beva->bev.bev.ev_base);
}
/* Register a virtual write event with the base unless one already is. */
static void
bev_async_add_write(struct bufferevent_async *beva)
{
	if (beva->write_added)
		return;
	beva->write_added = 1;
	event_base_add_virtual(beva->bev.bev.ev_base);
}
/* Register a virtual read event with the base unless one already is. */
static void
bev_async_add_read(struct bufferevent_async *beva)
{
	if (beva->read_added)
		return;
	beva->read_added = 1;
	event_base_add_virtual(beva->bev.bev.ev_base);
}
“`
libevent的原始碼分析至此告一段落,另外libevent的http和dns在這裡不再分析,它們都是建立在之前分析的原始碼基礎上的應用,如果後面有時間會補上這部分的內容。
相關推薦
Libevent原始碼分析(七)--- IOCP
event_base中有一個iocp變數,event_base的初始化函式中會呼叫event_base_start_iocp開啟iocp功能,event_base_start_iocp又會呼叫event_iocp_port_launch來初始化IOCP:
Flume NG原始碼分析(七)ChannelSelector
前幾篇介紹了Flume NG Source元件的基本情況,接下來看看Channel相關的元件,Channel相關元件有: 1. Channel 2. ChannelSelector 3. Interceptor / InterceptorChain 4. ChannelProcess
libevent原始碼分析(五)
libevent-1.4/sample/signal-test.c event_add(&signal_int, NULL); 將 struct event signal_int新增到struct event_base* base,即註冊號監聽事件以及回
libevent原始碼分析(六)
libevent-1.4/sample/singnal-test.c 接下來看看 event_base_dispatch(base); 這個函式是整個Reactor的核心,是一個loop. 函式定義: int event_base_dispatch(struct
libevent原始碼分析(1)
有過看nginx原始碼的基礎,現在來看libevent原始碼,感覺要輕鬆多了。。 第一篇文章,主要是還是介紹一些幾個重要的資料結構吧。。。。 首先是event結構:struct event { TAILQ_ENTRY (event) ev_next; //用於構成eve
mochiweb原始碼分析(七)
前面說到了filelib:is_dir這函式,如果是目錄,則執行true流程,進入mochiweb_request:maybe_redirect/4這函式 然後根據第一個引數是否是[],呼叫不同的分支 回到之前說的true、false判斷,如果是檔案,則呼叫mochiweb_reque
Libevent原始碼分析(四)--- libevent事件機制
之前幾個章節都是分析libevent的輔助功能,這一節將要詳細分析libevent處理事件的流程和機制,在分析之前先看一下libevent的使用方法,本文也將以libevent的使用方式入手來分析libevent的工作機制。 void cb_func(ev
libevent原始碼分析(8)--2.1.8--事件申請與釋放
一、event_new 主要用來建立事件結構體,根據監聽事件型別,檔案描述符,以及回撥函式,回撥函式引數等建立,可以看成是事件的初始化過程,主要是設定事件的初始狀態,此時事件結構體剛剛創建出來還沒有新增到event_base的啟用或者等待列表中,是孤立存在的,需要呼叫eve
libevent原始碼分析(四)
還是上次那個訊號函式的例子, sample/signal-test.c——註冊訊號以及回撥事件。 /* Initalize one event */ event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST,
libevent原始碼分析(1)--2.1.8--標誌資訊
一、事件型別 event-internal.h /** * @name event flags * * Flags to pass to event_new(), event_assign(), event_pending(), and * anything e
libevent原始碼分析(二)
libevet——Reactor初始化,先來看看event_base_new的定義,先關注主線程式碼,後續研究細節: struct event_base *event_base_new(void) { int i; struct event_base *ba
libevent原始碼分析(2)--2.1.8--結構體 struct event和struct event_callback
一、event_callback結構體 struct event_callback { //下一個回撥事件 TAILQ_ENTRY(event_callback) evcb_active_next; //回撥事件的狀態標識,具體為:
libevent原始碼分析(6)--2.1.8--建立和釋放libevent控制代碼event_base的相關函式
一、event_base_new 建立預設的event_base ** * Create and return a new event_base to use with the rest of Libevent. * * @return a new event_ba
Libevent原始碼分析(六)--- bufferevent
上一節說過,libevent提供六種bufferevent型別,後面會詳細分析其中的兩個:bufferevent_sock和bufferevent_async.下面是bufferevent的詳細定義: struct bufferevent { /*
ZMQ原始碼分析(七) --程序內通訊
之前兩節分析了zmq的tcp通訊流程,除了tcp之外,zmq還支援許多其他的通訊模式,比如inproc,ipc,pgm,epgm,tipc等。這一節接著分析inpro,即程序內通訊。 和tcp通訊相比,程序內通訊要簡單許多,因為不涉及到遠端連線的認證以及資料的
Libevent原始碼分析(五)--- evbuffer的基本操作
之前幾節分析了libevent底層的結構和執行機制,接下來的幾節將會分析Bufferevents,Bufferevents在event的基礎上加入了資料快取邏輯,使得事件和資料結合在一起。libevent的bufferevent有六種型別,分別是:buffere
Tomcat原始碼分析 (七)----- Tomcat 啟動過程(二)
在上一篇文章中,我們分析了tomcat的初始化過程,是由Bootstrap反射呼叫Catalina的load方法完成tomcat的初始化,包括server.xml的解析、例項化各大元件、初始化元件等邏輯。那麼tomcat又是如何啟動webapp應用,又是如何載入應用程式的ServletContextListe
mybatis 原始碼分析(七)KeyGenerator 詳解
一、KeyGenerator 概述 在平時開發的時候經常會有這樣的需求,插入資料返回主鍵,或者插入資料之前需要獲取主鍵,這樣的需求在 mybatis 中也是支援的,其中主要的邏輯部分就在 KeyGenerator 中,下面是他的類圖: 其中: NoKeyGenerator:預設空實現,不需要對主鍵單獨處
Netty原始碼分析 (七)----- read過程 原始碼分析
在上一篇文章中,我們分析了processSelectedKey這個方法中的accept過程,本文將分析一下work執行緒中的read過程。 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
HLS學習(七)HLSDownloader原始碼分析(6)下載TS檔案片段
下載TS檔案片段 處理完Master PlayList和Media PlayList之後就可以開始下載TS視訊片段了 下載的流程如下: 1、初始化本地的檔名 2、設定本地檔案的訪問許可權 3、建立任務佇列 4、建立PlayList更新執行緒,因為伺服器上的m3u8檔案