1. 程式人生 > >死磕 java同步系列之Semaphore原始碼解析

死磕 java同步系列之Semaphore原始碼解析

問題

(1)Semaphore是什麼?

(2)Semaphore具有哪些特性?

(3)Semaphore通常使用在什麼場景中?

(4)Semaphore的許可次數是否可以動態增減?

(5)Semaphore如何實現限流?

簡介

Semaphore,訊號量,它儲存了一系列的許可(permits),每次呼叫acquire()都將消耗一個許可,每次呼叫release()都將歸還一個許可。

特性

Semaphore通常用於限制同一時間對共享資源的訪問次數上,也就是常說的限流。

下面我們一起來學習Java中Semaphore是如何實現的。

類結構

Semaphore

Semaphore中包含了一個實現了AQS的同步器Sync,以及它的兩個子類FairSync和NonFairSync,這說明Semaphore也是區分公平模式和非公平模式的。

原始碼分析

基於之前對於ReentrantLock和ReentrantReadWriteLock的分析,這篇文章相對來說比較簡單,之前講過的一些方法將直接略過,有興趣的可以拉到文章底部檢視之前的文章。

內部類Sync

// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    // 構造方法,傳入許可次數,放入state中
    Sync(int permits) {
        setState(permits);
    }
    // 獲取許可次數
    final int getPermits() {
        return getState();
    }
    // 非公平模式嘗試獲取許可
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 看看還有幾個許可
            int available = getState();
            // 減去這次需要獲取的許可還剩下幾個許可
            int remaining = available - acquires;
            // 如果剩餘許可小於0了則直接返回
            // 如果剩餘許可不小於0,則嘗試原子更新state的值,成功了返回剩餘許可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 釋放許可
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 看看還有幾個許可
            int current = getState();
            // 加上這次釋放的許可
            int next = current + releases;
            // 檢測溢位
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 如果原子更新state的值成功,就說明釋放許可成功,則返回true
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 減少許可
    final void reducePermits(int reductions) {
        for (;;) {
            // 看看還有幾個許可
            int current = getState();
            // 減去將要減少的許可
            int next = current - reductions;
            // 檢測舉出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // 原子更新state的值,成功了返回true
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 銷燬許可
    final int drainPermits() {
        for (;;) {
            // 看看還有幾個許可
            int current = getState();
            // 如果為0,直接返回
            // 如果不為0,把state原子更新為0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

通過Sync的幾個實現方法,我們獲取到以下幾點資訊:

(1)許可是在構造方法時傳入的;

(2)許可存放在狀態變數state中;

(3)嘗試獲取一個許可的時候,則state的值減1;

(4)當state的值為0的時候,則無法再獲取許可;

(5)釋放一個許可的時候,則state的值加1;

(6)許可的個數可以動態改變;

內部類NonfairSync

// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    // 構造方法,呼叫父類的構造方法
    NonfairSync(int permits) {
        super(permits);
    }
    // 嘗試獲取許可,呼叫父類的nonfairTryAcquireShared()方法
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

非公平模式下,直接呼叫父類的nonfairTryAcquireShared()嘗試獲取許可。

內部類FairSync

// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    // 構造方法,呼叫父類的構造方法
    FairSync(int permits) {
        super(permits);
    }
    // 嘗試獲取許可
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 公平模式需要檢測是否前面有排隊的
            // 如果有排隊的直接返回失敗
            if (hasQueuedPredecessors())
                return -1;
            // 沒有排隊的再嘗試更新state的值
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

公平模式下,先檢測前面是否有排隊的,如果有排隊的則獲取許可失敗,進入佇列排隊,否則嘗試原子更新state的值。

構造方法

// 構造方法,建立時要傳入許可次數,預設使用非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 構造方法,需要傳入許可次數,及是否公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

建立Semaphore時需要傳入許可次數。

Semaphore預設也是非公平模式,但是你可以呼叫第二個構造方法宣告其為公平模式。

下面的方法在學習過前面的內容看來都比較簡單,彤哥這裡只列舉Semaphore支援的一些功能了。

以下的方法都是針對非公平模式來描述。

acquire()方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

獲取一個許可,預設使用的是可中斷方式,如果嘗試獲取許可失敗,會進入AQS的佇列中排隊。

acquireUninterruptibly()方法

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

獲取一個許可,非中斷方式,如果嘗試獲取許可失敗,會進入AQS的佇列中排隊。

tryAcquire()方法

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

嘗試獲取一個許可,使用Sync的非公平模式嘗試獲取許可方法,不論是否獲取到許可都返回,只嘗試一次,不會進入佇列排隊。

tryAcquire(long timeout, TimeUnit unit)方法

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

嘗試獲取一個許可,先嚐試一次獲取許可,如果失敗則會等待timeout時間,這段時間內都沒有獲取到許可,則返回false,否則返回true;

release()方法

public void release() {
    sync.releaseShared(1);
}

釋放一個許可,釋放一個許可時state的值會加1,並且會喚醒下一個等待獲取許可的執行緒。

acquire(int permits)方法

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

一次獲取多個許可,可中斷方式。

acquireUninterruptibly(int permits)方法

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

一次獲取多個許可,非中斷方式。

tryAcquire(int permits)方法

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

一次嘗試獲取多個許可,只嘗試一次。

tryAcquire(int permits, long timeout, TimeUnit unit)方法

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

嘗試獲取多個許可,並會等待timeout時間,這段時間沒獲取到許可則返回false,否則返回true。

release(int permits)方法

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

一次釋放多個許可,state的值會相應增加permits的數量。

availablePermits()方法

public int availablePermits() {
    return sync.getPermits();
}

獲取可用的許可次數。

drainPermits()方法

public int drainPermits() {
    return sync.drainPermits();
}

銷燬當前可用的許可次數,對於已經獲取的許可沒有影響,會把當前剩餘的許可全部銷燬。

reducePermits(int reduction)方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

減少許可的次數。

總結

(1)Semaphore,也叫訊號量,通常用於控制同一時刻對共享資源的訪問上,也就是限流場景;

(2)Semaphore的內部實現是基於AQS的共享鎖來實現的;

(3)Semaphore初始化的時候需要指定許可的次數,許可的次數是儲存在state中;

(4)獲取一個許可時,則state值減1;

(5)釋放一個許可時,則state值加1;

(6)可以動態減少n個許可;

(7)可以動態增加n個許可嗎?

彩蛋

(1)如何動態增加n個許可?

答:呼叫release(int permits)即可。我們知道釋放許可的時候state的值會相應增加,再回頭看看釋放許可的原始碼,發現與ReentrantLock的釋放鎖還是有點區別的,Semaphore釋放許可的時候並不會檢查當前執行緒有沒有獲取過許可,所以可以呼叫釋放許可的方法動態增加一些許可。

(2)如何實現限流?

答:限流,即在流量突然增大的時候,上層要能夠限制住突然的大流量對下游服務的衝擊,在分散式系統中限流一般做在閘道器層,當然在個別功能中也可以自己簡單地來限流,比如秒殺場景,假如只有10個商品需要秒殺,那麼,服務本身可以限制同時只進來100個請求,其它請求全部作廢,這樣服務的壓力也不會太大。

使用Semaphore就可以直接針對這個功能來限流,以下是程式碼實現:

public class SemaphoreTest {
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->seckill()).start();
        }
    }

    public static boolean seckill() {
        if (!SEMAPHORE.tryAcquire()) {
            System.out.println("no permits, count="+failCount.incrementAndGet());
            return false;
        }

        try {
            // 處理業務邏輯
            Thread.sleep(2000);
            System.out.println("seckill success, count="+successCount.incrementAndGet());
        } catch (InterruptedException e) {
            // todo 處理異常
            e.printStackTrace();
        } finally {
            SEMAPHORE.release();
        }
        return true;
    }
}

推薦閱讀

1、 死磕 java同步系列之開篇

2、 死磕 java魔法類之Unsafe解析

3、 死磕 java同步系列之JMM(Java Memory Model)

4、 死磕 java同步系列之volatile解析

5、 死磕 java同步系列之synchronized解析

6、 死磕 java同步系列之自己動手寫一個鎖Lock

7、 死磕 java同步系列之AQS起篇

8、 死磕 java同步系列之ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

9、 死磕 java同步系列之ReentrantLock原始碼解析(二)——條件鎖

10、 死磕 java同步系列之ReentrantLock VS synchronized

11、 死磕 java同步系列之ReentrantReadWriteLock原始碼解析

歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

相關推薦

java同步系列Semaphore原始碼解析

問題 (1)Semaphore是什麼? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什麼場景中? (

java同步系列ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

問題 (1)重入鎖是什麼? (2)ReentrantLock如何實現重入鎖? (3)ReentrantLock為什麼預設是非公平模式? (4)ReentrantLock除了可重入還有哪些特性? 簡介 Reentrant = Re + entrant,Re是重複、又、再的意思,entrant是enter的名詞或

java同步系列ReentrantLock原始碼解析(二)——條件鎖

問題 (1)條件鎖是什麼? (2)條件鎖適用於什麼場景? (3)條件鎖的await()是在其它執行緒signal()的時候喚醒的嗎? 簡介 條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。 比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素

java同步系列ReentrantReadWriteLock原始碼解析

問題 (1)讀寫鎖是什麼? (2)讀寫鎖具有哪些特性? (3)ReentrantReadWriteLock是怎麼實現讀寫鎖的? (4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap? 簡介 讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個執行緒可以同時對共享

java同步系列StampedLock原始碼解析

問題 (1)StampedLock是什麼? (2)StampedLock具有什麼特性? (3)StampedLock是否支援可重入

java同步系列CyclicBarrier原始碼解析——有圖有真相

問題 (1)CyclicBarrier是什麼? (2)CyclicBarrier具有什麼特性? (3)CyclicBarrier與

java同步系列Phaser原始碼解析

問題 (1)Phaser是什麼? (2)Phaser具有哪些特性? (3)Phaser相對於CyclicBarrier和Count

java同步系列開篇

討論 關註 使用 避免死鎖 更新數據 讀寫 上下文切換 monit 缺點 簡介 同步系列,這是彤哥想了好久的名字,本來是準備寫鎖相關的內容,但是java中的CountDownLatch、Semaphore、CyclicBarrier這些類又不屬於鎖,它們和鎖又有很多共同點,

java同步系列JMM(Java Memory Model)

簡介 Java記憶體模型是在硬體記憶體模型上的更高層的抽象,它遮蔽了各種硬體和作業系統訪問的差異性,保證了Java程式在各種平臺下對記憶體的訪問都能達到一致的效果。 硬體記憶體模型 在正式講解Java的記憶體模型之前,我們有必要先了解一下硬體層面的一些東西。 在現代計算機的硬體體系中,CPU的運算速度是非常快

java同步系列volatile解析

問題 (1)volatile是如何保證可見性的? (2)volatile是如何禁止重排序的? (3)volatile的實現原理? (4)volatile的缺陷? 簡介 volatile可以說是Java虛擬機器提供的最輕量級的同步機制了,但是它並不容易被正確地理解,以至於很多人不習慣使用它,遇到多執行緒問題一律

java同步系列synchronized解析

問題 (1)synchronized的特性? (2)synchronized的實現原理? (3)synchronized是否可重入? (4)synchronized是否是公平鎖? (5)synchronized的優化? (6)synchronized的五種使用方式? 簡介 synchronized關鍵字是Ja

java同步系列自己動手寫一個鎖Lock

問題 (1)自己動手寫一個鎖需要哪些知識? (2)自己動手寫一個鎖到底有多簡單? (3)自己能不能寫出來一個完美的鎖? 簡介 本篇文章的目標一是自己動手寫一個鎖,這個鎖的功能很簡單,能進行正常的加鎖、解鎖操作。 本篇文章的目標二是通過自己動手寫一個鎖,能更好地理解後面章節將要學習的AQS及各種同步器實現的原理

java同步系列AQS起篇

問題 (1)AQS是什麼? (2)AQS的定位? (3)AQS的實現原理? (4)基於AQS實現自己的鎖? 簡介 AQS的全稱是AbstractQueuedSynchronizer,它的定位是為Java中幾乎所有的鎖和同步器提供一個基礎框架。 AQS是基於FIFO的佇列實現的,並且內部維護了一個狀態變數sta

java同步系列ReentrantLock VS synchronized——結果可能跟你想的不一樣

問題 (1)ReentrantLock有哪些優點? (2)ReentrantLock有哪些缺點? (3)ReentrantLock

java同步系列AQS終篇(面試)

問題 (1)AQS的定位? (2)AQS的重要組成部分? (3)AQS運用的設計模式? (4)AQS的總體流程? 簡介 AQS的全稱是AbstractQueuedSynchronizer,它的定位是為Java中幾乎所有的鎖和同步器提供一個基礎框架。 在之前的章節中,我們一起學習了ReentrantLock、R

java同步系列mysql分散式鎖

問題 (1)什麼是分散式鎖? (2)為什麼需要分散式鎖? (3)mysql如何實現分散式鎖? (4)mysql分散式鎖的優點和缺點? 簡介 隨著併發量的不斷增加,單機的服務遲早要向多節點或者微服務進化,這時候原來單機模式下使用的synchronized或者ReentrantLock將不再適用,我們迫切地需要一

java同步系列zookeeper分散式鎖

(2)zookeeper分散式鎖有哪些優點? (3)zookeeper分散式鎖有哪些缺點? 簡介 zooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,它可以為分散式應用提供一致性服務,它是Hadoop和Hbase的重要元件,同時也可以作為配置中心、註冊中心運用在微服務體系中。 本章我們將介

java同步系列redis分散式鎖進化史

(2)redis分散式鎖有哪些優點? (3)redis分散式鎖有哪些缺點? (4)redis實現分散式鎖有沒有現成的輪子可以使用? 簡介 Redis(全稱:Remote Dictionary Server 遠端字典服務)是一個開源的使用ANSI C語言編寫、支援網路、可基於記憶體亦可持久化的日誌型、Key-

java同步系列終結篇

腦圖 下面是關於同步系列的一份腦圖,列舉了主要的知識點和問題點,看過本系列文章的同學可以根據腦圖自行回顧所學的內容,也可以作為面試前的準備。 如果有需要高清無碼原圖的同學,可以關注公眾號“彤哥讀原始碼”,回覆“sync”領取。 總結 所謂同步,就是保證多執行緒(包括多程序)對共享資源的讀寫能夠安全有效的執