1. 程式人生 > >GCD原始碼吐血分析(2)——dispatch_async/dispatch_sync

GCD原始碼吐血分析(2)——dispatch_async/dispatch_sync

上一章中,我們知道了獲取GCD queue的底層實現。獲取到queue後,就需要將任務提交到queue中進行處理。
我們有兩種方式提交任務:
dispatch_asyncdispatch_sync。一個是非同步不等待任務完成就返回,另一個是同步任務,需要等待任務完成。這兩種提交任務的方式有所不同:

dispatch_async :底層運用了執行緒池,會在和當前執行緒不同的執行緒上處理任務。

dispatch_sync :一般不會新開啟執行緒,而是在當前執行緒執行任務(比較特殊的是main queue,它會利用main runloop 將任務提交到主執行緒來執行),同時,它會阻塞當前執行緒,等待提交的任務執行完畢。當target queue是併發執行緒時,會直接執行任務。而target queue是序列佇列時,會檢測當前執行緒是否已經擁有了該序列佇列,如果答案是肯定的,則會觸發crash,這與老版本GCD中會觸發死鎖不同,因為在新版GCD中,已經加入了這種死鎖檢測機制,從而觸發crash,避免了除錯困難的死鎖的發生。

如下圖所示,當我們在同一執行緒中的序列佇列任務執行期間,再次向該佇列提交任務時,會引發crash。

這裡寫圖片描述

但是,目前這種檢測死鎖的機制也不是完美的,我們仍可以繞過crash來引發死鎖(是不是沒事找事兒?),具體可見下面的原始碼分析。

dispatch_sync

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

void
dispatch_sync_f(dispatch_queue_t
dq, void *ctxt, dispatch_function_t func) { // 序列佇列走這裡 return dispatch_barrier_sync_f(dq, ctxt, func); } // 並行佇列走這裡 _dispatch_sync_invoke_and_complete(dq, ctxt, func); }

先來看一下序列佇列會執行的dispatch_barrier_sync_f

DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void
*ctxt, dispatch_function_t func) { dispatch_tid tid = _dispatch_tid_self(); // 獲取當前thread id if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) { // 當前執行緒嘗試繫結獲取序列佇列的lock return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT); // 執行緒獲取不到queue的lock,則序列入隊等待,當前執行緒阻塞 } // 不需要等待,則走這裡 _dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func); }

我們重點看一下執行緒是如何嘗試獲取序列佇列lock的,這很重要,這一步是後面的死鎖檢測的基礎

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_t dq, uint32_t tid)
{
    uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
            _dispatch_lock_value_from_tid(tid); // _dispatch_lock_value_from_tid 會去取tid二進位制數的2到31位 作為值(從0位算起)
    uint64_t old_state, new_state;

    // 這裡面有一堆巨集定義的原子操作,事實是
    // 嘗試將new_state賦值給dq.dq_state。 首先會用原子操作(atomic_load_explicit)取當前dq_state的值,作為old_state。如果old_state 不是dq_state的預設值(init | role), 則賦值失敗,返回false(這說明之前已經有人更改過dq_state,在序列佇列中,一次僅允許一個人更改dq_state), 獲取lock失敗。否則dq_state賦值為new_state(利用原子操作atomic_compare_exchange_weak_explicit 做賦值), 返回true,獲取lock成功。
    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
        uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
        if (old_state != (init | role)) { // 如果dq_state已經被修改過,則直接返回false,不更新dq_state為new_state
            os_atomic_rmw_loop_give_up(break);
        }
        new_state = value | role;
    });
}

上面程式碼會去取dispatch_queue_t中的dq_state值。當這個dq_state沒有被別人修改過,即第一次被修改時,會將dq_state設定為new_state, 並返回true。此時,在new_state中標記了當前的queue被lock,同時記錄了lock 當前queue的執行緒tid。

如果dq_state已經被修改過了, 則函式返回false,同時,保持dq_state值不變。

看過了_dispatch_queue_try_acquire_barrier_sync的內部實現,我們再回到上一級dispatch_barrier_sync_f中:

DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_tid tid = _dispatch_tid_self(); // 獲取當前thread id
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) { // 當前執行緒嘗試繫結獲取序列佇列的lock 
        return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT); // 執行緒獲取不到queue的lock,則序列入隊等待,當前執行緒阻塞
    }

    ...
}

如果_dispatch_queue_try_acquire_barrier_sync 返回了false,則會進入到_dispatch_sync_f_slow中,在這裡會嘗試等待序列佇列中上一個任務執行完畢:

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{

    if (unlikely(!dq->do_targetq)) { // 如果dq沒有target queue,走這裡。這種情況多半不會發生,因為所有自定義建立的queue都有target queue是root queue之一
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }
    // 多數會走這裡
    _dispatch_sync_wait(dq, ctxt, func, dc_flags, dq, dc_flags);
}

