1. 程式人生 > >AQS的不公平鎖原始碼

AQS的不公平鎖原始碼

同步器節點的waitStatus解釋
CANCELLED 取消狀態

SIGNAL -1 等待觸發狀態,前節點可能是head或者前節點為取消狀態CANCELLED

CONDITION -2 等待條件狀態,在等待佇列中

PROPAGATE -3 狀態需要向後傳播


//不公平鎖的lock函式
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         
*/ final void lock() { //首先嚐試獲得鎖,如果獲得,就將獨佔鎖的內部變數改為當前執行緒 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //如果沒有獲得鎖,就轉到AQS類的acquire函式中 acquire(1); } protected final boolean tryAcquire(int
acquires) { return nonfairTryAcquire(acquires); } } //AQS類的acquire函式,傳入引數1 public final void acquire(int arg) { if (!tryAcquire(arg) && //檢視是否能夠獲得鎖,就是上面的NonfairSync上面的tryAcquire方法,這個方法重寫了虛類方法,如果不能,將返回false,檢視&&後的方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
//ReentrantLock類的nonfairTryAcquire函式 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //嘗試獲得鎖,如果成功了就進去執行,這個也體現了不公平鎖的機制 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //如果沒有成功,鎖是被自己佔住的,那就可以進入,體現了可重入的機制 int nextc = c + acquires; //計數器加一 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; //這個時候的情況就是鎖是被別的執行緒佔領的 } //AQS類的addWaiter函式,傳入引數Node.EXCLUSIVE private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 用快速入列法試驗一下 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { //cas操作設定尾節點 pred.next = node; return node; } } enq(node); //當cas操作不成,或者佇列中沒有別的執行緒的時候用這個 return node; } //AQS類的enq函式,傳入引數等待假如阻塞佇列的執行緒 private Node enq(final Node node) { //死迴圈,不加入不出來 for (;;) { Node t = tail; //獲取尾節點 if (t == null) { // 如果尾節點是空,說明這個是第一個阻塞執行緒 if (compareAndSetHead(new Node())) //設定阻塞執行緒的頭部,一個沒有任何用處的頭部 tail = head; //尾巴就是頭,然後再次迴圈 } else { node.prev = t; if (compareAndSetTail(t, node)) { //cas操作將新的執行緒插入到tail,如果不成功將會再次迴圈過來 t.next = node; return t; //這個返回值在addWaiter函式中沒有用 } } } } //通過呼叫addWaiter函式,AQS將當前執行緒加入到了等待佇列,但是還沒有阻塞當前執行緒的執行,接下來我們就來分析一下acquireQueued函式. //AQS類的acquireQueued函式,傳入引數是已經加入到佇列中的新的node節點 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //如果新插入的這個節點是head節點的後面節點,說明新來的節點是將要獲取鎖的執行緒,嘗試去獲取鎖 setHead(node); //將自己設定為頭節點 p.next = null; // help GC //原來的節點next置空 failed = false; return interrupted; //因為沒有被中斷所以返回false } if (shouldParkAfterFailedAcquire(p, node) && //判斷是否要進入阻塞狀態.如果`shouldParkAfterFailedAcquire`返回true,表示需要進入阻塞 parkAndCheckInterrupt()) ////呼叫parkAndCheckInterrupt掛起執行緒,等待被喚醒 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //前面我們已經說過只有前一個節點pred的執行緒狀態為SIGNAL時,當前節點的執行緒才能被掛起。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. *前一個節點正在等待鎖的釋放,所以這個新的節點要阻塞 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * 前一個節點處於取消獲取獨佔性變數的狀態,所以,可以跳過去,返回false */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }