1. 程式人生 > >libevent原始碼分析之多執行緒準備工作

libevent原始碼分析之多執行緒準備工作

libevent中是預設不開啟多執行緒的,也就沒有什麼鎖, 條件變數等的說法了
我們可以使用evthread_use_pthreads()開啟linux下的pthread

#ifdef WIN32
int evthread_use_windows_threads(void);
#define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
#endif
#ifdef _EVENT_HAVE_PTHREADS
int evthread_use_pthreads(void);
#define EVTHREAD_USE_PTHREADS_IMPLEMENTED
#endif

或者使用evthread_set_lock_callbacks定製屬於自己的多執行緒, 鎖, 條件變數等

libevent是如何封裝多執行緒的呢, 還需要從下面這個結構體說起:

這是對某種執行緒的鎖的一些操作封裝:


struct evthread_lock_callbacks {
    /** The current version of the locking API.  Set this to
     * EVTHREAD_LOCK_API_VERSION */
        /**/
    int lock_api_version;
    /** Which kinds of locks does this version of the locking API
     * support?  A bitfield of EVTHREAD_LOCKTYPE_RECURSIVE and
     * EVTHREAD_LOCKTYPE_READWRITE.
     *
     * (Note that RECURSIVE locks are currently mandatory, and
     * READWRITE locks are not currently used.)
     **/
    unsigned supported_locktypes;

        //初始化鎖
    void *(*alloc)(unsigned locktype);

        //釋放鎖
    void (*free)(void *lock, unsigned locktype);

    int (*lock)(unsigned mode, void *lock);

    int (*unlock)(unsigned mode, void *lock);
};

根據官方給出的文件, 支援的鎖型別有:
普通的非遞迴鎖                                               0
遞迴鎖, 即本執行緒可以多次對某鎖進行lock而不會進入死鎖          EVTHREAD_LOCKTYPE_RECURSIVE
讀寫鎖                                   EVTHREAD_LOCKTYPE_READWRITE

支援的鎖mode有:
EVTHREAD_READ , EVTHREAD_WRITE ,  EVTHREAD_TRY


以上是封裝的執行緒鎖, 下面是封裝的執行緒條件變數:
struct evthread_condition_callbacks {
    int condition_api_version;

    void *(*alloc_condition)(unsigned condtype);

    void (*free_condition)(void *cond);

    int (*signal_condition)(void *cond, int broadcast);

    int (*wait_condition)(void *cond, void *lock,
        const struct timeval *timeout);
};

因為對於條件變數的包裝類似, 所以這裡略過

如何與libeven相結合的呢?

文章頂部提到, 我們可以使用evthread_use_pthreads來開啟linux下的pthread:
int
evthread_use_pthreads(void)
{
    struct evthread_lock_callbacks cbs = {
        EVTHREAD_LOCK_API_VERSION,
        EVTHREAD_LOCKTYPE_RECURSIVE,
        evthread_posix_lock_alloc,    
        evthread_posix_lock_free,
        evthread_posix_lock,
        evthread_posix_unlock
    };
    struct evthread_condition_callbacks cond_cbs = {
        EVTHREAD_CONDITION_API_VERSION,
        evthread_posix_cond_alloc,
        evthread_posix_cond_free,
        evthread_posix_cond_signal,
        evthread_posix_cond_wait
    };
    /* Set ourselves up to get recursive locks. */
        //以下為初始化遞迴鎖的屬性,attr_recursive是全域性變數, 該屬性物件是在初始化一個鎖的時候使用的
        //雖然我們的預設鎖的名字叫EVTHREAD_LOCKTYPE_RECURSIVE,但真正讓它稱為遞迴鎖還是要靠這個屬性而不是單純的名字就行...
    if (pthread_mutexattr_init(&attr_recursive))
        return -1;
        //為該屬性新增遞迴鎖的遞迴屬性
    if (pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE))
        return -1;

        //設定整個event_base將會使用的執行緒鎖、條件變數、 以及如何獲取執行緒自身的執行緒ID
    evthread_set_lock_callbacks(&cbs);
    evthread_set_condition_callbacks(&cond_cbs);
    evthread_set_id_callback(evthread_posix_get_id);
    return 0;
}

