1. 程式人生 > >高併發學習筆記(八)

高併發學習筆記(八)

今年企業對Java開發的市場需求,你看懂了嗎? >>>   

四、同步器的實現(四)

4.Semaphore

    Semaphore是一個訊號計數量,用於維護一個資源的使用許可量,如果維護的資源使用許可超出會阻塞每一個請求該資源的許可(acquire),直到又有可用的。Semaphore通常用於限制訪問資源的執行緒數目,下面是一個使用示例:

/**
* Semaphore使用示例,Semaphore代表廁所坑位
* Created by bzhang on 2019/3/21.
*/
public class TestSemaphore {
      private Semaphore lock = new Semaphore(3);      //廁所坑位

      public void goToToilet(){
            try {
                  lock.acquire();   //嘗試獲取一個位置
                  System.out.println(Thread.currentThread().getName()+"正在使用廁所");
                  TimeUnit.SECONDS.sleep(2);
                  System.out.println(Thread.currentThread().getName()+"上完廁所,很開心");
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }finally {
                  lock.release();   //釋放一個位置
            }
      }

      public static void main(String[] args) {
            TestSemaphore test = new TestSemaphore();
            ExecutorService pool = Executors.newCachedThreadPool();     //快取執行緒池
            for (int i = 0;i < 11;i++){
                  pool.submit(new Runnable() {
                        @Override
                        public void run() {
                              test.goToToilet();
                        }
                  });
            }
            pool.shutdown();    //關閉執行緒池
      }
}

    Semaphore的使用與ReentrantLock和synchronized的使用很相似,只是鎖的使用一般都是獨佔的,即一次只允許一個執行緒執行。而Semaphore則是許可多個執行緒進行訪問,當Semaphore只許可一個執行緒訪問時,也就退化成鎖。

    Semaphore的底層也是由AQS同步器實現的:

public class Semaphore implements java.io.Serializable {
        //AQS同步的佇列引用
    private final Sync sync;
     
    //由構造方法可以看出Semaphore也分為公平模式和非公平模式
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    //根據fair建立公平同步佇列或是非公平同步佇列
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}

 

    Semaphore的繼承關係如下圖,是由三個內部類Sync,NonfairSync與FairSync共同來實現Semaphore的訊號計數功能。

    下面先來看公平與非公平模式的實現原始碼:

//非公平模式的實現
static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }

    //重寫的實AQS中的tryAcquireShared方法,說明使用的是共享模式
    //嘗試獲取鎖
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);    //呼叫父類的非公平模式下嘗試獲取鎖方法
    }
}

//公平模式的實現
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }

    //嘗試獲取鎖
    protected int tryAcquireShared(int acquires) {
        for (;;) {    //自旋
            //同步佇列中是否存在排在當前執行緒前面的執行緒結點
            //也就是說當前執行緒是不是等待最久的執行緒,不是的話,直接不讓獲取鎖
            if (hasQueuedPredecessors())    
                return -1;
            int available = getState();    //獲取同步狀態
            int remaining = available - acquires;    //將要更新的同步狀態值
            //remaining小於0,說明沒有可用的資源了,返回獲取失敗
            //remaining大於等於0且CAS方式更新狀態失敗,則表示有可用資源,繼續迴圈嘗試更新直到成功
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

//公平鎖與非公平鎖父類
abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }

    //獲取剩餘許可資源數
    final int getPermits() {
        return getState();
    }

    //非公平嘗試獲取鎖,與公平模式基本相似,只是不需要判斷當前執行緒是不是等待最久的執行緒
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    //釋放鎖資源
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();    
            int next = current + releases;
            if (next < current)     //超出最大許可值,拋異常    
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    //減少許可數,即假設當前許可數是5,呼叫該方法會時許可數變為5-reductions
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    //獲取可立即使用的許可數
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

    瞭解了底層實現,再看看Semaphore獲取許可和釋放許可的過程:

//獲取一個許可,在提供一個許可前一直將執行緒阻塞,或執行緒被中斷
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//AQS中的方法,可被中斷的獲取鎖的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();

    //tryAcquireShared呼叫的是公平鎖FairSync或NonfairSync中的重寫方法
    //tryAcquireShared可能失敗,進入doAcquireSharedInterruptibly方法進行自旋或加入同步佇列
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

//嘗試獲取一個許可,在有可用的許可前將其阻塞,且不可被中斷
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

//AQS中的方法
public final void acquireShared(int arg) {
    //嘗試獲取許可失敗的話,會進行自旋或加入到同步佇列中等待獲取鎖
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);    //AQS中已分析過,不再深入
}

//嘗試獲取一個許可
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;    //只進行一次方式,失敗後直接返回+
}

//在一定時間內嘗試獲取許可
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//AQS中的在一定時間內嘗試獲取鎖的方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //本質上還是先呼叫tryAcquireShared方法,不成功再在一定時間去自旋或加入同步佇列等待獲取許可
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

//獲取多個許可,在提供permits許可前一直將執行緒阻塞,或執行緒被中斷
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

//獲取給定數目的許可,在提供這些許可前一直將執行緒阻塞,不可被中斷
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

//嘗試獲取給定數目的許可,值嘗試一次,失敗直接返回
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

//在給定的時間內嘗試獲取給定數目的許可數,超時返回失敗
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

//釋放許可的過程

//釋放一個許可
public void release() {
    sync.releaseShared(1);
}

//AQS中的方法
public final boolean releaseShared(int arg) {
    //嘗試釋放一個許可,不成功則進入doReleaseShared繼續嘗試釋放許可
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

//釋放給定數目的許可
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}