我們來看一下_dispatch_sync_wait的實現的超級簡略版:

DISPATCH_NOINLINE
static void
_dispatch_sync_wait(dispatch_queue_t top_dq, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_t dq, uintptr_t dc_flags)
{
    pthread_priority_t pp = _dispatch_get_priority();
    dispatch_tid tid = _dispatch_tid_self();
    dispatch_qos_t qos;
    uint64_t dq_state;
    // Step 1. 檢測是否會發生死鎖,若會發生死鎖,則直接crash
    dq_state = _dispatch_sync_wait_prepare(dq);
    // 如果當前的執行緒已經擁有目標queue,這時候在呼叫_dispatch_sync_wait,則會觸發crash
    // 這裡的判斷邏輯是lock的woner是否是tid(這裡因為在dq_state的lock裡面加入了tid的值,所有能夠自動識別出死鎖的情況:同一個序列佇列被同一個執行緒做兩次lock)
    if (unlikely(_dq_state_drain_locked_by(dq_state, tid))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }

    // Step2. _dispatch_queue_push_sync_waiter(dq, &dsc, qos); // 將要執行的任務入隊

    // Step3.  等待前面的task執行完畢
    if (dsc.dc_data == DISPATCH_WLH_ANON) {
        // 等待執行緒事件,等待完成(進入dispach_sync的模式)
        _dispatch_thread_event_wait(&dsc.dsc_event); // 訊號量等待
        _dispatch_thread_event_destroy(&dsc.dsc_event); // 等待結束 銷燬thread event
        // If _dispatch_sync_waiter_wake() gave this thread an override,
        // ensure that the root queue sees it.
        if (dsc.dsc_override_qos > dsc.dsc_override_qos_floor) {
            _dispatch_set_basepri_override_qos(dsc.dsc_override_qos);
        }
    } else {
        _dispatch_event_loop_wait_for_ownership(&dsc);
    }

    // Step4. 
    // 等待結束,執行client程式碼
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags);
}

這裡我們重點看一下,GCD在Step1中是如何檢測死鎖的。其最終會呼叫函式

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

lock_value就是dq_state,一個32位的整數。通過判斷((lock_value ^ tid) & DLOCK_OWNER_MASK)是否為0,來判斷當前的序列佇列是否已被同一個執行緒所獲取。如果當前佇列已經被當前執行緒獲取,即當前執行緒在執行一個序列任務中,如果此時我們在阻塞等待一個新的序列任務,則會發生死鎖。因此,在新版GCD中,當((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0 時,就會主動觸發crash來避免死鎖。

序列佇列死鎖的情況是:

執行緒A在序列佇列dq中執行序列任務task1的過程中,如果再向dq中投遞序列任務task2,同時還要求必須阻塞當前執行緒,來等待task2結束(呼叫dispatch_sync投遞task2),那麼這時候會發生死鎖。

因為這時候task1還沒有結束,序列佇列不會取執行task2,而我們又要在當前執行緒等待task2的結束才肯繼續執行task1,即task1在等task2,而task2也在等task1,迴圈等待,形成死鎖。

總結一下,GCD會發生死鎖的情況必須同時滿足3個條件,才會形成task1和task2相互等待的情況:

  1. 序列佇列正在執行task1
  2. 在task1中又向序列佇列投遞了task2
  3. task2是以dispatch_sync 方式投遞的,會阻塞當前執行緒

其實在GCD的死鎖檢測中,並沒有完全覆蓋以上3個條件。因為GCD對於條件2,附加加了條件限制,即task2是在task1的執行執行緒中提交的。而條件2其實是沒有這個限制的,task2可以在和task1不同的執行緒中提交,同樣可以造成死鎖。因此,在這種情況下的死鎖,GCD是檢測不出來的,也就不會crash,僅僅是死鎖。

以下是GCD會檢測出的死鎖以及不會檢測出的死鎖,可以自己體會一下:

  // 序列佇列死鎖crash的例子(在同個執行緒的序列佇列任務執行過程中,再次傳送dispatch_sync 任務到序列佇列,會crash)
  //==============================
    dispatch_queue_t sQ = dispatch_queue_create("st0", 0);
    dispatch_async(sQ, ^{
        NSLog(@"Enter");
        dispatch_sync(sQ, ^{   //  這裡會crash
            NSLog(@"sync task");
        });
    });

   // 序列死鎖的例子(這裡不會crash,線上程A執行序列任務task1的過程中,又線上程B中投遞了一個task2到序列佇列同時使用dispatch_sync等待,死鎖,但GCD不會測出)
    //==============================
    dispatch_queue_t sQ1 = dispatch_queue_create("st01", 0);
    dispatch_async(sQ1, ^{
        NSLog(@"Enter");
        dispatch_sync(dispatch_get_main_queue(), ^{
            dispatch_sync(sQ1, ^{
                NSArray *a = [NSArray new];
                NSLog(@"Enter again %@", a);
            });
        });
        NSLog(@"Done");
    });

在邏輯結構良好的情況下,序列佇列不會發生死鎖,而只是task1,task2依次執行:

    // 序列佇列等待的例子1
    //==============================
    dispatch_queue_t sQ1 = dispatch_queue_create("st01", 0);
    dispatch_async(sQ1, ^{
        NSLog(@"Enter");
        sleep(5);
        NSLog(@"Done");
    });
    dispatch_sync(sQ1, ^{
        NSLog(@"It is my turn");
    });

再來看並行佇列會走的分支

DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func); // call clinet function
    _dispatch_queue_non_barrier_complete(dq); // 結束
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq); // 保護現場
    _dispatch_client_callout(ctxt, func); // 回撥到client
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

