阻塞和非阻塞佇列的併發安全原理是什麼?
ArrayBlockingQueue 原始碼分析
我們首先看一下 ArrayBlockingQueue 的原始碼,ArrayBlockingQueue 有以下幾個重要的屬性:
- 第一個就是最核心的、用於儲存元素的 Object 型別的陣列;然
- 後它還會有兩個位置變數,分別是 takeIndex 和 putIndex,這兩個變數就是用來標明下一次讀取和寫入位置的;
- 另外還有一個 count 用來計數,它所記錄的就是佇列中的元素個數。
另外,我們再來看下面這三個變數:
這三個變數也非常關鍵,第一個就是一個 ReentrantLock,而下面兩個 Condition 分別是由 ReentrantLock 產生出來的,這三個變數就是我們實現執行緒安全最核心的工具。
ArrayBlockingQueue 實現併發同步的原理就是利用 ReentrantLock 和它的兩個 Condition,讀操作和寫操作都需要先獲取到 ReentrantLock 獨佔鎖才能進行下一步操作。進行讀操作時如果佇列為空,執行緒就會進入到讀執行緒專屬的 notEmpty 的 Condition 的佇列中去排隊,等待寫執行緒寫入新的元素;同理,如果佇列已滿,這個時候寫操作的執行緒會進入到寫執行緒專屬的 notFull 佇列中去排隊,等待讀執行緒將佇列元素移除並騰出空間。
下面,我們來分析一下最重要的 put 方法:
在 put 方法中,首先用 checkNotNull 方法去檢查插入的元素是不是 null。如果不是 null,我們會用 ReentrantLock 上鎖,並且上鎖方法是 lock.lockInterruptibly()。
在獲取鎖的同時是可以響應中斷的,這也正是我們的阻塞佇列在呼叫 put 方法時,在嘗試獲取鎖但還沒拿到鎖的期間可以響應中斷的底層原因。
緊接著 ,是一個非常經典的 try finally 程式碼塊,finally 中會去解鎖,try 中會有一個 while 迴圈,它會檢查當前佇列是不是已經滿了,也就是 count 是否等於陣列的長度。
如果等於就代表已經滿了,於是我們便會進行等待,直到有空餘的時候,我們才會執行下一步操作,呼叫 enqueue 方法讓元素進入佇列,最後用 unlock 方法解鎖。
你看到這段程式碼不知道是否眼熟,我在用 Condition 實現生產者/消費者模式的時候,寫過一個 put 方法,程式碼如下:
可以看出,這兩個方法幾乎是一模一樣的,所以我們自己用 Condition 實現生產者/消費者模式,實際上其本質就是自己實現了簡易版的 BlockingQueue。
你可以對比一下這兩個 put 方法的實現,這樣對 Condition 的理解就會更加深刻。
和 ArrayBlockingQueue 類似,其他各種阻塞佇列如 LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、DelayedWorkQueue 等一系列 BlockingQueue 的內部也是利用了 ReentrantLock 來保證執行緒安全,只不過細節有差異,比如: LinkedBlockingQueue 的內部有兩把鎖,分別鎖住佇列的頭和尾,比共用同一把鎖的效率更高,不過總體思想都是類似的。
非阻塞佇列ConcurrentLinkedQueue
看完阻塞佇列之後,我們就來看看非阻塞佇列 ConcurrentLinkedQueue。顧名思義,ConcurrentLinkedQueue 是使用連結串列作為其資料結構的,我們來看一下關鍵方法 offer 的原始碼:
在檢查完空判斷之後,可以看到它整個是一個大的 for 迴圈,而且是一個非常明顯的死迴圈。
在這個迴圈中有一個非常亮眼的 p.casNext 方法,這個方法正是利用了 CAS 來操作的,而且這個死迴圈去配合 CAS 也就是典型的樂觀鎖的思想。我們就來看一下 p.casNext 方法的具體實現,其方法程式碼如下:
可以看出這裡運用了 UNSAFE.compareAndSwapObject 方法來完成 CAS 操作,而 compareAndSwapObject 是一個 native 方法,最終會利用 CPU 的 CAS 指令保證其不可中斷。
可以看出,非阻塞佇列 ConcurrentLinkedQueue 使用 :CAS 非阻塞演算法 + 不停重試,來實現執行緒安全,適合用在不需要阻塞功能,且併發不是特別劇烈的場景。
總結
本阻塞佇列最主要是利用了 ReentrantLock 以及它的 Condition 來實現,而非阻塞佇列則是利用 CAS 方法實現執行緒安全。