【TcaplusDB知識庫】表操作—如何查詢事務詳情
Semaphore
目錄1、Semaphore概念
Semaphore,俗稱訊號量,它是作業系統中PV操作的原語在java的實現,它也是基於AbstractQueuedSynchronizer實現的。
Semaphore的功能非常強大,大小為1的訊號量就類似於互斥鎖,通過同時只能有一個執行緒獲取訊號量實現。大小為n(n>0)的訊號量可以實現限流的功能,它可以實現只能有n個執行緒同時獲取訊號量
1.1、PV操作
PV操作是作業系統一種實現程序互斥與同步的有效方法。PV操作與訊號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執行的正確性。
P操作的主要動作是:
①S減1;
②若S減1後仍大於或等於0,則程序繼續執行;
③若S減1後小於0,則該程序被阻塞後放入等待該訊號量的等待佇列中,然後轉程序排程。
V操作的主要動作是:
①S加1;
②若相加後結果大於0,則程序繼續執行;
③若相加後結果小於或等於0,則從該訊號的等待佇列中釋放一個等待程序,然後再返回原程序繼續執行或轉程序排程。
2、Semaphore 常用方法
2.1、構造器確定資源數目
permits 表示許可證的數量(資源數)
fair 表示公平性,如果這個設為 true 的話,下次執行的執行緒會是等待最久的執行緒 。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
檢視原始碼進去之後,可以看到最終到了:
protected final void setState(int newState) {
state = newState;
}
可以看到傳入進去的引數將會作為資源數目的多少來進行定義。
3、常用方法
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
protected Collection<Thread> getQueuedThreads()
方法名稱 | 語義 | |
---|---|---|
acquire() | 表示阻塞並獲取許可 | |
tryAcquire() | 方法在沒有許可的情況下會立即返回 false,要獲取許可的執行緒不會阻塞 | |
release() | 表示釋放許可 | |
int availablePermits() | 返回此訊號量中當前可用的許可證數。 | |
int getQueueLength() | 返回正在等待獲取許可證的執行緒數。 | |
boolean hasQueuedThreads(): | 是否有執行緒正在等待獲取許可證。 | |
void reducePermit(int reduction) | 減少 reduction 個許可證 | |
Collection getQueuedThreads() | 返回所有等待獲取許可證的執行緒集合 |
4、應用場景
可以用於做流量控制,特別是公用資源有限的應用場景
示例:
/**
* @Description 有三個許可,五個執行緒來搶。效果:三個執行緒執行成功,另外兩個執行緒進行排隊
* @Author liguang
* @Date 2022/03/18/11:13
*/
public class SemphoreTestOne {
public static void main(String[] args) {
Semaphore windows = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(()->{
try {
windows.acquire();
System.out.println("開始賣票");
Thread.sleep(5000);
System.out.println("購票成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
windows.release();
}
}).start();
}
}
}
控制檯展示:
開始賣票
開始賣票
開始賣票
-----------------停頓5s鍾----------------
購票成功
購票成功
開始賣票
開始賣票
購票成功
效果上:許可只有三個,但是有多個執行緒,那麼多個執行緒來獲取得到三個有限的資源的時候,對於已經獲取得到了的執行緒來說,是沒有任何影響,而沒有獲取得到的執行緒只能夠排隊等待資源。
5、原始碼分析
這裡將會涉及到共享鎖的一些概念。
從兩行程式碼中來進行分析:
Semaphore windows = new Semaphore(3);
try {
windows.acquire();
xxxxx(業務方法);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
windows.release();
}
那麼在acquire和release就可以實現這樣的邏輯,那麼看下具體的實現邏輯。
5.1、acquire
只需要來分析這段程式碼即可:
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
那麼看下嘗試獲取得到共享鎖,當對應的值小於0的時候,才會去將執行緒進行阻塞。
首先看下嘗試獲取得到共享鎖的方法(非公平場景下來進行分析):
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// 1、當許可的數量小於0(表示的當前資源不足);2、大於等於0的時候,表示資源還是足夠的,可以分配給當前執行緒
if (remaining < 0 || compareAndSetState(available, remaining))
// 許可剩餘數量
return remaining;
}
}
對於沒有獲取得到成功的,那麼將繼續迴圈操作。最終將返回許可的剩餘數量。
那麼在回到上面的方法中來,只有當許可小於0的時候,才會將對應的執行緒進行阻塞。而大於等於0的,則是直接獲取得到資源進行操作了
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 這裡和之前的還是有區別的,這裡使用到節點的另外一種狀態。SHARED,共享狀態。構建一個共享佇列來進行建立
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果是頭節點!那麼這裡再來嘗試獲取得到一次鎖,如果得到的
int r = tryAcquireShared(arg);
// 如果獲取得到的資源大於等於0,那麼將會進行下面的操作
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 和之前的都是一樣的!
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
那麼下面只需要關注一下setHeadAndPropagate方法即可
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 在這裡要來喚醒下一個節點。這裡直接在這裡來進行喚醒了!
if (s == null || s.isShared())
// 可以看到釋放鎖也是在這裡來進行操作的。在這裡直接來通知後面的排隊的執行緒來進行操作。
// 喚醒後面的執行緒來搶鎖
doReleaseShared();
}
}
5.2、release
釋放也是非常簡單的,直接上原始碼:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
那麼繼續看下後面的方法:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
直接獲取得到原來的值,然後進行相加重新設定+1即可。
看看釋放鎖的過程:
private void doReleaseShared() {
for (;;) {
Node h = head;
// 表示當前是有節點的
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
6、總結
這個原始碼比較簡單,所以不過來過多解釋。無非是檢測到了狀態是SHARED並且不為null的,直接進行喚醒來嘗試通知下一個執行緒即可。
但是使用的時候需要注意的一點是:要有效率的來獲取得到許可數量。
如下所示:
/**
* @Description 獲取得到許可過大。注意這裡的許可數量設定
* @Author liguang
* @Date 2022/03/18/11:13
*/
public class SemphoreTestTwo {
public static void main(String[] args) {
Semaphore windows = new Semaphore(5);
for (int i = 0; i < 5; i++) {
new Thread(()->{
try {
windows.acquire(6);
System.out.println("開始賣票");
Thread.sleep(5000);
System.out.println("購票成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
windows.release();
}
}).start();
}
}
}
這種操作最終將會導致所有的執行緒都將會在阻塞佇列中進行排隊,等待被啟用。無法被喚醒,除非是因為操作不當,因為中斷引起。