關於這裡面的函式, 要提一下的是下面的函式:
static void *
evthread_posix_lock_alloc(unsigned locktype)
{
    pthread_mutexattr_t *attr = NULL;
    pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t));
    if (!lock)
        return NULL;
    //從這裡可以看出, 如果我們需要遞迴鎖, 那麼就會用到那個全域性變數, 否則就是普通的非遞迴鎖
        //還可以看出來的是, 並沒有支援讀寫鎖
    if (locktype & EVTHREAD_LOCKTYPE_RECURSIVE)        
        attr = &attr_recursive;
    if (pthread_mutex_init(lock, attr)) {
        mm_free(lock);
        return NULL;
    }
    return lock;
}

static int
evthread_posix_cond_signal(void *_cond, int broadcast)
{
    pthread_cond_t *cond = _cond;
    int r;
    if (broadcast)                //可見, 如果broadcast不為0, 那麼就會喚醒不止一個執行緒
        r = pthread_cond_broadcast(cond);
    else
        r = pthread_cond_signal(cond);
    return r ? -1 : 0;
}


好了,看到這裡, 想必對libevent對執行緒的支援有所瞭解了,只是簡單的在我們平時使用的基礎上進行一些封裝, 理解起來並不困難
還有要注意的是, 設定libevent支援執行緒必須在最開始就宣告好!與自己設定記憶體分配函式、日誌記錄函式類似, 不能半途設定的!

debug鎖

除了普通的系統支援的鎖之外, libevent還定製了一種特有的鎖

該鎖支援對鎖操作的幾個檢測,如下, 看過後還會做總結:
1、某執行緒可能解鎖自己並不持有的鎖
2、在沒有解鎖之前,想要鎖住一個非遞迴的鎖

如何開啟呢, 可以在使用evthread_use_pthreads開啟執行緒後, 再呼叫evthread_enable_lock_debugging就可以開啟debug鎖了
void
evthread_enable_lock_debuging(void)
{
    struct evthread_lock_callbacks cbs = {
        EVTHREAD_LOCK_API_VERSION,
        EVTHREAD_LOCKTYPE_RECURSIVE,
        debug_lock_alloc,
        debug_lock_free,
        debug_lock_lock,
        debug_lock_unlock
    };

        //是否已經宣告過該debug鎖了
    if (_evthread_lock_debugging_enabled)
        return;

        //全域性變數_evthread_lock_fns是在evthread_use_pthreads中呼叫evthread_set_lock_callbacks為其賦值的
        //現在用_original_lock_fns變數來儲存它
    memcpy(&_original_lock_fns, &_evthread_lock_fns,
        sizeof(struct evthread_lock_callbacks));

        //此刻的_evthread_lock_fns成為了debug鎖
    memcpy(&_evthread_lock_fns, &cbs,
        sizeof(struct evthread_lock_callbacks));

    memcpy(&_original_cond_fns, &_evthread_cond_fns,
        sizeof(struct evthread_condition_callbacks));

        //這裡只是修改了條件變數裡的一個函式, 這是因為只有wait函式才用到鎖...
    _evthread_cond_fns.wait_condition = debug_cond_wait;
    _evthread_lock_debugging_enabled = 1;

    /* XXX return value should get checked. */
    event_global_setup_locks_(0);
}


也就是說, 現在所有的鎖相關到操作都被替換為debug鎖的操作了, 之前在evthread_set_lock_callbacks設定的雖然被取代但也都被保留了下來
於是我們現在看看debug鎖的具體操作:

struct debug_lock {
    unsigned locktype;
    unsigned long held_by;
    /* XXXX if we ever use read-write locks, we will need a separate
     * lock to protect count. */
    int count;
    void *lock;
};
static void *
debug_lock_alloc(unsigned locktype)
{
    struct debug_lock *result = mm_malloc(sizeof(struct debug_lock));
    if (!result)
        return NULL;
        //如果我們是在開啟了執行緒之後開啟debug鎖的, 那麼我們會利用該鎖的alloc函式為debug鎖生成一個物件, 要注意的是, 是遞迴鎖
    if (_original_lock_fns.alloc) {
        if (!(result->lock = _original_lock_fns.alloc(
                locktype|EVTHREAD_LOCKTYPE_RECURSIVE))) {
            mm_free(result);
            return NULL;
        }
    } else {
        result->lock = NULL;
    }
    result->locktype = locktype;
    result->count = 0;
    result->held_by = 0;
    return result;
}



從上面我們可以知道, 分配、釋放debug鎖和分配、釋放系統鎖並沒有太大差別.
差別主要在於debug的主要職責上, 即在加鎖和解鎖時進行特別的判定
如下:

//此時,我們需要上鎖
static int
debug_lock_lock(unsigned mode, void *lock_)
{
    struct debug_lock *lock = lock_;
    int res = 0;
        //此鎖的locktype就是建立時候自己定義的,外加一個遞迴屬性 , 如果此鎖是讀寫鎖
        //這裡的判斷是, 如果是讀寫鎖, 至少有個讀鎖或寫鎖的屬性
    if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
        EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
    else
        //既然不是讀寫鎖, 那就不應該有讀鎖或寫鎖的屬性
        EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);

        //如果被debug封裝的鎖具有鎖的屬性, 如果沒有,那就表示這之前沒有呼叫evthread_use_pthreads等開啟多執行緒, 也表示lock變數中的lock成員為空
    if (_original_lock_fns.lock)
        res = _original_lock_fns.lock(mode, lock->lock);   //要知道的是, lock成功返回0

        //進入這個if有兩個條件, 一個是這之前沒有呼叫evthread_use_pthreads等, 二是成功鎖住此鎖了
    if (!res) {
        evthread_debug_lock_mark_locked(mode, lock);
    }
    return res;
}

static void
evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock *lock)
{
    ++lock->count;            //此變數初始值為0
        //如果沒有遞迴這個屬性, 即非遞迴鎖, count只能為1. 只是若一個非遞迴的鎖是正常的(是沒bug的?),那麼肯定會在呼叫_original_lock_fns.lock時形成死鎖而不會到這一步...
    if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE))
        EVUTIL_ASSERT(lock->count == 1);
    if (_evthread_id_fn) {
                //得到當前執行緒的執行緒ID
        unsigned long me;
        me = _evthread_id_fn();
                //如果count大於1了,說明到了遞迴鎖的特性了, 必須是本執行緒先前就擁有的這把鎖的所有權
        if (lock->count > 1)
            EVUTIL_ASSERT(lock->held_by == me);
        lock->held_by = me;
    }
}



看了加鎖的實現, 肯定是迫不及待的去看解鎖的實現了:

static int
debug_lock_unlock(unsigned mode, void *lock_)
{
    struct debug_lock *lock = lock_;
    int res = 0;
    evthread_debug_lock_mark_unlocked(mode, lock);
    if (_original_lock_fns.unlock)
        res = _original_lock_fns.unlock(mode, lock->lock);
    return res;
}
static void
evthread_debug_lock_mark_unlocked(unsigned mode, struct debug_lock *lock)
{
    if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
        EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
    else
        EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
    if (_evthread_id_fn) {
                //如果持有鎖的執行緒不是我,那麼就出錯了
        EVUTIL_ASSERT(lock->held_by == _evthread_id_fn());
                //如果此鎖count為1,那麼解鎖後就不歸任何執行緒所有了
        if (lock->count == 1)
            lock->held_by = 0;
    }
    --lock->count;
        //如果之前沒有鎖過這個鎖, 那麼無需解...
    EVUTIL_ASSERT(lock->count >= 0);
}


綜上,再來總結一下debug鎖解決了哪些不該出現的問題呢?
1、首先我們知道, 對於一個讀寫鎖(在呼叫alloc申請的時候表明的意圖),如果在給鎖加鎖的時候沒有指定任何EVTHREAD_READ或寫的加鎖模式,那麼肯定是錯誤的(解鎖亦然)
2、(可能,這一條不確定...)對於一個非遞迴鎖卻在一個執行緒中呼叫lock不止一次,可能會報錯(可能直接死鎖了)
3、遞迴四隻能在一個執行緒中鎖幾次
4、當前鎖是鎖住的,但加鎖的不是我, 但我要去解鎖,就出錯
5、沒有加過鎖, 卻還要解鎖


設定libevent要用的執行緒相關工具

認識了debug鎖, 接下來我們更加深入的探討一下evthread_set_lock_callbacks函式:

int
evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs)
{
        //_evthread_lock_debugging_enabled標誌為全域性變數,初始為0, 為是否開啟debug鎖, 不過開啟debug鎖一般在開啟執行緒之後的
        //然而呼叫此函式evthread_set_lock_callbacks
    struct evthread_lock_callbacks *target =
        _evthread_lock_debugging_enabled
        ? &_original_lock_fns : &_evthread_lock_fns;

    if (!cbs) {
                //加入引數為NULL, 那麼可以取消鎖的實現, 只是警告中說這並不一定起作用
        if (target->alloc)
            event_warnx("Trying to disable lock functions after "
                "they have been set up will probaby not work.");
        memset(target, 0, sizeof(_evthread_lock_fns));
        return 0;
    }
        //這步操作可能並不是在我們之前提到的整個libevent的最前面發生的,情理之中也是不允許中途改鎖的
    if (target->alloc) {
        /* Uh oh; we already had locking callbacks set up.*/
        if (target->lock_api_version == cbs->lock_api_version &&
            target->supported_locktypes == cbs->supported_locktypes &&
            target->alloc == cbs->alloc &&
            target->free == cbs->free &&
            target->lock == cbs->lock &&
            target->unlock == cbs->unlock) {
            /* no change -- allow this. */
            return 0;
        }
        event_warnx("Can't change lock callbacks once they have been "
            "initialized.");
        return -1;
    }

        //必須支援這裡的所有操作,否則無法接受
    if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) {
        memcpy(target, cbs, sizeof(_evthread_lock_fns));
        return event_global_setup_locks_(1);
    } else {
        return -1;
    }
}



看到這裡,是不是覺得就不能修改鎖的相關函數了呢, 其實是可以的, 首先我們第一次呼叫傳入NULL引數, 第二次呼叫傳入新的操作函式結構體, 可能就可以了
但是, 這樣做是存在問題的.

瞭解了鎖的大部分實現後, 我們來看看是libevent是如何應用他們的
經常見到的就是這兩個了:
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);  

EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
第一個引數為base , 第二個為鎖物件

某個鎖定然是與某個base相關的,於是我們第一事件找到event_base:
struct event_base {

        ... ...

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    /* threading support */
    /** The thread currently running the event_loop for this base */
        //主執行緒ID
    unsigned long th_owner_id;
    /** A lock to prevent conflicting accesses to this event_base */
        //訪問base中元素用的lock, 比如有時候需要新增一些事件等
    void *th_base_lock;
        
        ... ...

    /** A condition that gets signalled when we're done processing an
     * event with waiters on it. */
    void *current_event_cond;
    /** Number of threads blocking on current_event_cond. */
    int current_event_waiters;
#endif
        
        ... ...

};


接下來看看巨集的相關實現:
#define EVBASE_ACQUIRE_LOCK(base, lockvar) do {                         \
                EVLOCK_LOCK((base)->lockvar, 0);
        } while (0)

#define EVLOCK_LOCK(lockvar,mode)                                       \
        do {                                                            \
                     //如果這個鎖是存在的, 就呼叫lock函式來鎖
                if (lockvar)                                            \
                        _evthread_lock_fns.lock(mode, lockvar);         \
        } while (0)

#define EVBASE_RELEASE_LOCK(base, lockvar) do {                         \
                EVLOCK_UNLOCK((base)->lockvar, 0);                      \
        } while (0)

#define EVLOCK_UNLOCK(lockvar,mode)                                     \
        do {                                                            \
                if (lockvar)                                            \
                        _evthread_lock_fns.unlock(mode, lockvar);       \
        } while (0)


這個lockvar我們看到在event_base中有定義, 憑什麼說lockvar存在, 對應的鎖函式就存在呢?
我們就來看看是如何初始化這個鎖的:
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{

        ... ...

    /* prepare for threading */

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    if (EVTHREAD_LOCKING_ENABLED() &&
              //如果沒有事先宣告不需要鎖
        (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
        int r;
 
                //此鎖被宣告為遞迴鎖
        EVTHREAD_ALLOC_LOCK(base->th_base_lock,
            EVTHREAD_LOCKTYPE_RECURSIVE);
        base->defer_queue.lock = base->th_base_lock;
        EVTHREAD_ALLOC_COND(base->current_event_cond);
        r = evthread_make_base_notifiable(base);
        if (r<0) {
            event_warnx("%s: Unable to make base notifiable.", __func__);
            event_base_free(base);
            return NULL;
        }
    }
#endif

        ... ...

}



此鎖也是普通的利用鎖的alloc函式構造的,所以並不是說 lockvar存在, 對應的鎖函式就存在, 而是說如果lockvar不存在, 那麼對應鎖函式也不會存在, 也就不會去呼叫到它
也就避免了這個可能的錯誤
不過,之前我們提到, 在宣告使用執行緒後, base的初始化就會成功初始化base->th_base_lock, 此時我們可以通過給evthread_set_lock_callbacks傳入NULL引數, 現在就算lcokvar存在,我們也無法證明其對應的鎖函式是存在的了!像現在的情況, 傳入NULL後, 鎖函式為空, 再次執行到鎖的巨集, 就會出錯了。
所以不提倡在執行後修改鎖的操作函式。