libevent原始碼分析之多執行緒準備工作
阿新 • • 發佈:2019-01-01
libevent中是預設不開啟多執行緒的,也就沒有什麼鎖, 條件變數等的說法了
我們可以使用evthread_use_pthreads()開啟linux下的pthread
或者使用evthread_set_lock_callbacks定製屬於自己的多執行緒, 鎖, 條件變數等
根據官方給出的文件, 支援的鎖型別有:
支援的鎖mode有:
以上是封裝的執行緒鎖, 下面是封裝的執行緒條件變數:
因為對於條件變數的包裝類似, 所以這裡略過
關於這裡面的函式, 要提一下的是下面的函式:
好了,看到這裡, 想必對libevent對執行緒的支援有所瞭解了,只是簡單的在我們平時使用的基礎上進行一些封裝, 理解起來並不困難
還有要注意的是, 設定libevent支援執行緒必須在最開始就宣告好!與自己設定記憶體分配函式、日誌記錄函式類似, 不能半途設定的!
1、某執行緒可能解鎖自己並不持有的鎖
2、在沒有解鎖之前,想要鎖住一個非遞迴的鎖
如何開啟呢, 可以在使用evthread_use_pthreads開啟執行緒後, 再呼叫evthread_enable_lock_debugging就可以開啟debug鎖了
也就是說, 現在所有的鎖相關到操作都被替換為debug鎖的操作了, 之前在evthread_set_lock_callbacks設定的雖然被取代但也都被保留了下來
於是我們現在看看debug鎖的具體操作:
從上面我們可以知道, 分配、釋放debug鎖和分配、釋放系統鎖並沒有太大差別.
差別主要在於debug的主要職責上, 即在加鎖和解鎖時進行特別的判定
如下:
看了加鎖的實現, 肯定是迫不及待的去看解鎖的實現了:
綜上,再來總結一下debug鎖解決了哪些不該出現的問題呢?
1、首先我們知道, 對於一個讀寫鎖(在呼叫alloc申請的時候表明的意圖),如果在給鎖加鎖的時候沒有指定任何EVTHREAD_READ或寫的加鎖模式,那麼肯定是錯誤的(解鎖亦然)
2、(可能,這一條不確定...)對於一個非遞迴鎖卻在一個執行緒中呼叫lock不止一次,可能會報錯(可能直接死鎖了)
3、遞迴四隻能在一個執行緒中鎖幾次
4、當前鎖是鎖住的,但加鎖的不是我, 但我要去解鎖,就出錯
5、沒有加過鎖, 卻還要解鎖
看到這裡,是不是覺得就不能修改鎖的相關函數了呢, 其實是可以的, 首先我們第一次呼叫傳入NULL引數, 第二次呼叫傳入新的操作函式結構體, 可能就可以了
但是, 這樣做是存在問題的.
瞭解了鎖的大部分實現後, 我們來看看是libevent是如何應用他們的
經常見到的就是這兩個了:
某個鎖定然是與某個base相關的,於是我們第一事件找到event_base:
接下來看看巨集的相關實現:
這個lockvar我們看到在event_base中有定義, 憑什麼說lockvar存在, 對應的鎖函式就存在呢?
我們就來看看是如何初始化這個鎖的:
此鎖也是普通的利用鎖的alloc函式構造的,所以並不是說 lockvar存在, 對應的鎖函式就存在, 而是說如果lockvar不存在, 那麼對應鎖函式也不會存在, 也就不會去呼叫到它
也就避免了這個可能的錯誤
不過,之前我們提到, 在宣告使用執行緒後, base的初始化就會成功初始化base->th_base_lock, 此時我們可以通過給evthread_set_lock_callbacks傳入NULL引數, 現在就算lcokvar存在,我們也無法證明其對應的鎖函式是存在的了!像現在的情況, 傳入NULL後, 鎖函式為空, 再次執行到鎖的巨集, 就會出錯了。
所以不提倡在執行後修改鎖的操作函式。
我們可以使用evthread_use_pthreads()開啟linux下的pthread
#ifdef WIN32 int evthread_use_windows_threads(void); #define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED #endif #ifdef _EVENT_HAVE_PTHREADS int evthread_use_pthreads(void); #define EVTHREAD_USE_PTHREADS_IMPLEMENTED #endif
或者使用evthread_set_lock_callbacks定製屬於自己的多執行緒, 鎖, 條件變數等
libevent是如何封裝多執行緒的呢, 還需要從下面這個結構體說起:
這是對某種執行緒的鎖的一些操作封裝:
struct evthread_lock_callbacks { /** The current version of the locking API. Set this to * EVTHREAD_LOCK_API_VERSION */ /**/ int lock_api_version; /** Which kinds of locks does this version of the locking API * support? A bitfield of EVTHREAD_LOCKTYPE_RECURSIVE and * EVTHREAD_LOCKTYPE_READWRITE. * * (Note that RECURSIVE locks are currently mandatory, and * READWRITE locks are not currently used.) **/ unsigned supported_locktypes; //初始化鎖 void *(*alloc)(unsigned locktype); //釋放鎖 void (*free)(void *lock, unsigned locktype); int (*lock)(unsigned mode, void *lock); int (*unlock)(unsigned mode, void *lock); };
根據官方給出的文件, 支援的鎖型別有:
普通的非遞迴鎖 0
遞迴鎖, 即本執行緒可以多次對某鎖進行lock而不會進入死鎖 EVTHREAD_LOCKTYPE_RECURSIVE
讀寫鎖 EVTHREAD_LOCKTYPE_READWRITE
支援的鎖mode有:
EVTHREAD_READ , EVTHREAD_WRITE , EVTHREAD_TRY
以上是封裝的執行緒鎖, 下面是封裝的執行緒條件變數:
struct evthread_condition_callbacks { int condition_api_version; void *(*alloc_condition)(unsigned condtype); void (*free_condition)(void *cond); int (*signal_condition)(void *cond, int broadcast); int (*wait_condition)(void *cond, void *lock, const struct timeval *timeout); };
因為對於條件變數的包裝類似, 所以這裡略過
如何與libeven相結合的呢?
文章頂部提到, 我們可以使用evthread_use_pthreads來開啟linux下的pthread:int
evthread_use_pthreads(void)
{
struct evthread_lock_callbacks cbs = {
EVTHREAD_LOCK_API_VERSION,
EVTHREAD_LOCKTYPE_RECURSIVE,
evthread_posix_lock_alloc,
evthread_posix_lock_free,
evthread_posix_lock,
evthread_posix_unlock
};
struct evthread_condition_callbacks cond_cbs = {
EVTHREAD_CONDITION_API_VERSION,
evthread_posix_cond_alloc,
evthread_posix_cond_free,
evthread_posix_cond_signal,
evthread_posix_cond_wait
};
/* Set ourselves up to get recursive locks. */
//以下為初始化遞迴鎖的屬性,attr_recursive是全域性變數, 該屬性物件是在初始化一個鎖的時候使用的
//雖然我們的預設鎖的名字叫EVTHREAD_LOCKTYPE_RECURSIVE,但真正讓它稱為遞迴鎖還是要靠這個屬性而不是單純的名字就行...
if (pthread_mutexattr_init(&attr_recursive))
return -1;
//為該屬性新增遞迴鎖的遞迴屬性
if (pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE))
return -1;
//設定整個event_base將會使用的執行緒鎖、條件變數、 以及如何獲取執行緒自身的執行緒ID
evthread_set_lock_callbacks(&cbs);
evthread_set_condition_callbacks(&cond_cbs);
evthread_set_id_callback(evthread_posix_get_id);
return 0;
}
關於這裡面的函式, 要提一下的是下面的函式:
static void *
evthread_posix_lock_alloc(unsigned locktype)
{
pthread_mutexattr_t *attr = NULL;
pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t));
if (!lock)
return NULL;
//從這裡可以看出, 如果我們需要遞迴鎖, 那麼就會用到那個全域性變數, 否則就是普通的非遞迴鎖
//還可以看出來的是, 並沒有支援讀寫鎖
if (locktype & EVTHREAD_LOCKTYPE_RECURSIVE)
attr = &attr_recursive;
if (pthread_mutex_init(lock, attr)) {
mm_free(lock);
return NULL;
}
return lock;
}
static int
evthread_posix_cond_signal(void *_cond, int broadcast)
{
pthread_cond_t *cond = _cond;
int r;
if (broadcast) //可見, 如果broadcast不為0, 那麼就會喚醒不止一個執行緒
r = pthread_cond_broadcast(cond);
else
r = pthread_cond_signal(cond);
return r ? -1 : 0;
}
好了,看到這裡, 想必對libevent對執行緒的支援有所瞭解了,只是簡單的在我們平時使用的基礎上進行一些封裝, 理解起來並不困難
還有要注意的是, 設定libevent支援執行緒必須在最開始就宣告好!與自己設定記憶體分配函式、日誌記錄函式類似, 不能半途設定的!
debug鎖
除了普通的系統支援的鎖之外, libevent還定製了一種特有的鎖
該鎖支援對鎖操作的幾個檢測,如下, 看過後還會做總結:1、某執行緒可能解鎖自己並不持有的鎖
2、在沒有解鎖之前,想要鎖住一個非遞迴的鎖
如何開啟呢, 可以在使用evthread_use_pthreads開啟執行緒後, 再呼叫evthread_enable_lock_debugging就可以開啟debug鎖了
void
evthread_enable_lock_debuging(void)
{
struct evthread_lock_callbacks cbs = {
EVTHREAD_LOCK_API_VERSION,
EVTHREAD_LOCKTYPE_RECURSIVE,
debug_lock_alloc,
debug_lock_free,
debug_lock_lock,
debug_lock_unlock
};
//是否已經宣告過該debug鎖了
if (_evthread_lock_debugging_enabled)
return;
//全域性變數_evthread_lock_fns是在evthread_use_pthreads中呼叫evthread_set_lock_callbacks為其賦值的
//現在用_original_lock_fns變數來儲存它
memcpy(&_original_lock_fns, &_evthread_lock_fns,
sizeof(struct evthread_lock_callbacks));
//此刻的_evthread_lock_fns成為了debug鎖
memcpy(&_evthread_lock_fns, &cbs,
sizeof(struct evthread_lock_callbacks));
memcpy(&_original_cond_fns, &_evthread_cond_fns,
sizeof(struct evthread_condition_callbacks));
//這裡只是修改了條件變數裡的一個函式, 這是因為只有wait函式才用到鎖...
_evthread_cond_fns.wait_condition = debug_cond_wait;
_evthread_lock_debugging_enabled = 1;
/* XXX return value should get checked. */
event_global_setup_locks_(0);
}
也就是說, 現在所有的鎖相關到操作都被替換為debug鎖的操作了, 之前在evthread_set_lock_callbacks設定的雖然被取代但也都被保留了下來
於是我們現在看看debug鎖的具體操作:
struct debug_lock {
unsigned locktype;
unsigned long held_by;
/* XXXX if we ever use read-write locks, we will need a separate
* lock to protect count. */
int count;
void *lock;
};
static void *
debug_lock_alloc(unsigned locktype)
{
struct debug_lock *result = mm_malloc(sizeof(struct debug_lock));
if (!result)
return NULL;
//如果我們是在開啟了執行緒之後開啟debug鎖的, 那麼我們會利用該鎖的alloc函式為debug鎖生成一個物件, 要注意的是, 是遞迴鎖
if (_original_lock_fns.alloc) {
if (!(result->lock = _original_lock_fns.alloc(
locktype|EVTHREAD_LOCKTYPE_RECURSIVE))) {
mm_free(result);
return NULL;
}
} else {
result->lock = NULL;
}
result->locktype = locktype;
result->count = 0;
result->held_by = 0;
return result;
}
從上面我們可以知道, 分配、釋放debug鎖和分配、釋放系統鎖並沒有太大差別.
差別主要在於debug的主要職責上, 即在加鎖和解鎖時進行特別的判定
如下:
//此時,我們需要上鎖
static int
debug_lock_lock(unsigned mode, void *lock_)
{
struct debug_lock *lock = lock_;
int res = 0;
//此鎖的locktype就是建立時候自己定義的,外加一個遞迴屬性 , 如果此鎖是讀寫鎖
//這裡的判斷是, 如果是讀寫鎖, 至少有個讀鎖或寫鎖的屬性
if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
else
//既然不是讀寫鎖, 那就不應該有讀鎖或寫鎖的屬性
EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
//如果被debug封裝的鎖具有鎖的屬性, 如果沒有,那就表示這之前沒有呼叫evthread_use_pthreads等開啟多執行緒, 也表示lock變數中的lock成員為空
if (_original_lock_fns.lock)
res = _original_lock_fns.lock(mode, lock->lock); //要知道的是, lock成功返回0
//進入這個if有兩個條件, 一個是這之前沒有呼叫evthread_use_pthreads等, 二是成功鎖住此鎖了
if (!res) {
evthread_debug_lock_mark_locked(mode, lock);
}
return res;
}
static void
evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock *lock)
{
++lock->count; //此變數初始值為0
//如果沒有遞迴這個屬性, 即非遞迴鎖, count只能為1. 只是若一個非遞迴的鎖是正常的(是沒bug的?),那麼肯定會在呼叫_original_lock_fns.lock時形成死鎖而不會到這一步...
if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE))
EVUTIL_ASSERT(lock->count == 1);
if (_evthread_id_fn) {
//得到當前執行緒的執行緒ID
unsigned long me;
me = _evthread_id_fn();
//如果count大於1了,說明到了遞迴鎖的特性了, 必須是本執行緒先前就擁有的這把鎖的所有權
if (lock->count > 1)
EVUTIL_ASSERT(lock->held_by == me);
lock->held_by = me;
}
}
看了加鎖的實現, 肯定是迫不及待的去看解鎖的實現了:
static int
debug_lock_unlock(unsigned mode, void *lock_)
{
struct debug_lock *lock = lock_;
int res = 0;
evthread_debug_lock_mark_unlocked(mode, lock);
if (_original_lock_fns.unlock)
res = _original_lock_fns.unlock(mode, lock->lock);
return res;
}
static void
evthread_debug_lock_mark_unlocked(unsigned mode, struct debug_lock *lock)
{
if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
else
EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
if (_evthread_id_fn) {
//如果持有鎖的執行緒不是我,那麼就出錯了
EVUTIL_ASSERT(lock->held_by == _evthread_id_fn());
//如果此鎖count為1,那麼解鎖後就不歸任何執行緒所有了
if (lock->count == 1)
lock->held_by = 0;
}
--lock->count;
//如果之前沒有鎖過這個鎖, 那麼無需解...
EVUTIL_ASSERT(lock->count >= 0);
}
綜上,再來總結一下debug鎖解決了哪些不該出現的問題呢?
1、首先我們知道, 對於一個讀寫鎖(在呼叫alloc申請的時候表明的意圖),如果在給鎖加鎖的時候沒有指定任何EVTHREAD_READ或寫的加鎖模式,那麼肯定是錯誤的(解鎖亦然)
2、(可能,這一條不確定...)對於一個非遞迴鎖卻在一個執行緒中呼叫lock不止一次,可能會報錯(可能直接死鎖了)
3、遞迴四隻能在一個執行緒中鎖幾次
4、當前鎖是鎖住的,但加鎖的不是我, 但我要去解鎖,就出錯
5、沒有加過鎖, 卻還要解鎖
設定libevent要用的執行緒相關工具
認識了debug鎖, 接下來我們更加深入的探討一下evthread_set_lock_callbacks函式:
int
evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs)
{
//_evthread_lock_debugging_enabled標誌為全域性變數,初始為0, 為是否開啟debug鎖, 不過開啟debug鎖一般在開啟執行緒之後的
//然而呼叫此函式evthread_set_lock_callbacks
struct evthread_lock_callbacks *target =
_evthread_lock_debugging_enabled
? &_original_lock_fns : &_evthread_lock_fns;
if (!cbs) {
//加入引數為NULL, 那麼可以取消鎖的實現, 只是警告中說這並不一定起作用
if (target->alloc)
event_warnx("Trying to disable lock functions after "
"they have been set up will probaby not work.");
memset(target, 0, sizeof(_evthread_lock_fns));
return 0;
}
//這步操作可能並不是在我們之前提到的整個libevent的最前面發生的,情理之中也是不允許中途改鎖的
if (target->alloc) {
/* Uh oh; we already had locking callbacks set up.*/
if (target->lock_api_version == cbs->lock_api_version &&
target->supported_locktypes == cbs->supported_locktypes &&
target->alloc == cbs->alloc &&
target->free == cbs->free &&
target->lock == cbs->lock &&
target->unlock == cbs->unlock) {
/* no change -- allow this. */
return 0;
}
event_warnx("Can't change lock callbacks once they have been "
"initialized.");
return -1;
}
//必須支援這裡的所有操作,否則無法接受
if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) {
memcpy(target, cbs, sizeof(_evthread_lock_fns));
return event_global_setup_locks_(1);
} else {
return -1;
}
}
看到這裡,是不是覺得就不能修改鎖的相關函數了呢, 其實是可以的, 首先我們第一次呼叫傳入NULL引數, 第二次呼叫傳入新的操作函式結構體, 可能就可以了
但是, 這樣做是存在問題的.
瞭解了鎖的大部分實現後, 我們來看看是libevent是如何應用他們的
經常見到的就是這兩個了:
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
和EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
第一個引數為base , 第二個為鎖物件某個鎖定然是與某個base相關的,於是我們第一事件找到event_base:
struct event_base {
... ...
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* threading support */
/** The thread currently running the event_loop for this base */
//主執行緒ID
unsigned long th_owner_id;
/** A lock to prevent conflicting accesses to this event_base */
//訪問base中元素用的lock, 比如有時候需要新增一些事件等
void *th_base_lock;
... ...
/** A condition that gets signalled when we're done processing an
* event with waiters on it. */
void *current_event_cond;
/** Number of threads blocking on current_event_cond. */
int current_event_waiters;
#endif
... ...
};
接下來看看巨集的相關實現:
#define EVBASE_ACQUIRE_LOCK(base, lockvar) do { \
EVLOCK_LOCK((base)->lockvar, 0);
} while (0)
#define EVLOCK_LOCK(lockvar,mode) \
do { \
//如果這個鎖是存在的, 就呼叫lock函式來鎖
if (lockvar) \
_evthread_lock_fns.lock(mode, lockvar); \
} while (0)
#define EVBASE_RELEASE_LOCK(base, lockvar) do { \
EVLOCK_UNLOCK((base)->lockvar, 0); \
} while (0)
#define EVLOCK_UNLOCK(lockvar,mode) \
do { \
if (lockvar) \
_evthread_lock_fns.unlock(mode, lockvar); \
} while (0)
這個lockvar我們看到在event_base中有定義, 憑什麼說lockvar存在, 對應的鎖函式就存在呢?
我們就來看看是如何初始化這個鎖的:
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
... ...
/* prepare for threading */
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (EVTHREAD_LOCKING_ENABLED() &&
//如果沒有事先宣告不需要鎖
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
int r;
//此鎖被宣告為遞迴鎖
EVTHREAD_ALLOC_LOCK(base->th_base_lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_warnx("%s: Unable to make base notifiable.", __func__);
event_base_free(base);
return NULL;
}
}
#endif
... ...
}
此鎖也是普通的利用鎖的alloc函式構造的,所以並不是說 lockvar存在, 對應的鎖函式就存在, 而是說如果lockvar不存在, 那麼對應鎖函式也不會存在, 也就不會去呼叫到它
也就避免了這個可能的錯誤
不過,之前我們提到, 在宣告使用執行緒後, base的初始化就會成功初始化base->th_base_lock, 此時我們可以通過給evthread_set_lock_callbacks傳入NULL引數, 現在就算lcokvar存在,我們也無法證明其對應的鎖函式是存在的了!像現在的情況, 傳入NULL後, 鎖函式為空, 再次執行到鎖的巨集, 就會出錯了。
所以不提倡在執行後修改鎖的操作函式。