1. 程式人生 > 其它 >【TcaplusDB知識庫】表操作—如何查詢事務詳情

【TcaplusDB知識庫】表操作—如何查詢事務詳情

Semaphore

目錄

1、Semaphore概念

Semaphore,俗稱訊號量,它是作業系統中PV操作的原語在java的實現,它也是基於AbstractQueuedSynchronizer實現的。

Semaphore的功能非常強大,大小為1的訊號量就類似於互斥鎖,通過同時只能有一個執行緒獲取訊號量實現。大小為n(n>0)的訊號量可以實現限流的功能,它可以實現只能有n個執行緒同時獲取訊號量

1.1、PV操作

PV操作是作業系統一種實現程序互斥與同步的有效方法。PV操作與訊號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執行的正確性。

P操作的主要動作是

①S減1;

②若S減1後仍大於或等於0,則程序繼續執行;

③若S減1後小於0,則該程序被阻塞後放入等待該訊號量的等待佇列中,然後轉程序排程。

V操作的主要動作是

①S加1;

②若相加後結果大於0,則程序繼續執行;

③若相加後結果小於或等於0,則從該訊號的等待佇列中釋放一個等待程序,然後再返回原程序繼續執行或轉程序排程。

2、Semaphore 常用方法

2.1、構造器確定資源數目

permits 表示許可證的數量(資源數)

fair 表示公平性,如果這個設為 true 的話,下次執行的執行緒會是等待最久的執行緒 。

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }


    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

檢視原始碼進去之後,可以看到最終到了:

    protected final void setState(int newState) {
        state = newState;
    }

可以看到傳入進去的引數將會作為資源數目的多少來進行定義。

3、常用方法

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
protected Collection<Thread> getQueuedThreads()
方法名稱 語義
acquire() 表示阻塞並獲取許可
tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的執行緒不會阻塞
release() 表示釋放許可
int availablePermits() 返回此訊號量中當前可用的許可證數。
int getQueueLength() 返回正在等待獲取許可證的執行緒數。
boolean hasQueuedThreads() 是否有執行緒正在等待獲取許可證。
void reducePermit(int reduction) 減少 reduction 個許可證
Collection getQueuedThreads() 返回所有等待獲取許可證的執行緒集合

4、應用場景

可以用於做流量控制,特別是公用資源有限的應用場景

示例:

/**
 * @Description 有三個許可,五個執行緒來搶。效果:三個執行緒執行成功,另外兩個執行緒進行排隊
 * @Author liguang
 * @Date 2022/03/18/11:13
 */
public class SemphoreTestOne {
    public static void main(String[] args) {
        Semaphore windows = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                try {
                    windows.acquire();
                    System.out.println("開始賣票");
                    Thread.sleep(5000);
                    System.out.println("購票成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                  windows.release();
                }
            }).start();
        }
    }
}

控制檯展示:

開始賣票
開始賣票
開始賣票
-----------------停頓5s鍾----------------    
購票成功
購票成功
開始賣票
開始賣票
購票成功

效果上:許可只有三個,但是有多個執行緒,那麼多個執行緒來獲取得到三個有限的資源的時候,對於已經獲取得到了的執行緒來說,是沒有任何影響,而沒有獲取得到的執行緒只能夠排隊等待資源。

5、原始碼分析

這裡將會涉及到共享鎖的一些概念。

從兩行程式碼中來進行分析:

 Semaphore windows = new Semaphore(3);
try {
    windows.acquire();
    xxxxx(業務方法);
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    windows.release();
}

那麼在acquire和release就可以實現這樣的邏輯,那麼看下具體的實現邏輯。

5.1、acquire

只需要來分析這段程式碼即可:

        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);

那麼看下嘗試獲取得到共享鎖,當對應的值小於0的時候,才會去將執行緒進行阻塞。

首先看下嘗試獲取得到共享鎖的方法(非公平場景下來進行分析):

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        // 1、當許可的數量小於0(表示的當前資源不足);2、大於等於0的時候,表示資源還是足夠的,可以分配給當前執行緒
        if (remaining < 0 || compareAndSetState(available, remaining))
            // 許可剩餘數量
            return remaining;
    }
}

對於沒有獲取得到成功的,那麼將繼續迴圈操作。最終將返回許可的剩餘數量。

那麼在回到上面的方法中來,只有當許可小於0的時候,才會將對應的執行緒進行阻塞。而大於等於0的,則是直接獲取得到資源進行操作了

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 這裡和之前的還是有區別的,這裡使用到節點的另外一種狀態。SHARED,共享狀態。構建一個共享佇列來進行建立
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 如果是頭節點!那麼這裡再來嘗試獲取得到一次鎖,如果得到的
                    int r = tryAcquireShared(arg);
                    // 如果獲取得到的資源大於等於0,那麼將會進行下面的操作
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 和之前的都是一樣的!
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

那麼下面只需要關注一下setHeadAndPropagate方法即可

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 在這裡要來喚醒下一個節點。這裡直接在這裡來進行喚醒了!
            if (s == null || s.isShared())
                // 可以看到釋放鎖也是在這裡來進行操作的。在這裡直接來通知後面的排隊的執行緒來進行操作。
                // 喚醒後面的執行緒來搶鎖
                doReleaseShared();
        }
    }

5.2、release

釋放也是非常簡單的,直接上原始碼:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

那麼繼續看下後面的方法:

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

直接獲取得到原來的值,然後進行相加重新設定+1即可。

看看釋放鎖的過程:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 表示當前是有節點的
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

6、總結

這個原始碼比較簡單,所以不過來過多解釋。無非是檢測到了狀態是SHARED並且不為null的,直接進行喚醒來嘗試通知下一個執行緒即可。

但是使用的時候需要注意的一點是:要有效率的來獲取得到許可數量。

如下所示:

/**
 * @Description 獲取得到許可過大。注意這裡的許可數量設定
 * @Author liguang
 * @Date 2022/03/18/11:13
 */
public class SemphoreTestTwo {
    public static void main(String[] args) {
        
        Semaphore windows = new Semaphore(5);
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                try {
                    windows.acquire(6);
                    System.out.println("開始賣票");
                    Thread.sleep(5000);
                    System.out.println("購票成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                  windows.release();
                }
            }).start();
        }
    }
}

這種操作最終將會導致所有的執行緒都將會在阻塞佇列中進行排隊,等待被啟用。無法被喚醒,除非是因為操作不當,因為中斷引起。