訊號量解決經典執行緒同步問題
Dijkstra建議設兩種操作:Down和Up。對一訊號量執行Down操作是檢查其值是否大於0;若是則將其值減1(即用掉一個儲存的喚醒訊號)並繼續。若值為0,則程序將睡眠,而且此時Down操作並未結束。檢查數值,改變數值以及可能發生的睡眠操作均作為一個單一的、不可分割的原子操作(atomic action)完成。即保證一旦一個訊號量操作開始,則在操作完成或阻塞之前別的程序均不允許訪問該訊號量。這種原子性對於解決同步間題和避免競爭條件是非常重要的。
Up操作遞增訊號量的值。如果一個或多個程序在該訊號量上睡眠,無法完成一個先前的Down操作、則由系統選擇其中的一個(例如,隨機挑選)並允許其完成它的Down操作。於是,對一個有程序在其上睡眠的訊號量執行一次Up操作之後,該訊號量的值仍舊是0.但在其上睡眠的程序卻少了一個。遞增訊號量的值和喚醒一個程序同樣也是不可分割的。不會有程序因執行Up而阻塞。
JDK的併發包就給我提供了一個類似的訊號量類——Semaphore
,其中的acquire()和release()方法就相當於Down和Up操作,這兩個方法的特殊之處在於對Semaphore物件的操作都是原子操作,由作業系統底層來支援。下面我們講的四種經典執行緒同步問題,都是使用了Java編碼。
1、生產者-消費者問題
由一個大小固定的倉庫,生產者將生產的產品放入倉庫,如果倉庫滿,則生成者被阻塞。消費者將倉庫中的產品拿出來消耗,如果倉庫空,則消費者被阻塞。
import java.util.concurrent.Semaphore; class Signs{ static Semaphore empty=new Semaphore(10); //訊號量:記錄倉庫空的位置 static Semaphore full=new Semaphore(0); //訊號量:記錄倉庫滿的位置 static Semaphore mutex=new Semaphore(1); //臨界區互斥訪問訊號量(二進位制訊號量),相當於互斥鎖。 } class Producer implements Runnable{ public void run(){ try { while(true){ Signs.empty.acquire(); //遞減倉庫空訊號量 Signs.mutex.acquire(); //進入臨界區 System.out.println("生成一個產品放入倉庫"); Signs.mutex.release(); //離開臨界區 Signs.full.release(); //遞增倉庫滿訊號量 Thread.currentThread().sleep(100); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable{ public void run(){ try { while(true){ Signs.full.acquire(); //遞減倉庫滿訊號量 Signs.mutex.acquire(); System.out.println("從倉庫拿出一個產品消費"); Signs.mutex.release(); Signs.empty.release(); //遞增倉庫空訊號量 Thread.currentThread().sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } public class Test{ public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } }
2、哲學家進餐問題
在1965年,Dijkstra提出並解決了一個他稱之為哲學家進餐的同步問題。從那時起,每個發明新的同步原語的人都希望通過解決哲學家進餐間題來展示其同步原語的精妙之處。這個問題可以簡單地描述:五個哲學家圍坐在一張圓桌周圍,每個哲學家面前都有一碟通心麵,由於麵條很滑,所以要兩把叉子才能夾住。相鄰兩個碟子之間有一把叉子。
哲學家的生活包括兩種活動:即吃飯和思考。當一個哲學家覺得餓時,他就試圖去取他左邊和右邊的叉子。如果成功地獲得兩把叉子,他就開始吃飯,吃完以後放下叉子繼續思考。
import java.util.concurrent.Semaphore; class Signs{ final static int THINKING=0; //哲學家的狀態THINGING final static int EATING=1; //哲學家的狀態EATING static int[] status=new int[5]; //哲學家的狀態,預設都是THINGING static Semaphore[] s=null; //訊號量:記錄哲學家是否可以進餐,不能進餐則堵塞 static Semaphore mutex=new Semaphore(1); //臨界區互斥訪問訊號量(二進位制訊號量),相當於互斥鎖 //初始化每個哲學家的進餐訊號量,預設值都不能進餐 static{ s=new Semaphore[5]; for(int i=0;i<s.length;i++) s[i]=new Semaphore(0); }; } class Philosopher implements Runnable{ private int pid; //當前哲學家的序號 private int lid; //坐在左手邊的哲學家序號 private int rid; //坐在右手邊的哲學家序號 Philosopher(int id){ this.pid=id; this.lid=(id+4)%5; this.rid=(id+1)%5; } private void test(int pid){ //如果當前哲學家左右手邊的人都沒有吃飯,則當前哲學家可以進餐 if(Signs.status[pid]==Signs.THINKING&&Signs.status[lid]!=Signs.EATING&&Signs.status[rid]!=Signs.EATING){ Signs.status[pid]=Signs.EATING; //此時當前哲學家執行緒可以進餐,但其他哲學家執行緒很可能無法進餐 Signs.s[pid].release(); //釋放一個許可 } } public void run(){ try { //嘗試拿起叉子準備進餐 Signs.mutex.acquire(); test(pid); Signs.mutex.release(); //判斷當前哲學家的進餐訊號量,如果不能許可進餐,則當前執行緒阻塞 Signs.s[pid].acquire(); System.out.println("#"+pid+" 號哲學家正在進餐..."); //放下叉子,並喚醒旁邊兩個被阻塞進餐的哲學家,讓他們嘗試進餐 Signs.mutex.acquire(); Signs.status[pid]=Signs.THINKING; test(lid); //讓左手邊的哲學家嘗試拿起叉子,如果可以,則釋放這個哲學家的訊號量許可 test(rid); //同上 Signs.mutex.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Test{ public static void main(String[] args) { new Thread(new Philosopher(0)).start(); new Thread(new Philosopher(1)).start(); new Thread(new Philosopher(2)).start(); new Thread(new Philosopher(3)).start(); new Thread(new Philosopher(4)).start(); } }
3、讀者-寫者問題
讀者一寫者問題(Courtois et al., 1971)為資料庫訪問建立了一個模型。例如,設想一個飛機定票系統,其中有許多競爭的程序試圖讀寫其中的資料。多個程序同時讀是可以接受的,但如果一個程序正在寫資料庫、則所有的其他程序都不能訪問資料庫,即使讀操作也不行。
import java.util.concurrent.Semaphore;
class Sign{
static Semaphore db=new Semaphore(1); //訊號量:控制對資料庫的訪問
static Semaphore mutex=new Semaphore(1); //訊號量:控制對臨界區的訪問
static int rc=0; //記錄正在讀或者想要讀的程序數
}
class Reader implements Runnable{
public void run(){
try {
//互斥對rc的操作
Sign.mutex.acquire();
Sign.rc++; //又多了一個讀執行緒
if(Sign.rc==1) Sign.db.acquire(); //如果是第一個讀程序開始讀取DB,則請求一個許可,使得寫程序無法操作DB
Sign.mutex.release();
//無臨界區控制,多個讀執行緒都可以操作DB
System.out.println("[R] "+Thread.currentThread().getName()+": read data....");
Thread.sleep(100);
//互斥對rc的操作
Sign.mutex.acquire();
Sign.rc--;
if(Sign.rc==0) Sign.db.release(); //如果最後一個讀程序讀完了,則釋放許可,讓寫程序有機會操作DB
Sign.mutex.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Writer implements Runnable{
public void run(){
try {
//與讀操作互斥訪問DB
Sign.db.acquire();
System.out.println("[W] "+Thread.currentThread().getName()+": write data....");
Thread.sleep(100);
Sign.db.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args){
new Thread(new Reader()).start();
new Thread(new Reader()).start();
new Thread(new Writer()).start();
}
}
4、理髮師問題
另一個經典的問題發生在理髮店裡。理髮店裡有一位理髮師,一把理髮椅和n把供等候理髮的顧客坐的椅子。如果沒有顧客,理髮師便在理髮椅上睡覺,當一個顧客到來時,他必須先叫醒理髮師,如果理髮師正在理髮時又有顧客來到,則如果有空椅子可坐,他們就坐下來等;如果沒有空椅子,他就離開。
Java程式碼:</pre><pre class="java" name="code">import java.util.concurrent.Semaphore;
class Sign{
final static int CHAIRS=5; //椅子數量
static int waiting=0; //等待理髮的顧客數
static Semaphore consumers=new Semaphore(0); //訊號量:等候服務的顧客數
static Semaphore barber=new Semaphore(0); //訊號量:等待顧客的理髮師數
static Semaphore mutex=new Semaphore(1); //訊號量:控制對臨界區的訪問
}
class Barber implements Runnable{
public void run(){
try {
while(true){
Sign.consumers.acquire(); //如果顧客數是0,則理髮師睡覺
//互斥操作顧客等待waiting
Sign.mutex.acquire();
Sign.waiting--;
Sign.barber.release(); //一個理髮師現在開始理髮
Sign.mutex.release();
System.out.println("理髮師"+Thread.currentThread().getName()+"正在理髮...");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Customer implements Runnable{
public void run(){
try {
//互斥操作顧客等待waiting
Sign.mutex.acquire();
//如果還有椅子空餘,則顧客坐在椅子上等待
if(Sign.waiting<Sign.CHAIRS){
Sign.waiting++;
Sign.consumers.release(); //顧客理
Sign.mutex.release();
Sign.barber.acquire(); //如果理髮師數量為0,則顧客等待
System.out.println("顧客"+Thread.currentThread().getName()+"正在被理髮...");
}else{
Sign.mutex.release(); //理髮店滿人,顧客走人
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args){
new Thread(new Barber()).start();
new Thread(new Customer()).start();
new Thread(new Customer()).start();
new Thread(new Customer()).start();
}
}