1. 程式人生 > IOS開發 >iOS原始碼解析: dispatch_once是如何實現的?

iOS原始碼解析: dispatch_once是如何實現的?

在之前的一篇文章 iOS原始碼解析: NotificationCenter是如何實現的? 中,順便介紹了在dispatch_once時使用跨執行緒操作而導致死鎖的情況。本文基於dispatch_once的原始碼,進一步介紹一下iOS習以為常的單例模式。看似非常簡單,不過實際要考慮下邊幾個關鍵點:

  1. 懶載入
  2. 執行緒安全
  3. 編譯器指令重排優化
  4. 可繼承、方法可override

Java的單例模式

最早接觸的是Java中的幾種單例寫法,當時覺得非常神奇。一步步改進的過程值得好好思考。

1 lazy loading & 非執行緒安全

public class Singleton {
	private
static Singleton instance; private Singleton() {} public static Singleton sharedInstance() { if (instance == null) { instance = new Singleton(); } return instance; } } 複製程式碼

嚴格來說,這種非執行緒安全的方式,根本算不上單例。

2 lazy loading & 執行緒安全

public class Singleton {
	private static Singleton instance;
	private
Singleton()
{} public static synchronized Singleton sharedInstance() { if (instance == null) { instance = new Singleton(); } return instance; } } 複製程式碼

加上synchronized,能夠保證執行緒安全。但所有的sharedInstance使用都加了鎖,效率低下。

3 non lazy loading & 執行緒安全

以上的lazy loading俗稱懶漢模式,僅在使用到的時候才去初始化instance變數。

而下邊的這種俗稱餓漢模式,instance在類載入的時候就例項化了。

public class Singleton {
	private static Singleton instance = new Singleton();
	private Singleton() {}
	public static Singleton sharedInstance() {
		return instance;
	}
}
複製程式碼

餓漢模式是執行緒安全的,但卻失去了lazy loading的效果。有時候提前初始化一些不必要的例項物件,甚至會嚴重影響效能。

4 靜態內部類 & 執行緒安全

public class Singleton {
	private static class SingletonHolder {
		private static final Singleton singleton = new Singleton();
	}

	private Singleton() {}
	public static final Singleton sharedInstance() {
		return SingletonHolder.singleton;
	}
}
複製程式碼

這種方式引入了一個內部類,避免了在Singleton載入的時候就初始化一個例項物件。從而兼顧了lazy loading和執行緒安全。

5 列舉 & 執行緒安全

public enum Singleton {
	INSTANCE;
	public void myMethod() {
		System.out.println("myMethod");
	}
}
複製程式碼

這種方式可以說是Java單例的終極寫法,但卻無法繼承了。

6 lazy loading & 雙重校驗鎖

基於方式2的優化版本,主要優化synchronized的使用:

public class Singleton {
	private static Singleton instance;
	private Singleton() {}
	public static Singleton sharedInstance() {
		if (instance == null) {
			synchronized (Singleton.class) {
				if (instance == null) {
					instance = new Singleton();
				}
			}
		}
		return instance;
	}
}
複製程式碼

這個雙重校驗很關鍵,尤其是內部的 if (instance == null) 同樣是必不可少的。多執行緒同時呼叫sharedInstance,雖然有加鎖,但加鎖的程式碼塊中如果沒有雙重校驗,依然會執行初始化操作。

這種方式已經非常安全了,但依然會有極低概率出現問題。***instance = new Singleton();**8 這句程式碼,並非是原子操作。實際上,這句程式碼做了以下三件事:

  1. 給instance分配一塊記憶體
  2. 呼叫Singleton的建構函式來初始化一個例項A
  3. 將instance指向初始化的例項A,此時instance就不是null了

JVM的編譯器存在執行重排的優化,使得以上的2和3的執行順序可能會變,即最終執行順序可能是1-2-3或1-3-2。如果是1-3-2,則3執行完畢、2未執行之前,這個臨界狀態是很危險的。這時的instance不是null,指向的是一塊未初始化的記憶體區域。假設此時其他執行緒呼叫sharedInstance函式,剛好執行到了外層的 if (instance == null) 判斷,instance非null,則將這個未初始化的記憶體返回了。

總結一下:對instance的寫操作未完成,其他執行緒就對其進行了讀操作。因此確保 instance的寫操作 為原子操作即可。

7 volatile

volatile關鍵字的作用是禁止指令重排,對instance的寫操作會有一個記憶體屏障。確保了6中的執行順序始終為1-2-3。即

public class Singleton {
	private static volatile Singleton instance;
	private Singleton() {}
	public static Singleton sharedInstance() {
		if (instance == null) {
			synchronized (Singleton.class) {
				if (instance == null) {
					instance = new Singleton();
				}
			}
		}
		return instance;
	}
}
複製程式碼

講了這麼多,實際可以根據使用場景選擇 方式5或者方式7 即可。下邊來看看iOS中的情況。

iOS中的單例模式

Objective-C

Objective-C中的單例寫法如下,這個太常見了沒什麼可說的

@implementation MyObject

+ (instancetype)sharedInstance {
    static MyObject *instance;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken,^{
        instance = [[MyObject alloc] init];
    });
    return instance;
}

@end
複製程式碼

Swift

Swift預設沒有dispatch_once,可以使用static let即可實現單例。不過這樣也就沒有了lazy loading的效果,即餓漢模式。

class SwiftyMediator {
    static let shared = SwiftyMediator()
	private init() {}
}
複製程式碼

而如果想在業務中使用dispatch_once的類似作用,可以採用如下方式:

public extension DispatchQueue {
    private static var onceTokens = [String]()
    
    class func once(token: String,block: () -> Void) {
        objc_sync_enter(self)
        defer { objc_sync_exit(self) }
        
        if onceTokens.contains(token) {
            return
        }
        
        onceTokens.append(token)
        block()
    }
}
複製程式碼

dispatch_once的底層實現

dispatch_once的底層實現其實並不複雜:

void
dispatch_once(dispatch_once_t *val,dispatch_block_t block)
{
	dispatch_once_f(val,block,_dispatch_Block_invoke(block));
}
複製程式碼
#define _dispatch_Block_invoke(bb) \
		( (dispatch_function_t) ((struct Block_layout *)bb)->invoke )

typedef void (*dispatch_function_t)(void *_Nullable);
複製程式碼

dispatch_function_t就是一個函式指標。***_dispatch_Block_invoke(block)*** 實際上將block轉為 ***struct Block_layout ****,將其invoke函式轉為dispatch_function_t函式指標。

dispatch_once_f

dispatch_once_f的主體流程就是一個if判斷,可以簡單理解為 首次if判斷返回YES,進入執行;後來if判斷返回NO,進入等待流程

void
dispatch_once_f(dispatch_once_t *val,void *ctxt,dispatch_function_t func)
{
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once,acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l,v);
	}
#endif
#endif
	if (_dispatch_once_gate_tryenter(l)) {
		return _dispatch_once_callout(l,ctxt,func);
	}
	return _dispatch_once_wait(l);
}
複製程式碼

在dispatch_once_f的最初,實際上有先判斷 &l->dgo_once 地址中儲存的值。顯然若該值為DLOCK_ONCE_DONE,即為once已經執行過了,程式碼也就直接return了。而這個值DLOCK_ONCE_DONE在後續很多地方有用到。

uintptr_t v = os_atomic_load(&l->dgo_once,acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
	return;
}
複製程式碼

如果該值不為DLOCK_ONCE_DONE,則第一次呼叫時,***_dispatch_once_gate_tryenter(l)*** 可以進入,則執行 ***return _dispatch_once_callout(l,func);***。後續的呼叫,則執行 ***return _dispatch_once_wait(l);***,這就是once的原理。

而它是如何保證多執行緒下的安全性和once特性呢,看一下_dispatch_once_gate_tryenter的實現:

typedef struct dispatch_once_gate_s {
	union {
		dispatch_gate_s dgo_gate;
		uintptr_t dgo_once;
	};
} dispatch_once_gate_s,*dispatch_once_gate_t;

#define DLOCK_ONCE_UNLOCKED	((uintptr_t)0)
#define DLOCK_ONCE_DONE		(~(uintptr_t)0)

static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	return os_atomic_cmpxchg(&l->dgo_once,DLOCK_ONCE_UNLOCKED,(uintptr_t)_dispatch_lock_value_for_self(),relaxed);
}
複製程式碼

DLOCK_ONCE_UNLOCKED與DLOCK_ONCE_DONE對應,分別代表dispatch_once執行前後的標記狀態。

