訊號量、柵欄和倒計數器
阿新 • • 發佈:2020-12-02
訊號量、柵欄和倒計數器
學習材料來源於網路
如有侵權,聯絡刪除
Semaphore
簡稱訊號量,可以控制多個執行緒的資源爭搶許可。
- acquire:獲取一個許可,如果沒有就等待,
- release:釋放一個許可。
- availablePermits:方法得到可用的許可數目
適用場景:
1、程式碼併發處理限流
如hystrix框架
示例1:
// 訊號量機制 public class SemaphoreDemo { public static void main(String[] args) { int guestCount = 9; // 客人數量 Semaphore semaphore = new Semaphore(5); // 手牌數量,限制請求數量 for (int i = 0; i < guestCount; i++) { String vipNo = "vip-00" + i; new Thread( () -> { try { semaphore.acquire(); // 獲取令牌 SemaphoreDemo.service(vipNo); semaphore.release(); // 釋放令牌 } catch (InterruptedException e) { e.printStackTrace(); } }) .start(); } } // 限流 控制5個執行緒 同時訪問 public static void service(String vipNo) throws InterruptedException { System.out.println("樓上出來迎接貴賓一位,貴賓編號" + vipNo + ",..."); Thread.sleep(new Random().nextInt(3000)); System.out.println("歡送貴賓出門,貴賓編號" + vipNo); } }
自定義AQS實現訊號量
示例2
SemaphoreAqs
package icu.shaoyayu.multithreading.chapter6; import java.util.Iterator; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** * @author shaoyayu * @E_Mail * @Version 1.0.0 * @readme : */ public class SemaphoreAqs { // acquire、 acquireShared : 定義了資源爭用的邏輯,如果沒拿到,則等待。 // tryAcquire、 tryAcquireShared : 實際執行佔用資源的操作,如何判定一個由使用者具體去實現。 // release、 releaseShared : 定義釋放資源的邏輯,釋放之後,通知後續節點進行爭搶。 // tryRelease、 tryReleaseShared: 實際執行資源釋放的操作,具體的AQS使用者去實現。 // 1、 如何判斷一個資源的擁有者 public volatile AtomicReference<Thread> owner = new AtomicReference<>(); // 儲存 正在等待的執行緒 public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); // 記錄資源狀態 public volatile AtomicInteger state = new AtomicInteger(0); // 共享資源佔用的邏輯,返回資源的佔用情況 public int tryAcquireShared(){ throw new UnsupportedOperationException(); } public void acquireShared(){ boolean addQ = true; while(tryAcquireShared() < 0) { if (addQ) { // 沒拿到鎖,加入到等待集合 waiters.offer(Thread.currentThread()); addQ = false; } else { // 阻塞 掛起當前的執行緒,不要繼續往下跑了 LockSupport.park(); // 偽喚醒,就是非unpark喚醒的 } } waiters.remove(Thread.currentThread()); // 把執行緒移除 } public boolean tryReleaseShared(){ throw new UnsupportedOperationException(); } public void releaseShared(){ if (tryReleaseShared()) { // 通知等待者 Iterator<Thread> iterator = waiters.iterator(); while (iterator.hasNext()) { Thread next = iterator.next(); LockSupport.unpark(next); // 喚醒 } } } // 獨佔資源相關的程式碼 public boolean tryAcquire() { // 交給使用者去實現。 模板方法設計模式 throw new UnsupportedOperationException(); } public void acquire() { boolean addQ = true; while (!tryAcquire()) { if (addQ) { // 沒拿到鎖,加入到等待集合 waiters.offer(Thread.currentThread()); addQ = false; } else { // 阻塞 掛起當前的執行緒,不要繼續往下跑了 LockSupport.park(); // 偽喚醒,就是非unpark喚醒的 } } waiters.remove(Thread.currentThread()); // 把執行緒移除 } public boolean tryRelease() { throw new UnsupportedOperationException(); } public void release() { // 定義了 釋放資源之後要做的操作 if (tryRelease()) { // 通知等待者 Iterator<Thread> iterator = waiters.iterator(); while (iterator.hasNext()) { Thread next = iterator.next(); LockSupport.unpark(next); // 喚醒 } } } public AtomicInteger getState() { return state; } public void setState(AtomicInteger state) { this.state = state; } }
SimpleSemaphore
package icu.shaoyayu.multithreading.chapter6; /** * @author shaoyayu * @E_Mail * @Version 1.0.0 * @readme : */ public class SimpleSemaphore { SemaphoreAqs aqs = new SemaphoreAqs() { @Override public int tryAcquireShared() { // 訊號量獲取, 數量 - 1 for(;;) { int count = getState().get(); int n = count - 1; if(count <= 0 || n < 0) { return -1; } if(getState().compareAndSet(count, n)) { return 1; } } } @Override public boolean tryReleaseShared() { // state + 1 return getState().incrementAndGet() >= 0; } }; /** 許可數量 */ public SimpleSemaphore(int count) { aqs.getState().set(count); // 設定資源的狀態 } public void acquire() { aqs.acquireShared(); } // 獲取令牌 public void release() { aqs.releaseShared(); } // 釋放令牌 }
SemaphoreDemo
package icu.shaoyayu.multithreading.chapter6;
import java.util.Random;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
// 訊號量機制
public class SemaphoreDemo {
public static void main(String[] args) {
int guestCount = 9; // 客人數量
SimpleSemaphore semaphore = new SimpleSemaphore(5); // 手牌數量,限制請求數量
for (int i = 0; i < guestCount; i++) {
String vipNo = "vip-00" + i;
new Thread(
() -> {
try {
semaphore.acquire(); // 獲取令牌
SemaphoreDemo.service(vipNo);
semaphore.release(); // 釋放令牌
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.start();
}
}
// 限流 控制5個執行緒 同時訪問
public static void service(String vipNo) throws InterruptedException {
System.out.println("樓上出來迎接貴賓一位,貴賓編號" + vipNo + ",...");
Thread.sleep(new Random().nextInt(3000));
System.out.println("歡送貴賓出門,貴賓編號" + vipNo);
}
}
CountDownLatch
倒計數器
建立物件時,傳入指定數值作為執行緒參與的數量;
- await:方法等待計數器值變為0,在這之前,執行緒進入等待狀態;
- countdown:計數器數值減一,直到為0;
經常用於等待其他執行緒執行到某一節點,再繼續執行當前執行緒程式碼
使用場景示例:
1、統計執行緒執行的情況;
2、壓力測試中,使用countDownLatch實現最大程度的併發處理;
3、多個執行緒之間,相互通訊,比如執行緒非同步呼叫完介面,結果通知;
示例3
public static void main(String[] args) throws InterruptedException {
// 一個請求,後臺需要呼叫多個介面 查詢資料
CountDownLatch cdLdemo = new CountDownLatch(10); // 建立,計數數值
for (int i = 0; i < 10; i++) { // 啟動九個執行緒,最後一個兩秒後啟動
int finalI = i;
new Thread(() -> {
// 參與計數
try {
// 等待計數器為0
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是" + Thread.currentThread() + ".我執行介面-" + finalI +"呼叫了");
// 不影響後續操作
cdLdemo.countDown();
}).start();
}
cdLdemo.await(); //等待其他執行緒的執行結束以後
System.out.println("全部執行完畢.我來召喚神龍");
}
自定義AQS實現訊號量
示例4
package icu.shaoyayu.multithreading.chapter6;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class SimpleCDLAqs {
// 同步資源狀態
volatile AtomicInteger state = new AtomicInteger(0);
// 當前鎖的擁有者
protected volatile AtomicReference<Thread> owner = new AtomicReference<>();
// java q 執行緒安全
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 塞到等待鎖的集合中
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 掛起這個執行緒
LockSupport.park();
// 後續,等待其他執行緒釋放鎖,收到通知之後繼續迴圈
}
}
waiters.remove(Thread.currentThread());
}
public void release() {
// cas 修改 owner 擁有者
if (tryRelease()) {
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread waiter = iterator.next();
LockSupport.unpark(waiter); // 喚醒執行緒繼續 搶鎖
}
}
}
// 判斷量夠不夠
public void acquireShared() {
boolean addQ = true;
while (tryAcquireShared() < 0) {
if (addQ) {
// 塞到等待鎖的集合中
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 掛起這個執行緒
LockSupport.park();
// 後續,等待其他執行緒釋放鎖,收到通知之後繼續迴圈
}
}
waiters.remove(Thread.currentThread());
}
public void releaseShared() {
// cas 修改 owner 擁有者
if (tryReleaseShared()) {
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread waiter = iterator.next();
LockSupport.unpark(waiter); // 喚醒執行緒繼續 搶鎖
}
}
}
public boolean tryAcquire() {
throw new UnsupportedOperationException();
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public int tryAcquireShared() {
throw new UnsupportedOperationException();
}
public boolean tryReleaseShared() {
throw new UnsupportedOperationException();
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
}
package icu.shaoyayu.multithreading.chapter6;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class SimpleCDLAqs {
// 同步資源狀態
volatile AtomicInteger state = new AtomicInteger(0);
// 當前鎖的擁有者
protected volatile AtomicReference<Thread> owner = new AtomicReference<>();
// java q 執行緒安全
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 塞到等待鎖的集合中
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 掛起這個執行緒
LockSupport.park();
// 後續,等待其他執行緒釋放鎖,收到通知之後繼續迴圈
}
}
waiters.remove(Thread.currentThread());
}
public void release() {
// cas 修改 owner 擁有者
if (tryRelease()) {
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread waiter = iterator.next();
LockSupport.unpark(waiter); // 喚醒執行緒繼續 搶鎖
}
}
}
// 判斷量夠不夠
public void acquireShared() {
boolean addQ = true;
while (tryAcquireShared() < 0) {
if (addQ) {
// 塞到等待鎖的集合中
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 掛起這個執行緒
LockSupport.park();
// 後續,等待其他執行緒釋放鎖,收到通知之後繼續迴圈
}
}
waiters.remove(Thread.currentThread());
}
public void releaseShared() {
// cas 修改 owner 擁有者
if (tryReleaseShared()) {
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread waiter = iterator.next();
LockSupport.unpark(waiter); // 喚醒執行緒繼續 搶鎖
}
}
}
public boolean tryAcquire() {
throw new UnsupportedOperationException();
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public int tryAcquireShared() {
throw new UnsupportedOperationException();
}
public boolean tryReleaseShared() {
throw new UnsupportedOperationException();
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
}
CyclicBarrier
執行緒柵欄
建立物件時,指定柵欄執行緒數量。
await:等指定數量的執行緒都處於等待狀態時,繼續執行後續程式碼。
barrierAction:執行緒數量到了指定量之後,自動觸發執行指定任務。
CounDownLatch重要區別在於,CyclicBarrier物件可多次觸發執行;
典型場景:
1、資料量比較大時,實現批量插入資料到資料庫;
2、資料統計,30個執行緒統計30天資料,全部統計完畢後,執行彙總;
示例5
package icu.shaoyayu.multithreading.chapter6;
import java.util.ArrayList;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
// 任務1+2+3...1000 拆分為100個任務(1+..10, 11+20) -> 100執行緒去處理。
// 每當有4個執行緒處於await狀態的時候,則會觸發barrierAction執行
CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
// 這是每滿足4次資料庫操作,就觸發一次批量執行
System.out.println("有4個執行緒執行了,開始批量插入: " + Thread.currentThread());
for (int i = 0; i < 4; i++) {
System.out.println(sqls.poll());
}
}
});
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
sqls.add("data - " + Thread.currentThread()); // 快取起來
Thread.sleep(1000L); // 模擬資料庫操作耗時
barrier.await(); // 等待柵欄開啟,有4個執行緒都執行到這段程式碼的時候,才會繼續往下執行
System.out.println(Thread.currentThread() + "插入完畢");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);
}
}
執行結果
分析。這種任務的4個執行緒組合跑一趟,但是在這個第的問題是執行緒8和執行緒9會一直阻塞等待,