可見,並行佇列不會建立執行緒取執行dispatch_sync命令。

dispatch_async

自定義序列佇列的async派發

 dispatch_queue_t sq1 = dispatch_queue_create("sq1", NULL);
    dispatch_async(sq1, ^{
        NSLog(@"Serial aysnc task");
    });

他的呼叫堆疊是:
這裡寫圖片描述

我們來看一下原始碼:

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    // 設定標誌位
    uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
    // 將work打包成dispatch_continuation_t
    _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
    _dispatch_continuation_async(dq, dc);
}

無論dq是什麼型別的queue,GCD首先會將work打包成dispatch_continuation_t 型別,然後呼叫方法_dispatch_continuation_async

在繼續深入之前,先來看一下work是如何打包的:

static inline void
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        pthread_priority_t pp, dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    dc->dc_flags = dc_flags | DISPATCH_OBJ_BLOCK_BIT;
    // 將work封裝到dispatch_continuation_t中
    dc->dc_ctxt = _dispatch_Block_copy(work);
    _dispatch_continuation_priority_set(dc, pp, flags);

    if (unlikely(_dispatch_block_has_private_data(work))) {
        // always sets dc_func & dc_voucher
        // may update dc_priority & do_vtable
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 之前的flag 被設定為DISPATCH_OBJ_CONSUME_BIT,因此會走這裡
        dc->dc_func = _dispatch_call_block_and_release; // 這裡是設定dc的功能函式:1. 執行block 2. release block物件
    } else {
        dc->dc_func = _dispatch_Block_invoke(work);
    }
    _dispatch_continuation_voucher_set(dc, dqu, flags);
}

我們重點關注dc->dc_func = _dispatch_call_block_and_release方法,它會在dispatch_continuation_t dc被執行時呼叫:

void
_dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();
    Block_release(b);
}

dc_func的邏輯也很簡單:執行block,釋放block。

看完了work是如何打包成dispatch_continuation_t的,我們回過頭來繼續看_dispatch_continuation_async,它接受兩個引數:work queue以及dispatch_continuation_t 形式的work:

void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    _dispatch_continuation_async2(dq, dc,
            dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}

static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    // 如果是用barrier插進來的任務或者是序列佇列,直接將任務加入到佇列
    // #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
    //    ({ uint16_t _width = (width); \
    //    _width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)))
        return _dispatch_continuation_push(dq, dc); // 入隊
    }
    return _dispatch_async_f2(dq, dc); // 並行佇列走這裡
}

執行到_dispatch_continuation_async2時,就出現了分支:(1)序列(barrier)執行_dispatch_continuation_push (2)並行執行_dispatch_async_f2

我們這裡關注的是自定義序列佇列的分支,因此繼續看_dispatch_continuation_push這一支。

static void
_dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    dx_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
}
// dx_push是一個巨集定義:
#define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)

會呼叫dq的do_push方法,可以在init.c中檢視到do_push的定義:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, queue,
    .do_type = DISPATCH_QUEUE_SERIAL_TYPE,
    .do_kind = "serial-queue",
    .do_dispose = _dispatch_queue_dispose,
    .do_suspend = _dispatch_queue_suspend,
    .do_resume = _dispatch_queue_resume,
    .do_finalize_activation = _dispatch_queue_finalize_activation,
    .do_push = _dispatch_queue_push,
    .do_invoke = _dispatch_queue_invoke,
    .do_wakeup = _dispatch_queue_wakeup,
    .do_debug = dispatch_queue_debug,
    .do_set_targetq = _dispatch_queue_set_target_queue,
);

檢視_dispatch_queue_push的定義:

void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    _dispatch_queue_push_inline(dq, dou, qos);
}

#define _dispatch_queue_push_inline _dispatch_trace_queue_push_inline

static inline void
_dispatch_trace_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
        dispatch_qos_t qos)
{
    if (slowpath(DISPATCH_QUEUE_PUSH_ENABLED())) {
        struct dispatch_object_s *dou = _tail._do;
        _dispatch_trace_continuation(dq, dou, DISPATCH_QUEUE_PUSH);
    }

    _dispatch_introspection_queue_push(dq, _tail); // 第一個push似乎是為了監聽dq入隊(enqueue)的訊息
    _dispatch_queue_push_inline(dq, _tail, qos);  // 第二個push才是將dq入隊, 這裡的_tail,實質是_dispatch_continuation_t 型別
}

static inline void
_dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
        dispatch_qos_t qos)
{
    struct dispatch_object_s *tail = _tail._do;
    dispatch_wakeup_flags_t flags = 0;
    bool overriding = _dispatch_queue_need_override_retain(dq, qos);
    if (unlikely(_dispatch_queue_push_update_tail(dq, tail))) { // 將tail放入到dq中
        if (!overriding) _dispatch_retain_2(dq->_as_os_obj);
        _dispatch_queue_push_update_head(dq, tail);
        flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
    } else if (overriding) {
        flags = DISPATCH_WAKEUP_CONSUME_2;
    } else {
        return;
    }
    return dx_wakeup(dq, qos, flags);
}

這裡,會將我們提交的任務,放到dq的隊尾。將任務入隊後,則呼叫dx_wakeup方法喚醒dq:

#define dx_wakeup(x, y, z) dx_vtable(x)->do_wakeup(x, y, z)

同樣,檢視init.c檔案,do_wakeup定義:

void
_dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_queue_barrier_complete(dq, qos, flags);
    }
    if (_dispatch_queue_class_probe(dq)) { // 如果dq中有任務,則target = DISPATCH_QUEUE_WAKEUP_TARGET. 當我們第一次進入_dispatch_queue_wakeup時,dq是我們自定義的dq,會進入這裡
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    return _dispatch_queue_class_wakeup(dq, qos, flags, target);
}

當dq是我們自定義時,因為之前我們已經將任務入隊,因此dq中肯定有任務,因此target 被設定為了 DISPATCH_QUEUE_WAKEUP_TARGET

由於第一次進入時target != NULL, 因此我們刪除無關程式碼:

void
_dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
    dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);

    if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
        _dispatch_retain_2(dq);
        flags |= DISPATCH_WAKEUP_CONSUME_2;
    }

    // 這裡target 如果dq是root queue大概率為null,否則,target == DISPATCH_QUEUE_WAKEUP_TARGET, 呼叫_dispatch_queue_push_queue,將自定義dq入隊,然後會在呼叫一遍wake up,最終在root queue中執行方法
    if (target) {
        uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
        qos = _dispatch_queue_override_qos(dq, qos);
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            new_state = _dq_state_merge_qos(old_state, qos);
            if (likely(!_dq_state_is_suspended(old_state) &&
                    !_dq_state_is_enqueued(old_state) &&
                    (!_dq_state_drain_locked(old_state) ||
                    (enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
                    _dq_state_is_base_wlh(old_state))))) {
                new_state |= enqueue;
            }
            if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
                new_state |= DISPATCH_QUEUE_DIRTY;
            } else if (new_state == old_state) {
                os_atomic_rmw_loop_give_up(goto done);
            }
        });

        if (likely((old_state ^ new_state) & enqueue)) {
            dispatch_queue_t tq;
            if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
                os_atomic_thread_fence(dependency);
                tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
                        (long)new_state);
            }
            dispatch_assert(_dq_state_is_enqueued(new_state));
            return _dispatch_queue_push_queue(tq, dq, new_state); // 將dq push到target queue中,並再次呼叫wake up 方法,tq作為dq傳入
        }
}

第一次進入wake up方法時,GCD會呼叫_dispatch_queue_push_queue 方法將自定義dq入隊到target queue中,即root queue中:

static inline void
_dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_t dq,
        uint64_t dq_state)
{
    return dx_push(tq, dq, _dq_state_max_qos(dq_state));
}

這裡又呼叫了dx_push方法,會將我們自定義的dq加入到target queue中。

那麼,我們的work,會在root queue中什麼時候被執行呢?
我們會看一下呼叫堆疊:
這裡寫圖片描述

其實,root queue中維護了一個執行緒池,當執行緒執行方法時,會呼叫_dispatch_worker_thread3方法。為什麼會呼叫thread3方法?執行緒池是如何建立的?這些問題我們稍後再提,現在,我們先看_dispatch_worker_thread3的實現:

