GCD原始碼吐血分析(2)——dispatch_async/dispatch_sync
上一章中,我們知道了獲取GCD queue的底層實現。獲取到queue後,就需要將任務提交到queue中進行處理。
我們有兩種方式提交任務:
dispatch_async
和dispatch_sync
。一個是非同步不等待任務完成就返回,另一個是同步任務,需要等待任務完成。這兩種提交任務的方式有所不同:
dispatch_async
:底層運用了執行緒池,會在和當前執行緒不同的執行緒上處理任務。
dispatch_sync
:一般不會新開啟執行緒,而是在當前執行緒執行任務(比較特殊的是main queue,它會利用main runloop 將任務提交到主執行緒來執行),同時,它會阻塞當前執行緒,等待提交的任務執行完畢。當target queue是併發執行緒時,會直接執行任務。而target queue是序列佇列時,會檢測當前執行緒是否已經擁有了該序列佇列,如果答案是肯定的,則會觸發crash,這與老版本GCD中會觸發死鎖不同,因為在新版GCD中,已經加入了這種死鎖檢測機制,從而觸發crash,避免了除錯困難的死鎖的發生。
如下圖所示,當我們在同一執行緒中的序列佇列任務執行期間,再次向該佇列提交任務時,會引發crash。
但是,目前這種檢測死鎖的機制也不是完美的,我們仍可以繞過crash來引發死鎖(是不是沒事找事兒?),具體可見下面的原始碼分析。
dispatch_sync
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
// 序列佇列走這裡
return dispatch_barrier_sync_f(dq, ctxt, func);
}
// 並行佇列走這裡
_dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
先來看一下序列佇列會執行的dispatch_barrier_sync_f
:
DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_tid tid = _dispatch_tid_self(); // 獲取當前thread id
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) { // 當前執行緒嘗試繫結獲取序列佇列的lock
return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT); // 執行緒獲取不到queue的lock,則序列入隊等待,當前執行緒阻塞
}
// 不需要等待,則走這裡
_dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func);
}
我們重點看一下執行緒是如何嘗試獲取序列佇列lock的,這很重要,這一步是後面的死鎖檢測的基礎:
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_t dq, uint32_t tid)
{
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid); // _dispatch_lock_value_from_tid 會去取tid二進位制數的2到31位 作為值(從0位算起)
uint64_t old_state, new_state;
// 這裡面有一堆巨集定義的原子操作,事實是
// 嘗試將new_state賦值給dq.dq_state。 首先會用原子操作(atomic_load_explicit)取當前dq_state的值,作為old_state。如果old_state 不是dq_state的預設值(init | role), 則賦值失敗,返回false(這說明之前已經有人更改過dq_state,在序列佇列中,一次僅允許一個人更改dq_state), 獲取lock失敗。否則dq_state賦值為new_state(利用原子操作atomic_compare_exchange_weak_explicit 做賦值), 返回true,獲取lock成功。
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | role)) { // 如果dq_state已經被修改過,則直接返回false,不更新dq_state為new_state
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}
上面程式碼會去取dispatch_queue_t
中的dq_state
值。當這個dq_state沒有被別人修改過,即第一次被修改時,會將dq_state
設定為new_state, 並返回true。此時,在new_state中標記了當前的queue被lock,同時記錄了lock 當前queue的執行緒tid。
如果dq_state
已經被修改過了, 則函式返回false,同時,保持dq_state
值不變。
看過了_dispatch_queue_try_acquire_barrier_sync
的內部實現,我們再回到上一級dispatch_barrier_sync_f
中:
DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_tid tid = _dispatch_tid_self(); // 獲取當前thread id
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) { // 當前執行緒嘗試繫結獲取序列佇列的lock
return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT); // 執行緒獲取不到queue的lock,則序列入隊等待,當前執行緒阻塞
}
...
}
如果_dispatch_queue_try_acquire_barrier_sync
返回了false,則會進入到_dispatch_sync_f_slow
中,在這裡會嘗試等待序列佇列中上一個任務執行完畢:
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (unlikely(!dq->do_targetq)) { // 如果dq沒有target queue,走這裡。這種情況多半不會發生,因為所有自定義建立的queue都有target queue是root queue之一
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
// 多數會走這裡
_dispatch_sync_wait(dq, ctxt, func, dc_flags, dq, dc_flags);
}
我們來看一下_dispatch_sync_wait
的實現的超級簡略版:
DISPATCH_NOINLINE
static void
_dispatch_sync_wait(dispatch_queue_t top_dq, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_t dq, uintptr_t dc_flags)
{
pthread_priority_t pp = _dispatch_get_priority();
dispatch_tid tid = _dispatch_tid_self();
dispatch_qos_t qos;
uint64_t dq_state;
// Step 1. 檢測是否會發生死鎖,若會發生死鎖,則直接crash
dq_state = _dispatch_sync_wait_prepare(dq);
// 如果當前的執行緒已經擁有目標queue,這時候在呼叫_dispatch_sync_wait,則會觸發crash
// 這裡的判斷邏輯是lock的woner是否是tid(這裡因為在dq_state的lock裡面加入了tid的值,所有能夠自動識別出死鎖的情況:同一個序列佇列被同一個執行緒做兩次lock)
if (unlikely(_dq_state_drain_locked_by(dq_state, tid))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// Step2. _dispatch_queue_push_sync_waiter(dq, &dsc, qos); // 將要執行的任務入隊
// Step3. 等待前面的task執行完畢
if (dsc.dc_data == DISPATCH_WLH_ANON) {
// 等待執行緒事件,等待完成(進入dispach_sync的模式)
_dispatch_thread_event_wait(&dsc.dsc_event); // 訊號量等待
_dispatch_thread_event_destroy(&dsc.dsc_event); // 等待結束 銷燬thread event
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc.dsc_override_qos > dsc.dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc.dsc_override_qos);
}
} else {
_dispatch_event_loop_wait_for_ownership(&dsc);
}
// Step4.
// 等待結束,執行client程式碼
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags);
}
這裡我們重點看一下,GCD在Step1中是如何檢測死鎖的。其最終會呼叫函式
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
lock_value
就是dq_state
,一個32位的整數。通過判斷((lock_value ^ tid) & DLOCK_OWNER_MASK)
是否為0,來判斷當前的序列佇列是否已被同一個執行緒所獲取。如果當前佇列已經被當前執行緒獲取,即當前執行緒在執行一個序列任務中,如果此時我們在阻塞等待一個新的序列任務,則會發生死鎖。因此,在新版GCD中,當((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0
時,就會主動觸發crash來避免死鎖。
序列佇列死鎖的情況是:
執行緒A在序列佇列dq中執行序列任務task1的過程中,如果再向dq中投遞序列任務task2,同時還要求必須阻塞當前執行緒,來等待task2結束(呼叫dispatch_sync投遞task2),那麼這時候會發生死鎖。
因為這時候task1還沒有結束,序列佇列不會取執行task2,而我們又要在當前執行緒等待task2的結束才肯繼續執行task1,即task1在等task2,而task2也在等task1,迴圈等待,形成死鎖。
總結一下,GCD會發生死鎖的情況必須同時滿足3個條件,才會形成task1和task2相互等待的情況:
- 序列佇列正在執行task1
- 在task1中又向序列佇列投遞了task2
- task2是以
dispatch_sync
方式投遞的,會阻塞當前執行緒
其實在GCD的死鎖檢測中,並沒有完全覆蓋以上3個條件。因為GCD對於條件2,附加加了條件限制,即task2是在task1的執行執行緒中提交的
。而條件2其實是沒有這個限制的,task2可以在和task1不同的執行緒中提交,同樣可以造成死鎖。因此,在這種情況下的死鎖,GCD是檢測不出來的,也就不會crash,僅僅是死鎖。
以下是GCD會檢測出的死鎖以及不會檢測出的死鎖,可以自己體會一下:
// 序列佇列死鎖crash的例子(在同個執行緒的序列佇列任務執行過程中,再次傳送dispatch_sync 任務到序列佇列,會crash)
//==============================
dispatch_queue_t sQ = dispatch_queue_create("st0", 0);
dispatch_async(sQ, ^{
NSLog(@"Enter");
dispatch_sync(sQ, ^{ // 這裡會crash
NSLog(@"sync task");
});
});
// 序列死鎖的例子(這裡不會crash,線上程A執行序列任務task1的過程中,又線上程B中投遞了一個task2到序列佇列同時使用dispatch_sync等待,死鎖,但GCD不會測出)
//==============================
dispatch_queue_t sQ1 = dispatch_queue_create("st01", 0);
dispatch_async(sQ1, ^{
NSLog(@"Enter");
dispatch_sync(dispatch_get_main_queue(), ^{
dispatch_sync(sQ1, ^{
NSArray *a = [NSArray new];
NSLog(@"Enter again %@", a);
});
});
NSLog(@"Done");
});
在邏輯結構良好的情況下,序列佇列不會發生死鎖,而只是task1,task2依次執行:
// 序列佇列等待的例子1
//==============================
dispatch_queue_t sQ1 = dispatch_queue_create("st01", 0);
dispatch_async(sQ1, ^{
NSLog(@"Enter");
sleep(5);
NSLog(@"Done");
});
dispatch_sync(sQ1, ^{
NSLog(@"It is my turn");
});
再來看並行佇列會走的分支:
DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func); // call clinet function
_dispatch_queue_non_barrier_complete(dq); // 結束
}
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq); // 保護現場
_dispatch_client_callout(ctxt, func); // 回撥到client
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
可見,並行佇列不會建立執行緒取執行dispatch_sync命令。
dispatch_async
自定義序列佇列的async派發
dispatch_queue_t sq1 = dispatch_queue_create("sq1", NULL);
dispatch_async(sq1, ^{
NSLog(@"Serial aysnc task");
});
他的呼叫堆疊是:
我們來看一下原始碼:
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
// 設定標誌位
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
// 將work打包成dispatch_continuation_t
_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_async(dq, dc);
}
無論dq是什麼型別的queue,GCD首先會將work打包成dispatch_continuation_t
型別,然後呼叫方法_dispatch_continuation_async
。
在繼續深入之前,先來看一下work是如何打包的:
static inline void
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
pthread_priority_t pp, dispatch_block_flags_t flags, uintptr_t dc_flags)
{
dc->dc_flags = dc_flags | DISPATCH_OBJ_BLOCK_BIT;
// 將work封裝到dispatch_continuation_t中
dc->dc_ctxt = _dispatch_Block_copy(work);
_dispatch_continuation_priority_set(dc, pp, flags);
if (unlikely(_dispatch_block_has_private_data(work))) {
// always sets dc_func & dc_voucher
// may update dc_priority & do_vtable
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 之前的flag 被設定為DISPATCH_OBJ_CONSUME_BIT,因此會走這裡
dc->dc_func = _dispatch_call_block_and_release; // 這裡是設定dc的功能函式:1. 執行block 2. release block物件
} else {
dc->dc_func = _dispatch_Block_invoke(work);
}
_dispatch_continuation_voucher_set(dc, dqu, flags);
}
我們重點關注dc->dc_func = _dispatch_call_block_and_release
方法,它會在dispatch_continuation_t dc
被執行時呼叫:
void
_dispatch_call_block_and_release(void *block)
{
void (^b)(void) = block;
b();
Block_release(b);
}
dc_func
的邏輯也很簡單:執行block,釋放block。
看完了work是如何打包成dispatch_continuation_t
的,我們回過頭來繼續看_dispatch_continuation_async
,它接受兩個引數:work queue以及dispatch_continuation_t 形式的work:
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
_dispatch_continuation_async2(dq, dc,
dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
bool barrier)
{
// 如果是用barrier插進來的任務或者是序列佇列,直接將任務加入到佇列
// #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
// ({ uint16_t _width = (width); \
// _width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)))
return _dispatch_continuation_push(dq, dc); // 入隊
}
return _dispatch_async_f2(dq, dc); // 並行佇列走這裡
}
執行到_dispatch_continuation_async2
時,就出現了分支:(1)序列(barrier)執行_dispatch_continuation_push (2)並行執行_dispatch_async_f2
我們這裡關注的是自定義序列佇列的分支,因此繼續看_dispatch_continuation_push
這一支。
static void
_dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
dx_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
}
// dx_push是一個巨集定義:
#define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
會呼叫dq的do_push
方法,可以在init.c
中檢視到do_push
的定義:
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, queue,
.do_type = DISPATCH_QUEUE_SERIAL_TYPE,
.do_kind = "serial-queue",
.do_dispose = _dispatch_queue_dispose,
.do_suspend = _dispatch_queue_suspend,
.do_resume = _dispatch_queue_resume,
.do_finalize_activation = _dispatch_queue_finalize_activation,
.do_push = _dispatch_queue_push,
.do_invoke = _dispatch_queue_invoke,
.do_wakeup = _dispatch_queue_wakeup,
.do_debug = dispatch_queue_debug,
.do_set_targetq = _dispatch_queue_set_target_queue,
);
檢視_dispatch_queue_push
的定義:
void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
_dispatch_queue_push_inline(dq, dou, qos);
}
#define _dispatch_queue_push_inline _dispatch_trace_queue_push_inline
static inline void
_dispatch_trace_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
dispatch_qos_t qos)
{
if (slowpath(DISPATCH_QUEUE_PUSH_ENABLED())) {
struct dispatch_object_s *dou = _tail._do;
_dispatch_trace_continuation(dq, dou, DISPATCH_QUEUE_PUSH);
}
_dispatch_introspection_queue_push(dq, _tail); // 第一個push似乎是為了監聽dq入隊(enqueue)的訊息
_dispatch_queue_push_inline(dq, _tail, qos); // 第二個push才是將dq入隊, 這裡的_tail,實質是_dispatch_continuation_t 型別
}
static inline void
_dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
dispatch_qos_t qos)
{
struct dispatch_object_s *tail = _tail._do;
dispatch_wakeup_flags_t flags = 0;
bool overriding = _dispatch_queue_need_override_retain(dq, qos);
if (unlikely(_dispatch_queue_push_update_tail(dq, tail))) { // 將tail放入到dq中
if (!overriding) _dispatch_retain_2(dq->_as_os_obj);
_dispatch_queue_push_update_head(dq, tail);
flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
} else if (overriding) {
flags = DISPATCH_WAKEUP_CONSUME_2;
} else {
return;
}
return dx_wakeup(dq, qos, flags);
}
這裡,會將我們提交的任務,放到dq的隊尾。將任務入隊後,則呼叫dx_wakeup
方法喚醒dq:
#define dx_wakeup(x, y, z) dx_vtable(x)->do_wakeup(x, y, z)
同樣,檢視init.c
檔案,do_wakeup
定義:
void
_dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_queue_barrier_complete(dq, qos, flags);
}
if (_dispatch_queue_class_probe(dq)) { // 如果dq中有任務,則target = DISPATCH_QUEUE_WAKEUP_TARGET. 當我們第一次進入_dispatch_queue_wakeup時,dq是我們自定義的dq,會進入這裡
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
return _dispatch_queue_class_wakeup(dq, qos, flags, target);
}
當dq是我們自定義時,因為之前我們已經將任務入隊,因此dq中肯定有任務,因此target
被設定為了 DISPATCH_QUEUE_WAKEUP_TARGET
。
由於第一次進入時target != NULL, 因此我們刪除無關程式碼:
void
_dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
// 這裡target 如果dq是root queue大概率為null,否則,target == DISPATCH_QUEUE_WAKEUP_TARGET, 呼叫_dispatch_queue_push_queue,將自定義dq入隊,然後會在呼叫一遍wake up,最終在root queue中執行方法
if (target) {
uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
qos = _dispatch_queue_override_qos(dq, qos);
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = _dq_state_merge_qos(old_state, qos);
if (likely(!_dq_state_is_suspended(old_state) &&
!_dq_state_is_enqueued(old_state) &&
(!_dq_state_drain_locked(old_state) ||
(enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
_dq_state_is_base_wlh(old_state))))) {
new_state |= enqueue;
}
if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
new_state |= DISPATCH_QUEUE_DIRTY;
} else if (new_state == old_state) {
os_atomic_rmw_loop_give_up(goto done);
}
});
if (likely((old_state ^ new_state) & enqueue)) {
dispatch_queue_t tq;
if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
os_atomic_thread_fence(dependency);
tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
(long)new_state);
}
dispatch_assert(_dq_state_is_enqueued(new_state));
return _dispatch_queue_push_queue(tq, dq, new_state); // 將dq push到target queue中,並再次呼叫wake up 方法,tq作為dq傳入
}
}
第一次進入wake up
方法時,GCD會呼叫_dispatch_queue_push_queue
方法將自定義dq入隊到target queue中,即root queue中:
static inline void
_dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_t dq,
uint64_t dq_state)
{
return dx_push(tq, dq, _dq_state_max_qos(dq_state));
}
這裡又呼叫了dx_push
方法,會將我們自定義的dq加入到target queue中。
那麼,我們的work,會在root queue中什麼時候被執行呢?
我們會看一下呼叫堆疊:
其實,root queue中維護了一個執行緒池,當執行緒執行方法時,會呼叫_dispatch_worker_thread3
方法。為什麼會呼叫thread3方法?執行緒池是如何建立的?這些問題我們稍後再提,現在,我們先看_dispatch_worker_thread3
的實現:
static void
_dispatch_worker_thread3(pthread_priority_t pp)
{
bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
dispatch_queue_t dq;
pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit); // 根據thread pripority和是否overcommit,取出root queue陣列中對應的root queue
return _dispatch_worker_thread4(dq); // 最終會呼叫它
}
static void
_dispatch_worker_thread4(void *context)
{
dispatch_queue_t dq = context;
dispatch_root_queue_context_t qc = dq->do_ctxt;
_dispatch_introspection_thread_add();
int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
dispatch_assert(pending >= 0);
_dispatch_root_queue_drain(dq, _dispatch_get_priority()); // 將root queue的所有任務都drain(傾倒),並執行
_dispatch_voucher_debug("root queue clear", NULL);
_dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
}
我們來看root queue是如何被‘傾倒
’的:
static void
_dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
#if DISPATCH_DEBUG
dispatch_queue_t cq;
if (slowpath(cq = _dispatch_queue_get_current())) {
DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
}
#endif
_dispatch_queue_set_current(dq); // 設定dispatch thread的當前queue是dq
dispatch_priority_t pri = dq->dq_priority;
if (!pri) pri = _dispatch_priority_from_pp(pp);
dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
_dispatch_adopt_wlh_anon();
struct dispatch_object_s *item;
bool reset = false;
dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
DISPATCH_INVOKE_REDIRECTING_DRAIN;
_dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
_dispatch_perfmon_start();
while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) { // 拿出queue中的一個item
if (reset) _dispatch_wqthread_override_reset();
_dispatch_continuation_pop_inline(item, &dic, flags, dq); // 執行這個item
reset = _dispatch_reset_basepri_override();
if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
break;
}
}
// overcommit or not. worker thread
if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
_dispatch_perfmon_end(perfmon_thread_worker_oc);
} else {
_dispatch_perfmon_end(perfmon_thread_worker_non_oc);
}
#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
_dispatch_reset_wlh();
_dispatch_reset_basepri(old_dbp);
_dispatch_reset_basepri_override();
_dispatch_queue_set_current(NULL); // 設定dispatch thread的當前queue是NULL
}
程式碼很多,核心是中間的while迴圈:
while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) { // 拿出queue中的一個item
if (reset) _dispatch_wqthread_override_reset();
_dispatch_continuation_pop_inline(item, &dic, flags, dq); // 執行這個item
reset = _dispatch_reset_basepri_override();
if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
break;
}
}
通過while迴圈,GCD每次從root queue中取出一個queue item,並呼叫_dispatch_continuation_pop_inline
執行它,直到root queue中的item全部清空為止。
我們來看一下queue item是如何被執行的,這裡的queue item,應該是一個dispatch queue:
static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_queue_t dq)
{
dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
_dispatch_get_pthread_root_queue_observer_hooks();
if (observer_hooks) observer_hooks->queue_will_execute(dq);
_dispatch_trace_continuation_pop(dq, dou);
flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
if (_dispatch_object_has_vtable(dou)) { // 到這裡,我們提交的任務,才被queue執行。簡直是百轉千回啊!!!!
dx_invoke(dou._do, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
}
if (observer_hooks) observer_hooks->queue_did_execute(dq);
}
這裡dx_invoke
是一個巨集,它會呼叫_dispatch_queue_invoke
方法, 結合呼叫堆疊,其最後會呼叫
_dispatch_queue_serial_drain
。
OK,上面就是dispatch async序列佇列的執行步驟,總結一下就是:
將work打包成dispatch_continuation_t
, 然後將dq入隊到響應的root queue中,root queue中的執行緒池中的執行緒會被喚醒,執行執行緒函式_dispatch_worker_thread3
,root queue會被傾倒,執行queue中的任務。
讓我們再看一下 dispatch_async 到並行佇列的情況:
回到上面序列佇列和並行佇列分支的地方:
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
bool barrier)
{
// 如果是用barrier插進來的任務或者是序列佇列,直接將任務加入到佇列
// #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
// ({ uint16_t _width = (width); \
// _width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)))
return _dispatch_continuation_push(dq, dc); // 入隊
}
return _dispatch_async_f2(dq, dc); // 並行佇列走這裡
}
並行佇列會走_dispatch_async_f2
:
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
// equivalent), so we have to check that this thread hasn't enqueued
// anything ahead of this call or we can break ordering
if (slowpath(dq->dq_items_tail)) {
return _dispatch_continuation_push(dq, dc);
}
if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
return _dispatch_continuation_push(dq, dc);
}
// async 重定向,任務的執行由自動定義queue轉入root queue
return _dispatch_async_f_redirect(dq, dc,
_dispatch_continuation_override_qos(dq, dc));
}
這裡,會呼叫_dispatch_async_f_redirect
,同樣的,會將dq重定向到root queue
中。
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (!slowpath(_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dq, dou);
}
// 將dq替換為root queue
dq = dq->do_targetq;
// 這裡一般不會進入,主要是將dq替換為最終的targetq
while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
break;
}
if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
}
dq = dq->do_targetq;
}
// 任務入隊,展開巨集定義:
// #define dx_push(x, y, z) dx_vtable(x)->do_push(x, y, z)
// #define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
// 由於此時的x實質上是root queue,可以檢視init.c 中的
// do_push 實質會呼叫 _dispatch_root_queue_push
dx_push(dq, dou, qos);
}
這裡又出現了dx_push
,檢視init.c
,實質會呼叫_dispatch_root_queue_push
:
void
_dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
if (unlikely(ddi && ddi->ddi_can_stash)) {
dispatch_object_t old_dou = ddi->ddi_stashed_dou;
dispatch_priority_t rq_overcommit;
rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (likely(!old_dou._do || rq_overcommit)) {
dispatch_queue_t old_rq = ddi->ddi_stashed_rq;
dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
ddi->ddi_stashed_rq = rq;
ddi->ddi_stashed_dou = dou;
ddi->ddi_stashed_qos = qos;
_dispatch_debug("deferring item %p, rq %p, qos %d",
dou._do, rq, qos);
if (rq_overcommit) {
ddi->ddi_can_stash = false;
}
if (likely(!old_dou._do)) {
return;
}
// push the previously stashed item
qos = old_qos;
rq = old_rq;
dou = old_dou;
}
}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
if (_dispatch_root_queue_push_needs_override(rq, qos)) { // 判斷root queue的優先順序和 自定義優先順序是否相等,不相等,進入if(一般不相等)
return _dispatch_root_queue_push_override(rq, dou, qos);
}
#else
(void)qos;
#endif
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
會呼叫_dispatch_root_queue_push_override
:
static void
_dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
dispatch_object_t dou, dispatch_qos_t qos)
{
bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit); // 根據優先順序,獲取root queue
dispatch_continuation_t dc = dou._dc;
if (_dispatch_object_is_redirection(dc)) {
// no double-wrap is needed, _dispatch_async_redirect_invoke will do
// the right thing
dc->dc_func = (void *)orig_rq;
} else {
dc = _dispatch_continuation_alloc();
dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
// fake that we queued `dou` on `orig_rq` for introspection purposes
_dispatch_trace_continuation_push(orig_rq, dou);
dc->dc_ctxt = dc;
dc->dc_other = orig_rq;
dc->dc_data = dou._do;
dc->dc_priority = DISPATCH_NO_PRIORITY;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
}
_dispatch_root_queue_push_inline(rq, dc, dc, 1); // 又會呼叫這裡
}
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _head,
dispatch_object_t _tail, int n)
{
struct dispatch_object_s *head = _head._do, *tail = _tail._do;
// 當queue為空,且需要設定header時,會進入到這裡。這裡應該是第一次使用root queue的時候會進入一次
if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) { // 嘗試更新dq的list,如果是第一次,list 為空,返回false
_dispatch_queue_push_update_head(dq, head); // 設定queue 頭
return _dispatch_global_queue_poke(dq, n, 0); // 這裡啟用root queue,這裡的n是入隊dq個數,是1
}
}
上面的判斷很重要:
if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) { // 嘗試更新dq的list,如果是第一次,list 為空,返回false
_dispatch_queue_push_update_head(dq, head); // 設定queue 頭
return _dispatch_global_queue_poke(dq, n, 0); // 這裡啟用root queue,這裡的n是入隊dq個數,是1
}
在執行新的任務時,GCD會嘗試更新root queue的任務列表。如果是第一次向root queue投遞任務,則此時的任務列表是空,更新任務列表失敗,則會進入_dispatch_global_queue_poke
來啟用root queue:
void
_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
{
if (!_dispatch_queue_class_probe(dq)) { // 如果還有要執行的,直接返回
return;
}
#if DISPATCH_USE_WORKQUEUES
dispatch_root_queue_context_t qc = dq->do_ctxt;
if (
#if DISPATCH_USE_PTHREAD_POOL
(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
#endif // DISPATCH_USE_WORKQUEUES
return _dispatch_global_queue_poke_slow(dq, n, floor);
}
poke 會進入第二階段 _dispatch_global_queue_poke_slow
:
static void
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
{
dispatch_root_queue_context_t qc = dq->do_ctxt;
int remaining = n; // remaining 表示要執行的任務數量 1
int r = ENOSYS;
// step1. 先初始化root queues 包括初始化XUN 的workqueue
_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
#if DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
if (qc->dgq_kworkqueue) {
pthread_workitem_handle_t wh;
unsigned int gen_cnt;
do {
// 呼叫XUN核心的workqueue函式,來維護GCD層的 pthread pool
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
_dispatch_worker_thread4, dq, &wh, &gen_cnt);
(void)dispatch_assume_zero(r);
} while (--remaining);
return;
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_QOS
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp(dq->dq_priority));
#elif DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
qc->dgq_wq_options, remaining);
#endif
(void)dispatch_assume_zero(r);
return;
}
#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) { // step2. 喚醒執行緒,來做事情
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) { // 如果沒有要處理的dq了,返回
return;
}
}
}
}
說實話,這裡看的不是很清楚,重點是step1, 初始化root queue的方法:_dispatch_root_queues_init
:
void
_dispatch_root_queues_init(void)
{
// 這裡用了dispatch_once_f, 僅會執行一次
static dispatch_once_t _dispatch_root_queues_pred;
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
int wq_supported;
_dispatch_fork_becomes_unsafe();
if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
bool overcommit = true;
#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
// some software hangs if the non-overcommitting queues do not
// overcommit when threads block. Someday, this behavior should
// apply to all platforms
if (!(i & 1)) {
overcommit = false;
}
#endif
_dispatch_root_queue_init_pthread_pool(
&_dispatch_root_queue_contexts[i], 0, overcommit);
}
#else
DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
"Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
}
}
這個初始化函式有兩個分支,GCD首先會呼叫_dispatch_root_queues_init_workq
來初始化root queues,如果不成功,才使用_dispatch_root_queue_init_pthread_pool
。 這裡可以看出,GCD是優先使用XUN核心提供的workqueue,而非使用使用者層的執行緒池。 我們這裡重點關注GCD使用workqueue的情況:
static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
int r; (void)r;
bool result = false;
*wq_supported = 0;
#if DISPATCH_USE_WORKQUEUES
bool disable_wq = false; (void)disable_wq;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
disable_wq = slowpath(getenv(
相關推薦
GCD原始碼吐血分析(2)——dispatch_async/dispatch_sync
上一章中,我們知道了獲取GCD queue的底層實現。獲取到queue後,就需要將任務提交到queue中進行處理。 我們有兩種方式提交任務: dispatch_async和dispatch_sync。一個是非同步不等待任務完成就返回,另一個是同步任務,需要等待任務完成。這兩種提交任務
GCD原始碼吐血分析(1)——GCD Queue
看了快半個月的GCD原始碼,只能說太變態了。 先來吐槽一下:一個函式,呼叫棧都是十幾層…… 為了效率,程式碼使用了純C語言,但是為了模擬面向物件中的繼承,虛擬函式等,定義了一層層的巨集定義,看一個struct的定義要繞過來繞過去…… 網上的資料極少,有的那幾篇,還都是用舊版本的GCD在
完整詳解 swift GCD系列(一)dispatch_async;dispatch_sync;dispatch_async_f;dispatch_sync_f
為什麼要寫這個系列,因為百度了一下,找了很多都是些片面的Blog,拷貝來拷貝去的,寫的也很粗糙。
所以,我要寫這個系列,儘量把官網文件中GCD的強大功能完整的表達出來。方便自己,也方便別人,如果發現有問題,歡迎提出
本教程的計劃:在完整的看過GCD的官方文件之後,我實在想
Netty Pipeline原始碼分析(2)
原文連結:wangwei.one/posts/netty…
前面 ,我們分析了Netty Pipeline的初始化及節點新增與刪除邏輯。接下來,我們將來分析Pipeline的事件傳播機制。
Netty版本:4.1.30
inBound事件傳播
示例
我們通過下面這個例子來演示Ne
lucene原始碼分析(2)讀取過程例項
1.官方提供的程式碼demo
Analyzer analyzer = new StandardAnalyzer();
// Store the index in memory:
Directory directory = new RAMDirec
x264裡的2pass指的是什麼意思 x264原始碼分析2 encode
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow
也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
 
Shiro原始碼分析(2) - 會話管理器(SessionManager)
本文在於分析Shiro原始碼,對於新學習的朋友可以參考
[開濤部落格](http://jinnianshilongnian.iteye.com/blog/2018398)進行學習。
本文對Shiro中的SessionManager進行分析,SessionMan
《2.uboot和系統移植-第6部分-2.6.uboot原始碼分析2-啟動第二階段》
《2.uboot和系統移植-第6部分-2.6.uboot原始碼分析2-啟動第二階段》
第一部分、章節目錄 2.6.1.start_armboot函式簡介 2.6.2.start_armboot解析1 2.6.3.記憶體使用排布 2.6.4.start_armboot解析2 2.6.5.s
am335x 核心原始碼分析2 LCD移植
1、/arch/arm/mach-omap2/board-am335xevm.c/lcdc_init(){得到LCD硬體引數struct da8xx_lcdc_platform_data} -> am33xx_register_lcdc() -> omap_device_
MyBatis原始碼分析-2-基礎支援層-反射模組-TypeParameterResolver/ObjectFactory
TypeParameterResolver:
TypeParameterResolver的功能是:當存在複雜的繼承關係以及泛型定義時, TypeParameterResolver 可以幫助我們解析欄位、方法引數或方法返回值的型別。TypeParameterResolver 是在Refelctor
Spark2.2.2原始碼解析: 2.啟動master節點流程分析
本文主要說明在啟動master節點的時候,程式碼的流程走向。
授予檔案執行許可權
chmod755
兩個目錄裡的檔案:
/workspace/spark-2.2.2/bin --所有檔案
/workspace/spark-2.2.2/sb
muduo原始碼分析(2) --記憶體分配
寫在前面:
這個原始碼是分析libevent-2.0.20-stable, 並非最新版本的libevent,作者並沒有全看原始碼,在這裡會推薦以下參考的一些網站,也歡迎大家在不足的地方提出來進行討論。
什麼都沒包裝的記憶體管理
預設情況下,l
Minix3原始碼分析(2)——系統初始化
minix3的啟動牽扯到幾次控制權轉移,它們發生在mpx386.s中的組合語言例程和start.c及main.c中的C語言例程之間。
彙編程式碼需要做許多工作,包括建立一個 棧幀 以便為C編譯器編譯的程式碼提供適當的環境,複製處理器所使用的表格來定義儲存器段,建
Spring5原始碼分析系列(四)Spring5原始碼分析2
本文緊接上文Spring5原始碼分析1,講解基於XML的依賴注入,文章參考自Tom老師視訊,下一篇文章將介紹基於Annotation的依賴注入。
基於XML的依賴注入
1、依賴注入發生的時間
當SpringIOC容器完成了Bean定義資源的定位、載入和解析註冊以後,IO
JDK1.8ArrayList原始碼分析2
E get(int index)
因為ArrayList是採用陣列結構來儲存的,所以它的get方法非常簡單,先是判斷一下有沒有越界,之後就可以直接通過陣列下標來獲取元素了,所以get的時間複雜度是O(1)。
/**
* Returns the
谷歌瀏覽器的原始碼分析 2
這麼大的工程,我從哪裡開始呢?我認為從介面開始,這樣才可以快速地深入研究。下面就可以先嚐試修改一個chrome的關於對話方塊,上一次看到它是英語的,那麼我就來把它改成中文的吧,這樣有目標了。從chrome的工程裡可以看到它是支援多種語言的,在Windows平臺上支援多語言的標準做法,就是寫多個語言的DL
3.24 vchain原始碼分析2
接下來是合約的第二部分,直接上程式碼,註釋都在程式碼中
// Contract to sell and distribute VEN tokens
// 分發VEN 代幣
contract VENSale is Owned{
/// chart of stage t
x264裡的2pass指的是什麼意思 x264原始碼分析2 encode
A:x264裡的2pass指的是什麼意思?另外stat是什麼意思, 比如有個引數--stats <string> Filename for 2 pass stats [/"%s/"]/n", defaults->rc.psz_stat_out )
新人分享——hadoop原始碼分析2
這裡配置好了這個類,獲取outputFormat
-之後就是會去讀取切片的元資料資訊,然後獲取reduce數量,之後會設定作業進度,然後獲取可執行的maptask
執行maptask
在這裡跑任務,並且監控
我們來看maptaskRunable的run方法
這裡設定了一堆屬性在裡面
這裡map.r
Fabric 1.0原始碼分析(2) blockfile(區塊檔案儲存)
Fabric 1.0原始碼筆記 之 blockfile(區塊檔案儲存)
1、blockfile概述
blockfile,即Fabric區塊鏈區塊檔案儲存,預設目錄/var/hyperledger/production/ledgersData/chains,含in