【Java併發工具類】Semaphore
前言
1965年,荷蘭電腦科學家Dijkstra提出的訊號量機制成為一種高效的程序同步機制。這之後的15年,訊號量一直都是併發程式設計領域的終結者。1980年,管程被提出,成為繼訊號量之後的在併發程式設計領域的第二個選擇。目前幾乎所有的語言都支援訊號量機制,Java也不例外。Java中提供了Semaphore
併發工具類來支援訊號量機制。下面我們就來了解Java實現的訊號量機制。
首先介紹訊號量模型,然後介紹如何使用,最後使用訊號量來實現一個限流器。
訊號量模型
訊號量模型圖(圖來自參考[1]):
訊號量模型總結為:一個計數器、一個等待佇列和三個對外呼叫的方法。
計數器和等待佇列時對外透明的,所有我們只能通過三個對外方法來訪問計數器和等待佇列。
init()
:設定計數器的初始值。down()
:計數器的值減一。如果此時計數器的值小於0,則當前執行緒插入等待佇列並阻塞,否則當前執行緒可以繼續執行。up()
:計數器的值加一。如果此時計數器的值小於或者等於0,則喚醒等待佇列中的一個執行緒,並將其從等待佇列中移除。
這三個方法都是原子性的,由實現訊號量模型的方法保證。在Java SDK中,訊號量模型是由java.util.concurrent.Semaphore
實現。
訊號量模型程式碼化大致類似如下:
class Semaphore{ int count; // 計數器 Queue queue; // 等待佇列 // 初始化操作 Semaphore(int c){ this.count=c; } void down(){ this.count--; // 計數器值減一 if(this.count < 0){ // 將當前執行緒插入等待佇列 // 阻塞當前執行緒 } } void up(){ this.count++; // 計數器值加一 if(this.count <= 0) { // 移除等待佇列中的某個執行緒T // 喚醒執行緒T } } }
在訊號量模型中,down()
和up()
這兩個操作也被成為P操作(荷蘭語proberen,測試)和V操作(荷荷蘭語verhogen,增加)。在我學的作業系統教材中(C語言實現),P操作對應wait(),V操作對應singal()。雖然叫法不同,但是語義都是相同的。在Java SDK併發包中,down()
和up()
分別對應於Semaphore中的acquire()
和release()
。
如何使用訊號量
訊號量有時也被稱為紅綠燈,我們想想紅綠燈時怎麼控制交通的,就知道該如何使用訊號量。車輛路過十字路時,需要先檢查是否為綠燈,如果是則通行,否則就等待。想想和加鎖機制有點相似,都是一樣的操作,先檢查是否符合條件(“嘗試獲取”),符合(“獲取到”)則執行緒繼續執行,否則阻塞執行緒。
下面使用累加器的例子來說明如何使用訊號量。
count+=1
操作是個臨界區,只允許一個執行緒執行,即要保證互斥。於是我們在進入臨界區之前,使用down()即Java中的acquire(),在退出之後使用up()即Java中的release()。
static int count;
//初始化訊號量
static final Semaphore s = new Semaphore(1); // 建構函式引數為1,表示只允許一個執行緒進行臨界區。可實現一個互斥鎖的功能。
//用訊號量保證互斥
static void addOne() {
s.acquire(); // 獲取一個許可(可看作加鎖機制中加鎖)
try {
count+=1;
} finally {
s.release(); // 歸還許可(可看做加鎖機制中解鎖)
}
}
完整程式碼如下:
package com.sakura.concrrent;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
static int count;
static final Semaphore s = new Semaphore(1);
static void addOne() throws InterruptedException {
//只會有一個執行緒將訊號量中的計數器減為1,而另外一個執行緒只能將訊號量中計數器減為-1,導致被阻塞
s.acquire();
try {
count +=1;
System.out.println("Now thread is " + Thread.currentThread() + " and count is " + count);
}finally {
//進入臨界區的執行緒在執行完臨界區程式碼後將訊號量中計數器的值加1然後,此時訊號量中計數器的值為0,則從阻塞佇列中喚醒被阻塞的程序
s.release();
}
}
public static void main(String[] args) {
// 建立兩個執行緒執行
MyThread thread1 = new MyThread();
MyThread thread2 = new MyThread();
thread1.start();
thread2.start();
System.out.println("main thread");
}
}
class MyThread extends Thread{
@Override
public void run() {
super.run();
for(int i=0; i<10; i++) {
try {
SemaphoreTest.addOne();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
執行結果:
如果Semaphore的建構函式引數(許可數量,內建計數器的值)修改一下:
static final Semaphore s = new Semaphore(2);
則計數器值的為2,那麼就允許有兩個執行緒進入臨界區,我們的count值就會出現問題
快速實現一個限流器
當設定訊號量的計數器為1時,可實現一個簡單的互斥鎖功能。但是,我們前面剛介紹過Java SDK中的Lock
,Semaphore的用途顯然不會與Lock一致,不然就重複造輪子了。Semaphore最重要的一個功能便是:可以允許多個執行緒訪問一個臨界區。(上述例子我們就設定了計數器的值為2,可發現thread1和thread2都可進入臨界區。)
我們會在什麼地方遇見這種需求呢?
各種池化資源,例如連線池、物件池、執行緒池等等。例如,資料庫連線池,在同一時刻,一定是允許多個執行緒同時使用連線池,當然,每個連線在被釋放之前,是不允許其他執行緒使用的。
我們設計如下可以允許N個執行緒使用的物件池,我們將訊號量的計數器值設為N,就可以讓N個執行緒同時進行臨界區,多餘的就會被阻塞。(程式碼來自參考[1])
class ObjPool<T, R> {
final List<T> pool; //使用List儲存例項物件
// 用訊號量實現限流器
final Semaphore sem;
// 建構函式
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 獲取物件池的物件,呼叫 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire(); //允許N個程序同時進入臨界區
try {
//我們需要注意,因為多個進行可以進入臨界區,所以Vector的remove方法是執行緒安全的
t = pool.remove(0);
return func.apply(t); //獲取物件池匯中的一個物件後,呼叫func函式
} finally {
pool.add(t); //離開臨界區之前,將之前獲取的物件放回到池中
sem.release(); //使得計數器加1,如果訊號量中計數器小於等於0,那麼說明有執行緒在等待,此時就會自動喚醒等待執行緒
}
}
}
// 建立物件池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通過物件池獲取 t,之後執行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
小結
記得學習作業系統時,訊號量型別分為了好幾種整型訊號量、記錄型訊號量、AND訊號量以及“訊號量集”(具體瞭解可戳參考[2])。我認為Java SDK中Semaphore應該是記錄型訊號量的實現。不由想起,程式語言是對OS層面操作的一種抽象描述。這句話需要品需要細細品。
參考:
[1] 極客時間專欄王寶令《Java併發程式設計實戰》
[2] 靜水深流.作業系統之訊號量機制總結.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.h