static void
_dispatch_worker_thread3(pthread_priority_t pp)
{
    bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
    dispatch_queue_t dq;
    pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
    _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
    dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit); // 根據thread pripority和是否overcommit,取出root queue陣列中對應的root queue
    return _dispatch_worker_thread4(dq); // 最終會呼叫它
}
static void
_dispatch_worker_thread4(void *context)
{
    dispatch_queue_t dq = context;
    dispatch_root_queue_context_t qc = dq->do_ctxt;

    _dispatch_introspection_thread_add();
    int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
    dispatch_assert(pending >= 0);
    _dispatch_root_queue_drain(dq, _dispatch_get_priority()); // 將root queue的所有任務都drain(傾倒),並執行
    _dispatch_voucher_debug("root queue clear", NULL);
    _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}

我們來看root queue是如何被‘傾倒’的:

static void
_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
#if DISPATCH_DEBUG
    dispatch_queue_t cq;
    if (slowpath(cq = _dispatch_queue_get_current())) {
        DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
    }
#endif
    _dispatch_queue_set_current(dq); // 設定dispatch thread的當前queue是dq
    dispatch_priority_t pri = dq->dq_priority;
    if (!pri) pri = _dispatch_priority_from_pp(pp);
    dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
    _dispatch_adopt_wlh_anon();

    struct dispatch_object_s *item;
    bool reset = false;
    dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
    _dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
    dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
            DISPATCH_INVOKE_REDIRECTING_DRAIN;
    _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
    _dispatch_perfmon_start();
    while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) { // 拿出queue中的一個item
        if (reset) _dispatch_wqthread_override_reset();
        _dispatch_continuation_pop_inline(item, &dic, flags, dq); // 執行這個item
        reset = _dispatch_reset_basepri_override();
        if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
            break;
        }
    }

    // overcommit or not. worker thread
    if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
        _dispatch_perfmon_end(perfmon_thread_worker_oc);
    } else {
        _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
    }

#if DISPATCH_COCOA_COMPAT
    _dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
    _dispatch_reset_wlh();
    _dispatch_reset_basepri(old_dbp);
    _dispatch_reset_basepri_override();
    _dispatch_queue_set_current(NULL);  // 設定dispatch thread的當前queue是NULL
}

程式碼很多,核心是中間的while迴圈:

while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) { // 拿出queue中的一個item
        if (reset) _dispatch_wqthread_override_reset();
        _dispatch_continuation_pop_inline(item, &dic, flags, dq); // 執行這個item
        reset = _dispatch_reset_basepri_override();
        if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
            break;
        }
}

通過while迴圈,GCD每次從root queue中取出一個queue item,並呼叫_dispatch_continuation_pop_inline 執行它,直到root queue中的item全部清空為止。

我們來看一下queue item是如何被執行的,這裡的queue item,應該是一個dispatch queue:

static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_t dq)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dq);
    _dispatch_trace_continuation_pop(dq, dou);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    if (_dispatch_object_has_vtable(dou)) {  // 到這裡,我們提交的任務,才被queue執行。簡直是百轉千回啊!!!!
        dx_invoke(dou._do, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

這裡dx_invoke是一個巨集,它會呼叫_dispatch_queue_invoke 方法, 結合呼叫堆疊,其最後會呼叫
_dispatch_queue_serial_drain

OK,上面就是dispatch async序列佇列的執行步驟,總結一下就是:
將work打包成dispatch_continuation_t, 然後將dq入隊到響應的root queue中,root queue中的執行緒池中的執行緒會被喚醒,執行執行緒函式_dispatch_worker_thread3,root queue會被傾倒,執行queue中的任務。

讓我們再看一下 dispatch_async 到並行佇列的情況:

這裡寫圖片描述

回到上面序列佇列和並行佇列分支的地方:

static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    // 如果是用barrier插進來的任務或者是序列佇列,直接將任務加入到佇列
    // #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
    //    ({ uint16_t _width = (width); \
    //    _width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)))
        return _dispatch_continuation_push(dq, dc); // 入隊
    }
    return _dispatch_async_f2(dq, dc); // 並行佇列走這裡
}

並行佇列會走_dispatch_async_f2

static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    // <rdar://problem/24738102&24743140> reserving non barrier width
    // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
    // equivalent), so we have to check that this thread hasn't enqueued
    // anything ahead of this call or we can break ordering
    if (slowpath(dq->dq_items_tail)) {
        return _dispatch_continuation_push(dq, dc);
    }

    if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
        return _dispatch_continuation_push(dq, dc);
    }

    // async 重定向,任務的執行由自動定義queue轉入root queue
    return _dispatch_async_f_redirect(dq, dc,
            _dispatch_continuation_override_qos(dq, dc));
}

這裡,會呼叫_dispatch_async_f_redirect,同樣的,會將dq重定向到root queue中。

static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    if (!slowpath(_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dq, dou);
    }

    // 將dq替換為root queue
    dq = dq->do_targetq;

    // 這裡一般不會進入,主要是將dq替換為最終的targetq
    while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
            break;
        }
        if (!dou._dc->dc_ctxt) {
            // find first queue in descending target queue order that has
            // an autorelease frequency set, and use that as the frequency for
            // this continuation.
            dou._dc->dc_ctxt = (void *)
                    (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
        }
        dq = dq->do_targetq;
    }

    // 任務入隊,展開巨集定義:
    // #define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
    // #define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
    // 由於此時的x實質上是root queue,可以檢視init.c 中的
    // do_push 實質會呼叫 _dispatch_root_queue_push
    dx_push(dq, dou, qos);
}

這裡又出現了dx_push,檢視init.c,實質會呼叫_dispatch_root_queue_push

void
_dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
    dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
    if (unlikely(ddi && ddi->ddi_can_stash)) {
        dispatch_object_t old_dou = ddi->ddi_stashed_dou;
        dispatch_priority_t rq_overcommit;
        rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;

        if (likely(!old_dou._do || rq_overcommit)) {
            dispatch_queue_t old_rq = ddi->ddi_stashed_rq;
            dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
            ddi->ddi_stashed_rq = rq;
            ddi->ddi_stashed_dou = dou;
            ddi->ddi_stashed_qos = qos;
            _dispatch_debug("deferring item %p, rq %p, qos %d",
                    dou._do, rq, qos);
            if (rq_overcommit) {
                ddi->ddi_can_stash = false;
            }
            if (likely(!old_dou._do)) {
                return;
            }
            // push the previously stashed item
            qos = old_qos;
            rq = old_rq;
            dou = old_dou;
        }
    }
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
    if (_dispatch_root_queue_push_needs_override(rq, qos)) { // 判斷root queue的優先順序和 自定義優先順序是否相等,不相等,進入if(一般不相等)
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
#else
    (void)qos;
#endif
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

會呼叫_dispatch_root_queue_push_override

static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit); // 根據優先順序,獲取root queue
    dispatch_continuation_t dc = dou._dc;

    if (_dispatch_object_is_redirection(dc)) {
        // no double-wrap is needed, _dispatch_async_redirect_invoke will do
        // the right thing
        dc->dc_func = (void *)orig_rq;
    } else {
        dc = _dispatch_continuation_alloc();
        dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
        // fake that we queued `dou` on `orig_rq` for introspection purposes
        _dispatch_trace_continuation_push(orig_rq, dou);
        dc->dc_ctxt = dc;
        dc->dc_other = orig_rq;
        dc->dc_data = dou._do;
        dc->dc_priority = DISPATCH_NO_PRIORITY;
        dc->dc_voucher = DISPATCH_NO_VOUCHER;
    }
    _dispatch_root_queue_push_inline(rq, dc, dc, 1); // 又會呼叫這裡
}
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _head,
        dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *head = _head._do, *tail = _tail._do;
    // 當queue為空,且需要設定header時,會進入到這裡。這裡應該是第一次使用root queue的時候會進入一次
    if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) { // 嘗試更新dq的list,如果是第一次,list 為空,返回false
        _dispatch_queue_push_update_head(dq, head); // 設定queue 頭
        return _dispatch_global_queue_poke(dq, n, 0); // 這裡啟用root queue,這裡的n是入隊dq個數,是1
    }
}

上面的判斷很重要:

if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) { // 嘗試更新dq的list,如果是第一次,list 為空,返回false
        _dispatch_queue_push_update_head(dq, head); // 設定queue 頭
        return _dispatch_global_queue_poke(dq, n, 0); // 這裡啟用root queue,這裡的n是入隊dq個數,是1
    }

在執行新的任務時,GCD會嘗試更新root queue的任務列表。如果是第一次向root queue投遞任務,則此時的任務列表是空,更新任務列表失敗,則會進入_dispatch_global_queue_poke 來啟用root queue:

