Libevent原始碼分析(六)--- bufferevent
上一節說過,libevent提供六種bufferevent型別,後面會詳細分析其中的兩個:bufferevent_sock和bufferevent_async.下面是bufferevent的詳細定義:
struct bufferevent {
/** Event base for which this bufferevent was created. */
struct event_base *ev_base;
/** Pointer to a table of function pointers to set up how this
bufferevent behaves. */
const struct bufferevent_ops *be_ops;
/** A read event that triggers when a timeout has happened or a socket
is ready to read data. Only used by some subtypes of
bufferevent. */
struct event ev_read;
/** A write event that triggers when a timeout has happened or a socket
is ready to write data. Only used by some subtypes of
bufferevent. */
struct event ev_write;
/** An input buffer. Only the bufferevent is allowed to add data to
this buffer, though the user is allowed to drain it. */
struct evbuffer *input;
/** An input buffer. Only the bufferevent is allowed to drain data
from this buffer, though the user is allowed to add it. */
struct evbuffer *output;
struct event_watermark wm_read;
struct event_watermark wm_write;
bufferevent_data_cb readcb;
bufferevent_data_cb writecb;
/* This should be called 'eventcb', but renaming it would break
* backward compatibility */
bufferevent_event_cb errorcb;
void *cbarg;
struct timeval timeout_read;
struct timeval timeout_write;
/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
};
其中ev_base指向bufferevent所屬的event_base ,ev_read,ev_write是讀寫事件,timeout_read和timeout_write是讀寫超時時間,readcb,writecb和errorcb分別是讀回撥,寫回調和事件回撥,因為版本相容問題使用errorcb命名。input和output是evbuffer型別的讀寫快取。enabled表示支援的事件,目前只有EV_READ 和EV_WRITE。wm_read和wm_write代表讀寫快取的伐值,be_ops是bufferevent_ops型別變數,它的定義如下:
struct bufferevent_ops {
/** The name of the bufferevent's type. */
const char *type;
/** At what offset into the implementation type will we find a
bufferevent structure?
Example: if the type is implemented as
struct bufferevent_x {
int extra_data;
struct bufferevent bev;
}
then mem_offset should be offsetof(struct bufferevent_x, bev)
*/
off_t mem_offset;
/** Enables one or more of EV_READ|EV_WRITE on a bufferevent. Does
not need to adjust the 'enabled' field. Returns 0 on success, -1
on failure.
*/
int (*enable)(struct bufferevent *, short);
/** Disables one or more of EV_READ|EV_WRITE on a bufferevent. Does
not need to adjust the 'enabled' field. Returns 0 on success, -1
on failure.
*/
int (*disable)(struct bufferevent *, short);
/** Free any storage and deallocate any extra data or structures used
in this implementation.
*/
void (*destruct)(struct bufferevent *);
/** Called when the timeouts on the bufferevent have changed.*/
int (*adj_timeouts)(struct bufferevent *);
/** Called to flush data. */
int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);
/** Called to access miscellaneous fields. */
int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
};
bufferevent_ops 的實現和eventop比較類似,都是定義了一組指標,不同的bufferevent實現可以自定義這些具體操作。比如bufferevent_sock的對應定義:
const struct bufferevent_ops bufferevent_ops_socket = {
"socket",
evutil_offsetof(struct bufferevent_private, bev),
be_socket_enable,
be_socket_disable,
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
be_socket_ctrl,
};
接下來分析一下buffevent_sock的實現方式。
struct bufferevent *
bufferevent_new(evutil_socket_t fd,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg)
{
struct bufferevent *bufev;
if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
return NULL;
bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);
return bufev;
}
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
int options)
{
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;
#ifdef WIN32
if (base && event_base_get_iocp(base))
return bufferevent_async_new(base, fd, options);
#endif
if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;
if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
options) < 0) {
mm_free(bufev_p);
return NULL;
}
bufev = &bufev_p->bev;
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
evbuffer_freeze(bufev->input, 0);
evbuffer_freeze(bufev->output, 1);
return bufev;
}
bufferevent_socket_new方法用於建立一個bufferevent_socket型別的bufferevent。如果是在win32環境下則會預設呼叫iocp的對應實現,即bufferevent_async_new。該方法還會建立一個bufferevent_private變數,下面是他的定義:
struct bufferevent_private {
/** The underlying bufferevent structure. */
struct bufferevent bev;
/** Evbuffer callback to enforce watermarks on input. */
struct evbuffer_cb_entry *read_watermarks_cb;
/** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;
/** Flag: set if we have deferred callbacks and a read callback is
* pending. */
unsigned readcb_pending : 1;
/** Flag: set if we have deferred callbacks and a write callback is
* pending. */
unsigned writecb_pending : 1;
/** Flag: set if we are currently busy connecting. */
unsigned connecting : 1;
/** Flag: set if a connect failed prematurely; this is a hack for
* getting around the bufferevent abstraction. */
unsigned connection_refused : 1;
/** Set to the events pending if we have deferred callbacks and
* an events callback is pending. */
short eventcb_pending;
/** If set, read is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
bufferevent_suspend_flags read_suspended;
/** If set, writing is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
bufferevent_suspend_flags write_suspended;
/** Set to the current socket errno if we have deferred callbacks and
* an events callback is pending. */
int errno_pending;
/** The DNS error code for bufferevent_socket_connect_hostname */
int dns_error;
/** Used to implement deferred callbacks */
struct deferred_cb deferred;
/** The options this bufferevent was constructed with */
enum bufferevent_options options;
/** Current reference count for this bufferevent. */
int refcnt;
/** Lock for this bufferevent. Shared by the inbuf and the outbuf.
* If NULL, locking is disabled. */
void *lock;
/** Rate-limiting information for this bufferevent */
struct bufferevent_rate_limit *rate_limiting;
};
bufferevent_private結構的第一個變數是bufferevent型別的,其實每一個bufferevent都對應一個bufferevent_private變數,只是對使用者來說是透明的,使用者只需要瞭解bufferevent即可。bufferevent_private的變數基本都是用於記錄狀態。
繼續分析bufferevent_socket_new函式,建立bufferevent_private之後呼叫bufferevent_init_common函式,這個函式定義在bufferevent.c檔案中,該檔案中的函式都是各個bufferevent型別通用的函式。
int bufferevent_init_common(struct bufferevent_private *bufev_private,
struct event_base *base,
const struct bufferevent_ops *ops,
enum bufferevent_options options)
{
struct bufferevent *bufev = &bufev_private->bev;
if (!bufev->input) {
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
}
if (!bufev->output) {
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
return -1;
}
}
bufev_private->refcnt = 1;
bufev->ev_base = base;
/* Disable timeouts. */
evutil_timerclear(&bufev->timeout_read);
evutil_timerclear(&bufev->timeout_write);
bufev->be_ops = ops;
/*
* Set to EV_WRITE so that using bufferevent_write is going to
* trigger a callback. Reading needs to be explicitly enabled
* because otherwise no data will be available.
*/
bufev->enabled = EV_WRITE;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE) {
if (bufferevent_enable_locking(bufev, NULL) < 0) {
/* cleanup */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
bufev->input = NULL;
bufev->output = NULL;
return -1;
}
}
#endif
if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
== BEV_OPT_UNLOCK_CALLBACKS) {
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1;
}
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
}
bufev_private->options = options;
evbuffer_set_parent(bufev->input, bufev);
evbuffer_set_parent(bufev->output, bufev);
return 0;
}
bufferevent_init_common比較簡單,需要注意 bufferEvent預設開啟EV_WRITE,EV_READ需要手動開啟,如果需要支援多執行緒則為bufferevent建立鎖,另外在目前的版本中,UNLOCK_CALLBACKS需要DEFER_CALLBACKS作為前置條件。
繼續分析bufferevent_socket_new函式:
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
呼叫bufferevent_init_common之後會將bufev->output新增EVBUFFER_FLAG_DRAINS_TO_FD標記,這樣output就支援從檔案讀取資料了,接下來設定讀寫事件回掉,最後新增bufferevent_socket_outbuf_cb作為output的call,當output中有資料時呼叫bufferevent_socket_outbuf_cb,這樣就可以不必一直監聽寫事件,bufferevent_writecb在資料全部寫出之後可以移除寫事件, 然後在bufferevent_socket_outbuf_cb中判斷是否需要新增事件。
初始化bufferevent之後需要呼叫be_socket_setfd設定網路套接字來監聽讀寫事件:
static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
BEV_LOCK(bufev);
EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
if (fd >= 0)
bufferevent_enable(bufev, bufev->enabled);
BEV_UNLOCK(bufev);
}
int bufferevent_enable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
int r = 0;
_bufferevent_incref_and_lock(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;
bufev->enabled |= event;
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
_bufferevent_decref_and_unlock(bufev);
return r;
}
be_socket_setfd中設定的套接字必須是已經建立好連線的socket。如果是client端,bufferevent_sock提供了bufferevent_socket_connect用於連線套接字,但是如果是server端,libevent沒有提供listen的相關函式,不過libevent提供了listener.c檔案可以用於監聽連線。
int
bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *sa, int socklen)
{
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
evutil_socket_t fd;
int r = 0;
int result=-1;
int ownfd = 0;
_bufferevent_incref_and_lock(bev);
if (!bufev_p)
goto done;
fd = bufferevent_getfd(bev);
if (fd < 0) {
if (!sa)
goto done;
fd = socket(sa->sa_family, SOCK_STREAM, 0);
if (fd < 0)
goto done;
if (evutil_make_socket_nonblocking(fd)<0)
goto done;
ownfd = 1;
}
if (sa) {
#ifdef WIN32
if (bufferevent_async_can_connect(bev)) {
bufferevent_setfd(bev, fd);
r = bufferevent_async_connect(bev, fd, sa, socklen);
if (r < 0)
goto freesock;
bufev_p->connecting = 1;
result = 0;
goto done;
} else
#endif
r = evutil_socket_connect(&fd, sa, socklen);
if (r < 0)
goto freesock;
}
#ifdef WIN32
/* ConnectEx() isn't always around, even when IOCP is enabled.
* Here, we borrow the socket object's write handler to fall back
* on a non-blocking connect() when ConnectEx() is unavailable. */
if (BEV_IS_ASYNC(bev)) {
event_assign(&bev->ev_write, bev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
}
#endif
bufferevent_setfd(bev, fd);
if (r == 0) {
if (! be_socket_enable(bev, EV_WRITE)) {
bufev_p->connecting = 1;
result = 0;
goto done;
}
} else if (r == 1) {
/* The connect succeeded already. How very BSD of it. */
result = 0;
bufev_p->connecting = 1;
event_active(&bev->ev_write, EV_WRITE, 1);
} else {
/* The connect failed already. How very BSD of it. */
bufev_p->connection_refused = 1;
bufev_p->connecting = 1;
result = 0;
event_active(&bev->ev_write, EV_WRITE, 1);
}
goto done;
freesock:
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
if (ownfd)
evutil_closesocket(fd);
/* do something about the error? */
done:
_bufferevent_decref_and_unlock(bev);
return result;
}
win32的connect使用iocp實現,將在下一節iocp章節詳細分析,這裡只看其他平臺的實現方式。evutil_socket_connect是libevent提供的工具函式,它的返回值有三種情況:0代表正在連線,此時需要呼叫be_socket_enable啟用寫事件。當連線成功時會呼叫bufferevent_writecb回掉。1代表已經連線成功,此時呼叫event_active直接手動觸發bufferevent_writecb事件處理連線。-1代表連線被拒絕,此時同樣呼叫event_active處理連線失敗情況。
下面是bufferevent_writecb的具體實現:
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
int res = 0;
short what = BEV_EVENT_WRITING;
int connected = 0;
ev_ssize_t atmost = -1;
_bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}
if (bufev_p->connecting) {
int c = evutil_socket_finished_connecting(fd);
/* we need to fake the error if the connection was refused
* immediately - usually connection to localhost on BSD */
if (bufev_p->connection_refused) {
bufev_p->connection_refused = 0;
c = -1;
}
if (c == 0)
goto done;
bufev_p->connecting = 0;
if (c < 0) {
event_del(&bufev->ev_write);
event_del(&bufev->ev_read);
_bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
goto done;
} else {
connected = 1;
#ifdef WIN32
if (BEV_IS_ASYNC(bufev)) {
event_del(&bufev->ev_write);
bufferevent_async_set_connected(bufev);
_bufferevent_run_eventcb(bufev,
BEV_EVENT_CONNECTED);
goto done;
}
#endif
_bufferevent_run_eventcb(bufev,
BEV_EVENT_CONNECTED);
if (!(bufev->enabled & EV_WRITE) ||
bufev_p->write_suspended) {
event_del(&bufev->ev_write);
goto done;
}
}
}
atmost = _bufferevent_get_write_max(bufev_p);
if (bufev_p->write_suspended)
goto done;
if (evbuffer_get_length(bufev->output)) {
evbuffer_unfreeze(bufev->output, 1);
res = evbuffer_write_atmost(bufev->output, fd, atmost);
evbuffer_freeze(bufev->output, 1);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
what |= BEV_EVENT_ERROR;
} else if (res == 0) {
/* eof case
XXXX Actually, a 0 on write doesn't indicate
an EOF. An ECONNRESET might be more typical.
*/
what |= BEV_EVENT_EOF;
}
if (res <= 0)
goto error;
_bufferevent_decrement_write_buckets(bufev_p, res);
}
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
/*
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if ((res || !connected) &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
_bufferevent_run_writecb(bufev);
}
goto done;
reschedule:
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
goto done;
error:
bufferevent_disable(bufev, EV_WRITE);
_bufferevent_run_eventcb(bufev, what);
done:
_bufferevent_decref_and_unlock(bufev);
}
bufferevent_writecb需要處理兩件事,一個是連線邏輯,一個是傳送資料邏輯,注意當處理連線邏輯時,如果當前沒有啟用EV_WRITE或者處於write_suspended狀態,需要刪除寫監聽,因為當前的寫監聽事件只用於處理連線。另外當處理資料邏輯時如果output的資料長度為0,同樣需要刪除寫事件,因為output的callback在有資料之後會重新新增寫事件:
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
!bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
/* Should we log this? */
}
}
}
bufferevent_writecb和bufferevent_readcb還需要呼叫使用者自定義的讀寫事件:
void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->writecb == NULL)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
} else {
bufev->writecb(bufev, bufev->cbarg);
}
}
#define SCHEDULE_DEFERRED(bevp) \
do { \
bufferevent_incref(&(bevp)->bev); \
event_deferred_cb_schedule( \
event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
&(bevp)->deferred); \
} while (0)
如果bufferevent的選項包含BEV_OPT_DEFER_CALLBACKS則需要使用延遲呼叫方式,延遲呼叫是在event_base的loop迴圈統一處理的。p->deferred.queued如果沒有被設定過,則加入到ev_base的延遲呼叫佇列。bufferevent_init_common中設定過defered對應的回掉:
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
}
這兩個回掉只是使用鎖的方式不同:
static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
{
struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev;
BEV_LOCK(bufev);
if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
bufev->errorcb) {
/* The "connected" happened before any reads or writes, so
send it first. */
bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
}
if (bufev_private->readcb_pending && bufev->readcb) {
bufev_private->readcb_pending = 0;
bufev->readcb(bufev, bufev->cbarg);
}
if (bufev_private->writecb_pending && bufev->writecb) {
bufev_private->writecb_pending = 0;
bufev->writecb(bufev, bufev->cbarg);
}
if (bufev_private->eventcb_pending && bufev->errorcb) {
short what = bufev_private->eventcb_pending;
int err = bufev_private->errno_pending;
bufev_private->eventcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
bufev->errorcb(bufev, what, bufev->cbarg);
}
_bufferevent_decref_and_unlock(bufev);
}
static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
{
struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev;
BEV_LOCK(bufev);
#define UNLOCKED(stmt) \
do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)
if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
bufev->errorcb) {
/* The "connected" happened before any reads or writes, so
send it first. */
bufferevent_event_cb errorcb = bufev->errorcb;
void *cbarg = bufev->cbarg;
bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
}
if (bufev_private->readcb_pending && bufev->readcb) {
bufferevent_data_cb readcb = bufev->readcb;
void *cbarg = bufev->cbarg;
bufev_private->readcb_pending = 0;
UNLOCKED(readcb(bufev, cbarg));
}
if (bufev_private->writecb_pending && bufev->writecb) {
bufferevent_data_cb writecb = bufev->writecb;
void *cbarg = bufev->cbarg;
bufev_private->writecb_pending = 0;
UNLOCKED(writecb(bufev, cbarg));
}
if (bufev_private->eventcb_pending && bufev->errorcb) {
bufferevent_event_cb errorcb = bufev->errorcb;
void *cbarg = bufev->cbarg;
short what = bufev_private->eventcb_pending;
int err = bufev_private->errno_pending;
bufev_private->eventcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
UNLOCKED(errorcb(bufev,what,cbarg));
}
_bufferevent_decref_and_unlock(bufev);
#undef UNLOCKED
}
以上就是bufferevent_sock的實現方式,下一節我們將分析IOCP的實現方式。