os_atomic_cmpxchg是一個 比較+交換 的原子操作。比較 &l->dgo_once 的值是否等於 DLOCK_ONCE_UNLOCKED,若是則將 (uintptr_t)_dispatch_lock_value_for_self() 賦值給 &l->dgo_once。即這個原子操作確保了dispatch_once的執行緒安全。

#define DLOCK_OWNER_MASK			((dispatch_lock)0xfffffffc)

static inline dispatch_lock
_dispatch_lock_value_from_tid(dispatch_tid tid)
{
	return tid & DLOCK_OWNER_MASK;
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_lock
_dispatch_lock_value_for_self(void)
{
	return _dispatch_lock_value_from_tid(_dispatch_tid_self());
}
複製程式碼

(uintptr_t)_dispatch_lock_value_for_self() 的返回值在 _dispatch_lock_is_locked 函式中也同樣用到,用於加鎖。

_dispatch_once_wait

而對於非首次的執行,是如何等待,並返回該block執行後生成的sharedInstance物件呢?

void
_dispatch_once_wait(dispatch_once_gate_t dgo)
{
	dispatch_lock self = _dispatch_lock_value_for_self();
	uintptr_t old_v,new_v;
	dispatch_lock *lock = &dgo->dgo_gate.dgl_lock;
	uint32_t timeout = 1;

	for (;;) {
		os_atomic_rmw_loop(&dgo->dgo_once,old_v,new_v,relaxed,{
			if (likely(old_v == DLOCK_ONCE_DONE)) {
				os_atomic_rmw_loop_give_up(return);
			}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
			if (DISPATCH_ONCE_IS_GEN(old_v)) {
				os_atomic_rmw_loop_give_up({
					os_atomic_thread_fence(acquire);
					return _dispatch_once_mark_done_if_quiesced(dgo,old_v);
				});
			}
#endif
			new_v = old_v | (uintptr_t)DLOCK_WAITERS_BIT;
			if (new_v == old_v) os_atomic_rmw_loop_give_up(break);
		});
		if (unlikely(_dispatch_lock_is_locked_by((dispatch_lock)old_v,self))) {
			DISPATCH_CLIENT_CRASH(0,"trying to lock recursively");
		}
#if HAVE_UL_UNFAIR_LOCK
		_dispatch_unfair_lock_wait(lock,(dispatch_lock)new_v,0,DLOCK_LOCK_NONE);
#elif HAVE_FUTEX
		_dispatch_futex_wait(lock,NULL,FUTEX_PRIVATE_FLAG);
#else
		_dispatch_thread_switch(new_v,flags,timeout++);
#endif
		(void)timeout;
	}
}
複製程式碼

os_atomic_rmw_loop用於從作業系統底層獲取狀態,使用 os_atomic_rmw_loop_give_up 來執行返回操作。即不停查詢 &dgo->dgo_once 的值,若變為DLOCK_ONCE_DONE,則呼叫 os_atomic_rmw_loop_give_up(return); 退出等待。

_dispatch_once_callout

首次進入dispatch_once,會執行_dispatch_once_callout的流程,即呼叫該block。傳入的第三個引數func即為之前包裝好的dispatch_function_t函式指標。

static void
_dispatch_once_callout(dispatch_once_gate_t l,dispatch_function_t func)
{
	_dispatch_client_callout(ctxt,func);
	_dispatch_once_gate_broadcast(l);
}
複製程式碼

_dispatch_client_callout就是實際執行block操作的地方:

void
_dispatch_client_callout(void *ctxt,dispatch_function_t f)
{
	_dispatch_get_tsd_base();
	void *u = _dispatch_get_unwind_tsd();
	if (likely(!u)) return f(ctxt);
	_dispatch_set_unwind_tsd(NULL);
	f(ctxt);
	_dispatch_free_unwind_tsd();
	_dispatch_set_unwind_tsd(u);
}
複製程式碼

實際執行block即呼叫 f(ctxt); 函式。

Thread-specific data(TSD)是執行緒私有的資料,包含TSD的一些函式用於向執行緒(thread)物件中儲存和獲取資料。如CFRunLoopGetMain()函式,呼叫_CFRunLoopGet0(),在其中即利用了TSD介面從thread中得到runloop物件。

這裡的 _dispatch_get_tsd_base(); 也獲取執行緒的私有資料。而 _dispatch_get_unwind_tsd、_dispatch_set_unwind_tsd和_dispatch_free_unwind_tsd 看來就是用於確保 f(ctxt) 執行的執行緒安全。

_dispatch_once_gate_broadcast

猜測一下_dispatch_once_gate_broadcast的作用,應該就是在block執行完畢後修改上邊的&l->dgo_once的值,即標記為dispatch_once已經執行過了,且廣播出去。

static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
	dispatch_lock value_self = _dispatch_lock_value_for_self();
	uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	v = _dispatch_once_mark_quiescing(l);
#else
	v = _dispatch_once_mark_done(l);
#endif
	if (likely((dispatch_lock)v == value_self)) return;
	_dispatch_gate_broadcast_slow(&l->dgo_gate,(dispatch_lock)v);
}
複製程式碼