void
_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
{
    if (!_dispatch_queue_class_probe(dq)) {  // 如果還有要執行的,直接返回
        return;
    }
#if DISPATCH_USE_WORKQUEUES
    dispatch_root_queue_context_t qc = dq->do_ctxt;
    if (
#if DISPATCH_USE_PTHREAD_POOL
            (qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
            !os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
        _dispatch_root_queue_debug("worker thread request still pending for "
                "global queue: %p", dq);
        return;
    }
#endif // DISPATCH_USE_WORKQUEUES
    return _dispatch_global_queue_poke_slow(dq, n, floor);
}

poke 會進入第二階段 _dispatch_global_queue_poke_slow

static void
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
{
    dispatch_root_queue_context_t qc = dq->do_ctxt;
    int remaining = n;  // remaining 表示要執行的任務數量 1
    int r = ENOSYS;

    // step1. 先初始化root queues 包括初始化XUN 的workqueue
    _dispatch_root_queues_init();
    _dispatch_debug_root_queue(dq, __func__);
#if DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
    if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
    {
        _dispatch_root_queue_debug("requesting new worker thread for global "
                "queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
        if (qc->dgq_kworkqueue) {
            pthread_workitem_handle_t wh;
            unsigned int gen_cnt;
            do {
                // 呼叫XUN核心的workqueue函式,來維護GCD層的 pthread pool
                r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
                        _dispatch_worker_thread4, dq, &wh, &gen_cnt);
                (void)dispatch_assume_zero(r);
            } while (--remaining);
            return;
        }
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_QOS
        r = _pthread_workqueue_addthreads(remaining,
                _dispatch_priority_to_pp(dq->dq_priority));
#elif DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
        r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
                qc->dgq_wq_options, remaining);
#endif
        (void)dispatch_assume_zero(r);
        return;
    }
#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
    dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
    if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
        while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) { // step2. 喚醒執行緒,來做事情
            _dispatch_root_queue_debug("signaled sleeping worker for "
                    "global queue: %p", dq);
            if (!--remaining) { // 如果沒有要處理的dq了,返回
                return;
            }
        }
    }
}

說實話,這裡看的不是很清楚,重點是step1, 初始化root queue的方法:_dispatch_root_queues_init

void
_dispatch_root_queues_init(void)
{
    // 這裡用了dispatch_once_f, 僅會執行一次
    static dispatch_once_t _dispatch_root_queues_pred;
    dispatch_once_f(&_dispatch_root_queues_pred, NULL,
            _dispatch_root_queues_init_once);
}
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
    int wq_supported;
    _dispatch_fork_becomes_unsafe();
    if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
        size_t i;
        for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
            bool overcommit = true;
#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
            // some software hangs if the non-overcommitting queues do not
            // overcommit when threads block. Someday, this behavior should
            // apply to all platforms
            if (!(i & 1)) {
                overcommit = false;
            }
#endif
            _dispatch_root_queue_init_pthread_pool(
                    &_dispatch_root_queue_contexts[i], 0, overcommit);
        }
#else
        DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
                "Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
    }
}

這個初始化函式有兩個分支,GCD首先會呼叫_dispatch_root_queues_init_workq 來初始化root queues,如果不成功,才使用_dispatch_root_queue_init_pthread_pool 。 這裡可以看出,GCD是優先使用XUN核心提供的workqueue,而非使用使用者層的執行緒池。 我們這裡重點關注GCD使用workqueue的情況:

static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
    int r; (void)r;
    bool result = false;
    *wq_supported = 0;
