1. 程式人生 > >訊號量Semaphore的實現

訊號量Semaphore的實現

這節我們來看一下訊號量的實現方式,下面是我們本次的入口程式碼,我們先看一下非公平的方式是怎麼做的:

Semaphore semaphore = new Semaphore(1);
semaphore.acquire(1);
semaphore.release(1);

首先來看一下它的構造方法:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

它是基於非公平框架實現的:

Sync(int permits) {
            setState(permits);
        }

最後呼叫父類的構造方法儲存了我們傳入的資源的個數,下面就開始看只存在一個執行緒獲取資源的時候流程是怎樣的:

public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

在共享模式下獲得資源,如果獲取資源的執行緒被中斷了,那麼就終止資源獲取:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

先嚐試在共享模式下獲取資源,會返回獲取資源後還剩下的資源的數量,如果小於0,說明有一部分資源不能獲取到,導致資源獲取失敗,需要等待其他執行緒釋放資源後再獲取

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

非公平模式下嘗試獲取資源:

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
            	//獲取還剩下的可獲取的資源的數量
                int available = getState();
                //計算如果當前執行緒獲取到了所需要的資源後還剩下多少資源
                int remaining = available - acquires;
                //如果計算後剩下的資源小於0,說明當前執行緒不能獲取資源,否則通過CAS方式獲取資源
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

嘗試獲取資源主要做了三件事: 1、獲取還剩下的可獲取的資源的數量 2、計算如果當前執行緒獲取到了所需要的資源後還剩下多少資源 3、如果計算後剩下的資源小於0,說明當前執行緒不能獲取資源,否則通過CAS方式獲取資源 返回後就可用通過返回值是否小於0來判斷是否成功獲取到了資源,如果獲取成功什麼也不做,如果沒獲取到放到同步佇列裡面:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //建立一個共享的結點新增到同步佇列裡面
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            	//獲得當前結點的前驅結點
                final Node p = node.predecessor();
                //如果前驅結點是頭結點
                if (p == head) {
                	//那麼當前結點可以嘗試獲取資源
                    int r = tryAcquireShared(arg);
                    //如果返回值大於等於0,說明成功獲取到了資源
                    if (r >= 0) {
                    	//將當前結點設定為頭結點並傳播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
               //調整同步佇列中node結點的狀態並判斷是否應該被掛起
         	  //並判斷是否需要被中斷,如果中斷直接丟擲異常,當前結點請求也就結束
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

其實這個方法和可重入鎖做的事情差不多,首先建立一個共享結點新增到同步佇列中,進入死迴圈不斷嘗試獲取資源,中間過程會改變自己的狀態來選擇是否應該掛起:

 private Node addWaiter(Node mode) {
 		//建立一個結點
        Node node = new Node(Thread.currentThread(), mode);
        //新增到同步佇列中
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先建立一個結點,如果這是第一次建立同步佇列的結點需要通過enq方法入隊,否則通過CAS方式直接新增到佇列中:

Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

構造方法中與當前執行緒進行了繫結。 我們假設當前建立的結點能獲取到資源會進入這個方法:

private void setHeadAndPropagate(Node node, int propagate) {
		//記錄舊的頭節點
        Node h = head; 
        //將當前結點設為頭節點
        setHead(node);
        //如果剩餘的資源大於0
        if (propagate > 0 || 
        	//如果舊的頭結點為空
       		 h == null ||
       		 //如果舊的頭節點狀態小於0
      		   h.waitStatus < 0 ||
      		   	//如果新的頭節點為空
           		 (h = head) == null || 
           		 //如果新的頭節點狀態小於0
          		    h.waitStatus < 0) {
            
            Node s = node.next;
            if (s == null || s.isShared())
            	//喚醒下一個結點
                doReleaseShared();
        }
    }

在這個方法中修改頭節點並且嘗試喚醒下一個頭節點:

private void doReleaseShared() {

        for (;;) {
        	//獲得頭節點
            Node h = head;
            //如果同步佇列中存在需要獲取資源的結點
            if (h != null && h != tail) {
            	//獲取頭結點的狀態
                int ws = h.waitStatus;
                //修改頭節點的狀態並喚醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果頭結點發生變化,則繼續迴圈。否則,退出迴圈。
            if (h == head)                   // loop if head changed
                break;
        }
    }

保證釋放動作(向同步等待佇列尾部)傳遞,即使沒有其他正在進行的請求或釋放動作。如果頭節點的後繼節點需要喚醒,那麼執行喚醒動作;如果不需要,將頭結點的等待狀態設定為PROPAGATE保證喚醒傳遞。另外,為了防止過程中有新節點進入(佇列),這裡必需做迴圈,所以,和其他unparkSuccessor方法使用方式不一樣的是,如果(頭結點)等待狀態設定失敗,重新檢測。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前驅結點已經是喚醒模式直接返回
        if (ws == Node.SIGNAL)
            return true;
        //如果前驅結點是取消狀態,需要重構同步佇列
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
           //否則將前驅結點設定為喚醒模式
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果前驅結點已經是喚醒模式了,那麼當前結點可以放心的掛起,如果前驅結點被取消了,那麼就要修改佇列的連結關係,否則通過CAS將前驅結點設定為喚醒模式。 接下來我們看一下是如何釋放資源的:

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

呼叫同步框架是釋放資源的方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

首先會嘗試釋放資源,釋放成功後喚醒同步佇列的下一個結點:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

通過CAS的方式將資源還回去。 訊號量的公平方式和非公平方式的區別:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

公平方式在嘗試獲取資源的時候之後同步佇列中沒有結點才會嘗試去獲取,而非公平方式會直接去嘗試獲取。