Bug:LinkedTransferQueue的資料暫失和CPU爆滿以及修復
一個因中斷或者超時的呼叫可能會引起資料丟失和CPU爆滿。
前幾天讀LinkedTransferQueue(以下簡稱ltq)的原始碼,想加深下對鬆弛型雙重佇列的理解,無意中發現了這個問題:),經過仔細檢查後確認了這是個bug,存在於JDK1.7.0_40和剛釋出的JDK8中,去google和oracle官方似乎也沒有搜尋到這個問題。
重現bug:先來重現下這個bug,由於對併發執行緒的執行順序預先不能做任何假設,所以很可能根本就不存在所謂的重現錯誤的“測試用例”,或者說這個測試用例應該是某種“執行順序”。所以我一開始的做法是copy了一份ltq的原始碼,通過某個地方加自旋…但是這種方法畢竟要修改原始碼,後來我發現直接debug進原始碼就可以輕易重現bug了。
LinkedTransferQueue:xfer(E e, boolean haveData, int how, long nanos) if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting
在以上06行Node pred = tryAppend(s, havaData) 斷點(我是windows下用eclipse除錯);
debug以下程式碼:
public static void main(String[] args) { final BlockingQueue<Long> queue = new LinkedTransferQueue<Long>(); Runnable offerTask = new Runnable(){ public void run(){ queue.offer(8L); System.out.println("offerTask thread has gone!"); } }; Runnable takeTask = new Runnable(){ public void run(){ try { System.out.println(Thread.currentThread().getId() + " " +queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable takeTaskInterrupted = new Runnable(){ public void run(){ Thread.currentThread().interrupt(); try { System.out.println(Thread.currentThread().getId() + " " +queue.take()); } catch (InterruptedException e) { System.out.println(e + " "+Thread.currentThread().getId()); } } }; new Thread(offerTask).start(); new Thread(takeTask).start(); new Thread(takeTaskInterrupted).start(); }
執行到斷點處之後,在Debug介面裡面有Thread-0、Thread-1、Thread-2三個執行緒分別指代程式碼中的offerTask、takeTask、takeTaskInterrupted三者。現在執行三步:
step 1: Resume Thread-1(沒有輸出,執行緒Thread-1自己掛起,等待資料)
step 2: Resume Thread-2(看到類似於 java.lang.InterruptedException 15 的輸出)
step 3: Resume Thread-0(輸出:offerTask thread has gone!)
offer執行緒已經執行完畢,然後我們的64L呢,明明Thread-1在等待資料,資料丟失了嗎?其實不是,只不過take執行緒現在無法取得offer執行緒提交的資料了。
如果你覺得上面的資料丟失還不是什麼大問題請在上面的示例下新增如下程式碼(和你CPU核心數相同的程式碼行:)
.............. new Thread(takeTask).start(); new Thread(takeTask).start(); new Thread(takeTask).start(); new Thread(takeTask).start();
把上面的3個step重新按順序執行一遍,建議先開啟工作管理員,接著忽略斷點,讓接下來這幾個執行緒跑:)
CPU爆滿了吧…其實是被這幾個執行緒佔據了,你去掉幾行程式碼,CPU使用率會有相應的調整。
所以這個bug可能會引起資料暫時遺失和CPU爆滿, 只不過貌似發生這種情況的概率極低。
原因:為什麼會出現這個bug呢,要想了解原因必須先深入分析ltq內部所使用的資料結構和併發策略,ltq內部採用的是一種非常不同的佇列,即鬆弛型雙重佇列(Dual Queues with Slack)。
資料結構:
鬆弛的意思是說,它的head和tail節點相較於其他併發列隊要求上更放鬆,構造它的目的是減少CAS操作的次數(相應的會增加next域的引用次數),舉個例子:某個瞬間tail指向的節點後面已經有6個節點了(以下圖借用原始碼的註釋),而其他併發佇列真正的尾節點最多隻能是tail的下一個節點。
* head tail
* | |
* v v
* M -> M -> U -> U -> U -> U->U->U->U->U
收縮的方式是大部分情況下不會用tail的next來設定tail節點,而是第一次收縮N個next(N>=2),然後檢視能否2個一次來收縮tail。(head類似,並且head改變一次會導致前“head”節點的next域斷裂即如下圖)
*”prehead” head tail
* | | |
* v v v
* M M-> U -> U -> U -> U->U->U->U->U
雙重是指有兩種型別相互對立的節點(Node.isData==false || true),並且我理解的每種節點都有三種狀態:
1 INIT(節點構造完成,剛進入佇列的狀態)
2 MATCHED(節點備置為“滿足”狀態,即入隊節點標識的執行緒成功取得或者傳遞了資料)
3 CANCELED(節點被置為取消狀態,即入隊節點標識的執行緒因為超時或者中斷決定放棄等待)
(bug的原因就是現有程式碼中將2、3都當做MATCHED處理,後面會看到把3獨立出來就修復了這個問題)
併發策略:
既然使用了鬆弛的雙重佇列,那麼當take、offer等方法被呼叫時執行的策略也稍微不同。
就我們示例中的程式碼的流程來看,Thread-0、Thread-1、Thread-2幾乎同時進入到了xfer的呼叫,發現佇列為空,所以都構造了自己的node希望入隊,於是三者都從tail開始加入自己的node,我們在這裡的順序是Thread-1->Thread-2->Thread-0,因為想要入隊還要和當前的tail節點進行匹配得到“認可”才能嘗試入隊,佇列為空Thread-1理所當然入隊成功並且掛起了自己的執行緒(park)等待相對的呼叫來喚醒自己(unpark),然後Thread-2發現佇列末尾的node和自己是同一型別的,於是通過了測試把自己也加入了佇列,由於本身是中斷的所以讓自己進入MATCHED狀態(bug就是這裡了,上面說過CANCEL被當做MATCHED狀態處理),接著我們提交資料的Thread-0來了,發現末尾節點的型別雖然對立但卻是MATCHED狀態(假如不為MATCHED會有退回再從head來探測一次的機會),所以認為佇列已經為空,前面的呼叫已經被匹配完了,然後把自己的node入隊,這樣就形成了如下所示的場景:
* Thread-1 Thread-2 Thread-0
* | | |
* v v v
* REQUEST -> MATCHED -> DATA
好了, 現在Thread-3來了,先探測尾部發現Thread-0的node是型別相反的,於是退回從頭部開始重新探測,但是又發現Thread-1的node的型別是相同的,於是再次去探測尾部看看能否入隊…….結果造成CPU是停不下來的。
修復:
如上面所說,錯誤的本質在於當尾部的節點是CANCELED(取消)狀態時不能作為被匹配完成的MATCHED狀態處理,應該讓後來者回退到head去重新測試一次所以重點是對原始碼做出如下修改(修改放在註釋中):
static final class Node { final boolean isData; // false if this is a request node volatile Object item; // initially non-null if isData; CASed to match volatile Node next; volatile Thread waiter; // null until waiting /* static final Object CANCEL = new Object(); final void forgetWaiter(){ UNSAFE.putObject(this, waiterOffset, null); } final boolean isCanceled(){ return item == CANCEL; } */
在Node節點程式碼中加入標識取消的物件CANCEL。
private E xfer(E e, boolean haveData, int how, long nanos) { if (item != p && (item != null) == isData /*&& item!=Node.CANCEL*/) { // unmatched if (isData == haveData) // can't match
在xfer函式中新增對於為狀態為取消的判斷。
private E xfer(E e, boolean haveData, int how, long nanos) { Node pred = tryAppend(/*s,*/ haveData); ..... } private Node tryAppend(Node s, boolean haveData) { else if (p.cannotPrecede(/*s, */haveData)) else { /* if(p.isCanceled()) p.forgetContents();*/ if (p != t) { // update if slack now >= 2
新增對於前置節點為取消狀態時當前節點的入隊策略
final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } final boolean cannotPrecede(Node node, boolean haveData) { boolean d = isData; if(d != haveData){ Object x = item; if(x != this && (x!=null) == d && x!= Node.CANCEL) return true; if(item == CANCEL){ if(node.next != this){ node.next = this; return true; } this.forgetContents(); } } node.next = null; return false; }
這一步是關鍵, 當我們入隊時發現前置節點是型別對立並且取消狀態時,我們就需要多一次的回退探測,所以借用了一下next域來標識這個CANCEL節點,下次經過時或者可以確認它可以當做MATCHED處理(它前面沒有INIT節點)或者已經有別的節點粘接在它後面,我們就進而處理那個節點,總之當我們總是能夠得到正確的行為。
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, /*Node.CANCEL*/)) { // cancel unsplice(pred, s); return e; }
這一處關鍵點把item的值從原來的s本身修改為我們新增的CANCEL。
額 程式碼好亂,關於這個bug定位應該沒問題,後面的原因很多方面都沒講,剩下的還有很多處大大小小的修改=_=,整個修改之後的LinkedTransferQueue在github上,大家有興趣的話可以參考下,已經通過了 JSR166測試套件。