#if DISPATCH_USE_WORKQUEUES
    bool disable_wq = false; (void)disable_wq;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
    disable_wq = slowpath(getenv(
            
           

相關推薦

GCD原始碼吐血分析(2)——dispatch_async/dispatch_sync

上一章中,我們知道了獲取GCD queue的底層實現。獲取到queue後,就需要將任務提交到queue中進行處理。 我們有兩種方式提交任務: dispatch_async和dispatch_sync。一個是非同步不等待任務完成就返回,另一個是同步任務,需要等待任務完成。這兩種提交任務

GCD原始碼吐血分析(1)——GCD Queue

看了快半個月的GCD原始碼,只能說太變態了。 先來吐槽一下:一個函式,呼叫棧都是十幾層…… 為了效率,程式碼使用了純C語言,但是為了模擬面向物件中的繼承,虛擬函式等,定義了一層層的巨集定義,看一個struct的定義要繞過來繞過去…… 網上的資料極少,有的那幾篇,還都是用舊版本的GCD在

完整詳解 swift GCD系列(一)dispatch_async;dispatch_sync;dispatch_async_f;dispatch_sync_f

為什麼要寫這個系列,因為百度了一下,找了很多都是些片面的Blog,拷貝來拷貝去的,寫的也很粗糙。 所以,我要寫這個系列,儘量把官網文件中GCD的強大功能完整的表達出來。方便自己,也方便別人,如果發現有問題,歡迎提出 本教程的計劃:在完整的看過GCD的官方文件之後,我實在想

Netty Pipeline原始碼分析(2)

原文連結:wangwei.one/posts/netty… 前面 ,我們分析了Netty Pipeline的初始化及節點新增與刪除邏輯。接下來,我們將來分析Pipeline的事件傳播機制。 Netty版本:4.1.30 inBound事件傳播 示例 我們通過下面這個例子來演示Ne

lucene原始碼分析(2)讀取過程例項

1.官方提供的程式碼demo Analyzer analyzer = new StandardAnalyzer(); // Store the index in memory: Directory directory = new RAMDirec

x264裡的2pass指的是什麼意思 x264原始碼分析2 encode

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Shiro原始碼分析(2) - 會話管理器(SessionManager)

本文在於分析Shiro原始碼,對於新學習的朋友可以參考 [開濤部落格](http://jinnianshilongnian.iteye.com/blog/2018398)進行學習。 本文對Shiro中的SessionManager進行分析,SessionMan

2.uboot和系統移植-第6部分-2.6.uboot原始碼分析2-啟動第二階段》

《2.uboot和系統移植-第6部分-2.6.uboot原始碼分析2-啟動第二階段》 第一部分、章節目錄 2.6.1.start_armboot函式簡介 2.6.2.start_armboot解析1 2.6.3.記憶體使用排布 2.6.4.start_armboot解析2 2.6.5.s

am335x 核心原始碼分析2 LCD移植

1、/arch/arm/mach-omap2/board-am335xevm.c/lcdc_init(){得到LCD硬體引數struct da8xx_lcdc_platform_data} -> am33xx_register_lcdc() -> omap_device_

MyBatis原始碼分析-2-基礎支援層-反射模組-TypeParameterResolver/ObjectFactory

TypeParameterResolver: TypeParameterResolver的功能是:當存在複雜的繼承關係以及泛型定義時, TypeParameterResolver 可以幫助我們解析欄位、方法引數或方法返回值的型別。TypeParameterResolver 是在Refelctor

Spark2.2.2原始碼解析: 2.啟動master節點流程分析

本文主要說明在啟動master節點的時候,程式碼的流程走向。   授予檔案執行許可權 chmod755  兩個目錄裡的檔案: /workspace/spark-2.2.2/bin  --所有檔案 /workspace/spark-2.2.2/sb

muduo原始碼分析(2) --記憶體分配

寫在前面: ​ 這個原始碼是分析libevent-2.0.20-stable, 並非最新版本的libevent,作者並沒有全看原始碼,在這裡會推薦以下參考的一些網站,也歡迎大家在不足的地方提出來進行討論。 什麼都沒包裝的記憶體管理 ​ 預設情況下,l

Minix3原始碼分析(2)——系統初始化

minix3的啟動牽扯到幾次控制權轉移,它們發生在mpx386.s中的組合語言例程和start.c及main.c中的C語言例程之間。 彙編程式碼需要做許多工作,包括建立一個 棧幀 以便為C編譯器編譯的程式碼提供適當的環境,複製處理器所使用的表格來定義儲存器段,建

Spring5原始碼分析系列(四)Spring5原始碼分析2

本文緊接上文Spring5原始碼分析1,講解基於XML的依賴注入,文章參考自Tom老師視訊,下一篇文章將介紹基於Annotation的依賴注入。 基於XML的依賴注入 1、依賴注入發生的時間 當SpringIOC容器完成了Bean定義資源的定位、載入和解析註冊以後,IO

JDK1.8ArrayList原始碼分析2

E get(int index) 因為ArrayList是採用陣列結構來儲存的,所以它的get方法非常簡單,先是判斷一下有沒有越界,之後就可以直接通過陣列下標來獲取元素了,所以get的時間複雜度是O(1)。 /** * Returns the

谷歌瀏覽器的原始碼分析 2

這麼大的工程,我從哪裡開始呢?我認為從介面開始,這樣才可以快速地深入研究。下面就可以先嚐試修改一個chrome的關於對話方塊,上一次看到它是英語的,那麼我就來把它改成中文的吧,這樣有目標了。從chrome的工程裡可以看到它是支援多種語言的,在Windows平臺上支援多語言的標準做法,就是寫多個語言的DL

3.24 vchain原始碼分析2

接下來是合約的第二部分,直接上程式碼,註釋都在程式碼中 // Contract to sell and distribute VEN tokens // 分發VEN 代幣 contract VENSale is Owned{ /// chart of stage t

x264裡的2pass指的是什麼意思 x264原始碼分析2 encode

                A:x264裡的2pass指的是什麼意思?另外stat是什麼意思, 比如有個引數--stats <string>        Filename for 2 pass stats [/"%s/"]/n", defaults->rc.psz_stat_out )

新人分享——hadoop原始碼分析2

這裡配置好了這個類,獲取outputFormat -之後就是會去讀取切片的元資料資訊,然後獲取reduce數量,之後會設定作業進度,然後獲取可執行的maptask 執行maptask 在這裡跑任務,並且監控 我們來看maptaskRunable的run方法 這裡設定了一堆屬性在裡面 這裡map.r

Fabric 1.0原始碼分析(2) blockfile(區塊檔案儲存)

Fabric 1.0原始碼筆記 之 blockfile(區塊檔案儲存) 1、blockfile概述 blockfile,即Fabric區塊鏈區塊檔案儲存,預設目錄/var/hyperledger/production/ledgersData/chains,含in