_dispatch_once_mark_done函式中會呼叫os_atomic_xchg,這是一個原子操作,用於將 &dgo->dgo_once 地址儲存的值,設定為 DLOCK_ONCE_DONE 。此時,once操作即被標記為已執行過了。

atomic_xchg:Swaps the old value stored at location p with new value given by val. Returns old value.

static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
	return os_atomic_xchg(&dgo->dgo_once,DLOCK_ONCE_DONE,release);
}
複製程式碼

dispatch_once的注意點

GCD經常會隱含一些容易導致異常甚至直接崩潰的坑,大多是不合理的使用引發的。翻牆掛了導致無法Google,其他搜尋引擎真是垃圾。所以,後邊提到的兩個DISPATCH_CLIENT_CRASH場景,留待後續補充吧。

block中如果執行了主執行緒sync操作,則會導致死鎖

iOS原始碼解析: NotificationCenter是如何實現的? 中,順便介紹了在dispatch_once時使用跨執行緒操作而導致死鎖的情況。

DISPATCH_CLIENT_CRASH(0,"trying to lock recursively");

在_dispatch_once_wait中的for迴圈中有這樣一段程式碼:

if (unlikely(_dispatch_lock_is_locked_by((dispatch_lock)old_v,self))) {
	DISPATCH_CLIENT_CRASH(0,"trying to lock recursively");
}
複製程式碼

使用如下程式碼可以觸發這樣的死鎖場景。

@implementation SingletonA

+ (instancetype)sharedInstance {
    static SingletonA *instance;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken,^{
        instance = [[SingletonA alloc] init];
    });
    return instance;
}

- (instancetype)init
{
    self = [super init];
    if (self) {
        [SingletonB sharedInstance];
    }
    return self;
}

@end

@implementation SingletonB

+ (instancetype)sharedInstance {
    static SingletonB *instance;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken,^{
        instance = [[SingletonB alloc] init];
    });
    return instance;
}

- (instancetype)init
{
    self = [super init];
    if (self) {
        [SingletonA sharedInstance];
    }
    return self;
}

@end
複製程式碼

錯誤資訊如下:

Thread 1: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP,subcode=0x0)
複製程式碼
libdispatch.dylib`_dispatch_once_wait.cold.1:
    0x10e8d047b <+0>:  leaq   0x5c11(%rip),%rcx        ; "BUG IN CLIENT OF LIBDISPATCH: trying to lock recursively"
    0x10e8d0482 <+7>:  movq   %rcx,0x27cc7(%rip)       ; gCRAnnotations + 8
->  0x10e8d0489 <+14>: ud2
複製程式碼

以上只是一個非常簡單的模擬,實際場景當然不會這麼寫。但是要注意多層操作後可能的死鎖。

DISPATCH_CLIENT_CRASH(cur,"lock not owned by current thread");

在_dispatch_once_gate_broadcast中,有這樣一句 _dispatch_gate_broadcast_slow(&l->dgo_gate,(dispatch_lock)v);

void
_dispatch_gate_broadcast_slow(dispatch_gate_t dgl,dispatch_lock cur)
{
	if (unlikely(!_dispatch_lock_is_locked_by_self(cur))) {
		DISPATCH_CLIENT_CRASH(cur,"lock not owned by current thread");
	}

#if HAVE_UL_UNFAIR_LOCK
	_dispatch_unfair_lock_wake(&dgl->dgl_lock,ULF_WAKE_ALL);
#elif HAVE_FUTEX
	_dispatch_futex_wake(&dgl->dgl_lock,INT_MAX,FUTEX_PRIVATE_FLAG);
#else
	(void)dgl;
#endif
}
複製程式碼

參考資料