1. 程式人生 > >執行緒同步 各個關鍵字和方法的使用

執行緒同步 各個關鍵字和方法的使用



1、volatile關鍵詞:用來對共享變數的訪問進行同步,上一次寫入操作的結果對下一次讀取操作是肯定可見的。(在寫入volatile變數值之後,CPU快取中的內容會被寫回記憶體;在讀取volatile變數時,CPU快取中的對應內容會被置為失效,重新從主存中進行讀取),volatile不使用鎖,效能優於synchronized關鍵詞。

用來確保對一個變數的修改被正確地傳播到其他執行緒中。

例子:A執行緒是Worker,一直跑迴圈,B執行緒呼叫setDone(true),A執行緒即停止任務

[java] view plaincopyprint?
  1. publicclass Worker{  
  2.    privatevolatileboolean done;  
  3.    publicvoid setDone(boolean done){  
  4.       this.done = done;  
  5.    }  
  6.    publicvoid work(){  
  7.       while(!done){  
  8.          //執行任務;
  9.       }  
  10.    }  
  11. }  
public class Worker{
   private volatile boolean done;
   public void setDone(boolean done){
      this.done = done;
   }
   public void work(){
      while(!done){
         //執行任務;
      }
   }
}
例子:錯誤使用。因為沒有鎖的支援,volatile的修改不能依賴於當前值,當前值可能在其他執行緒中被修改。(Worker是直接賦新值與當前值無關) [java] view plaincopyprint?
  1. publicclass Counter {  
  2.     publicvolatilestaticint count = 0;  
  3.     publicstaticvoid inc() {  
  4.         //這裡延遲1毫秒,使得結果明顯
  5.         try {  
  6.             Thread.sleep(1);  
  7.         } catch (InterruptedException e) {  
  8.         }  
  9.         count++;  
  10.     }  
  11.     publicstaticvoid main(String[] args) {  
  12.         //同時啟動1000個執行緒,去進行i++計算,看看實際結果
  13.         for (int i = 0; i < 1000; i++) {  
  14.             new Thread(new Runnable() {  
  15.                 @Override
  16.                 publicvoid run() {  
  17.                     Counter.inc();  
  18.                 }  
  19.             }).start();  
  20.         }  
  21.         //這裡每次執行的值都有可能不同,可能不為1000
  22.         System.out.println("執行結果:Counter.count=" + Counter.count);  
  23.     }  
  24. }  
public class Counter {
    public volatile static int count = 0;
    public static void inc() {
        //這裡延遲1毫秒,使得結果明顯
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
        }
        count++;
    }
    public static void main(String[] args) {
        //同時啟動1000個執行緒,去進行i++計算,看看實際結果
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Counter.inc();
                }
            }).start();
        }
        //這裡每次執行的值都有可能不同,可能不為1000
        System.out.println("執行結果:Counter.count=" + Counter.count);
    }
}
2、final關鍵詞
final關鍵詞宣告的域的值只能被初始化一次,一般在構造方法中初始化。。(在多執行緒開發中,final域通常用來實現不可變物件)

當物件中的共享變數的值不可能發生變化時,在多執行緒中也就不需要同步機制來進行處理,故在多執行緒開發中應儘可能使用不可變物件

另外,在程式碼執行時,final域的值可以被儲存在暫存器中,而不用從主存中頻繁重新讀取。

3、java基本型別的原子操作

1)基本型別,引用型別的複製引用是原子操作;(即一條指令完成)

2)long與double的賦值,引用是可以分割的,非原子操作;

3)要線上程間共享long或double的欄位時,必須在synchronized中操作,或是宣告成volatile

三、Java提供的執行緒同步方式

1、synchronized關鍵字

方法或程式碼塊的互斥性來完成實際上的一個原子操作。(方法或程式碼塊在被一個執行緒呼叫時,其他執行緒處於等待狀態)

所有的Java物件都有一個與synchronzied關聯的監視器物件(monitor),允許執行緒在該監視器物件上進行加鎖和解鎖操作。

a、靜態方法:Java類對應的Class類的物件所關聯的監視器物件。

b、例項方法:當前物件例項所關聯的監視器物件。

c、程式碼塊:程式碼塊宣告中的物件所關聯的監視器物件。

注:當鎖被釋放,對共享變數的修改會寫入主存;當活得鎖,CPU快取中的內容被置為無效。編譯器在處理synchronized方法或程式碼塊,不會把其中包含的程式碼移動到synchronized方法或程式碼塊之外,從而避免了由於程式碼重排而造成的問題。

例:以下方法getNext()和getNextV2() 都獲得了當前例項所關聯的監視器物件

[java] view plaincopyprint?
  1. publicclass SynchronizedIdGenerator{  
  2.    privateint value = 0;  
  3.    publicsynchronizedint getNext(){  
  4.       return value++;  
  5.    }  
  6.    publicint getNextV2(){  
  7.       synchronized(this){  
  8.          return value++;  
  9.       }  
  10.    }  
  11. }  
public class SynchronizedIdGenerator{
   private int value = 0;
   public synchronized int getNext(){
      return value++;
   }
   public int getNextV2(){
      synchronized(this){
         return value++;
      }
   }
}

2、Object類的wait、notify和notifyAll方法

生產者和消費者模式,判斷緩衝區是否滿來消費,緩衝區是否空來生產的邏輯。如果用while 和 volatile也可以做,不過本質上會讓執行緒處於忙等待,佔用CPU時間,對效能造成影響。

Obj.wait(),與Obj.notify()必須要與synchronized(Obj)一起使用,也就是wait,與notify是針對已經獲取了Obj鎖進行操作,從語法角度來說就是Obj.wait(),Obj.notify必須在synchronized(Obj){...}語句塊內。從功能上來說wait就是說執行緒在獲取物件鎖後,主動釋放物件鎖,同時本執行緒休眠。直到有其它執行緒呼叫物件的notify()喚醒該執行緒,才能繼續獲取物件鎖,並繼續執行。相應的notify()就是對物件鎖的喚醒操作。但有一點需要注意的是notify()呼叫後,並不是馬上就釋放物件鎖的,而是在相應的synchronized(){}語句塊執行結束,自動釋放鎖後,JVM會在wait()物件鎖的執行緒中隨機選取一執行緒,賦予其物件鎖,喚醒執行緒,繼續執行。這樣就提供了線上程間同步、喚醒的操作。Thread.sleep()與Object.wait()二者都可以暫停當前執行緒,釋放CPU控制權,主要的區別在於Object.wait()在釋放CPU同時,釋放了物件鎖的控制。

工作流程:

a、Consumer執行緒A 來 看產品,發現產品為空,呼叫產品物件的wait(),執行緒A進入產品物件的等待池並釋放產品的鎖。

b、Producer執行緒B獲得產品的鎖,執行產品的notifyAll(),Consumer執行緒A從產品的等待池進入鎖池,Producer執行緒B生產產品,然後退出釋放鎖。

c、Consumer執行緒A獲得產品鎖,進入執行,發現有產品,消費產品,然後退出。

例子:

package com.page.bjsxt.cude;

public class Test {

	public static Object object = new Object();

	public static void main(String[] args) {
		Thread1 thread1 = new Thread1();
		Thread2 thread2 = new Thread2();
		thread1.start();
		thread2.start();
	}

	static class Thread1 extends Thread {
		@Override
		public void run() {
			synchronized (object) {
				try {
					object.wait();//釋放鎖,該執行緒等待,直到被喚醒
					System.out.println("synchronized (object) 語句塊執行結束...");
					
				} catch (InterruptedException e) {
				}
				System.out.println("執行緒" + Thread.currentThread().getName()
						+ "獲取到了鎖");
			}
		}
	}

	static class Thread2 extends Thread {
		@Override
		public void run() {
			synchronized (object) {
				object.notify();//喚醒object.wait() 並將此synchronized語句塊執行結束;
				System.out.println("執行緒" + Thread.currentThread().getName()
						+ "呼叫了object.notify()");
			}
			System.out.println("執行緒" + Thread.currentThread().getName() + "釋放了鎖");
		}
	}

}
public synchronized String pop(){
  this.notifyAll();// 喚醒物件等待池中的所有執行緒,可能喚醒的就是 生產者(當生產者發現產品滿,就會進入物件的等待池,這裡程式碼省略,基本略同)
   while(index == -1){//如果發現沒產品,就釋放鎖,進入物件等待池
      this.wait();
   }//當生產者生產完後,消費者從this.wait()方法再開始執行,第一次還會執行迴圈,萬一產品還是為空,則再等待,所以這裡必須用while迴圈,不能用if
   String good = buffer[index];
   buffer[index] = null;
   index--;
   return good;// 消費完產品,退出。
}

注:wait()方法有超時和不超時之分,超時的在經過一段時間,執行緒還在物件的等待池中,那麼執行緒也會推出等待狀態。

3、執行緒狀態轉換:

已經廢棄的方法:stop、suspend、resume、destroy,這些方法在實現上時不安全的。

執行緒的狀態:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超時的等待)、TERMINATED。

a、方法sleep()進入的阻塞狀態,不會釋放物件的鎖(即大家一起睡,誰也別想執行程式碼),所以不要讓sleep方法處在synchronized方法或程式碼塊中,否則造成其他等待獲取鎖的執行緒長時間處於等待。

b、方法join()則是主執行緒等待子執行緒完成,再往下執行。例如main方法新建兩個執行緒A和B

[java] view plaincopyprint?
  1. publicstaticvoid main(String[] args) throws InterruptedException {    
  2. Thread t1 = new Thread(new ThreadTesterA());    
  3. Thread t2 = new Thread(new ThreadTesterB());    
  4. t1.start();    
  5. t1.join(); // 等t1執行完再往下執行
  6. t2.start();    
  7. t2.join(); // 在虛擬機器執行中,這句可能被忽略
  8. }  
public static void main(String[] args) throws InterruptedException {  
Thread t1 = new Thread(new ThreadTesterA());  
Thread t2 = new Thread(new ThreadTesterB());  
t1.start();  
t1.join(); // 等t1執行完再往下執行
t2.start();  
t2.join(); // 在虛擬機器執行中,這句可能被忽略
}

c、方法interrupt(),向被呼叫的物件執行緒發起中斷請求。如執行緒A通過呼叫執行緒B的d的interrupt方法來發出中斷請求,執行緒B來處理這個請求,當然也可以忽略,這不是必須的。Object類的wait()、Thread類的join()和sleep方法都會丟擲受檢異常java.lang.InterruptedException,通過interrupt方法中斷該執行緒會導致執行緒離開等待狀態。對於wait()呼叫來說,執行緒需要重新獲取監視器物件上的鎖之後才能丟擲InterruptedException異常,並致以異常的處理邏輯。

可以通過Thread類的isInterrupted方法來判斷是否有中斷請求發生,通常可以利用這個方法來判斷是否退出執行緒(類似上面的volatitle修飾符的例子);

Thread類還有個方法Interrupted(),該方法不但可以判斷當前執行緒是否被中斷,還會清楚執行緒內部的中斷標記,如果返回true,即曾被請求中斷,同時呼叫完後,清除中斷標記。

如果一個執行緒在某個物件的等待池,那麼notify和interrupt 都可以使該執行緒從等待池中被移除。如果同時發生,那麼看實際發生順序。如果是notify先,那照常喚醒,沒影響。如果是interrupt先,並且虛擬機器選擇讓該執行緒中斷,那麼即使nofity,也會忽略該執行緒,而喚醒等待池中的另一個執行緒。

e、yield(),嘗試讓出所佔有的CPU資源,讓其他執行緒獲取執行機會,對作業系統上的排程器來說是一個訊號,不一定立即切換執行緒。(在實際開發中,測試階段頻繁呼叫yeid方法使執行緒切換更頻繁,從而讓一些多執行緒相關的錯誤更容易暴露出來)。


四、非阻塞方式

執行緒之間同步機制的核心是監視物件上的鎖,競爭鎖來獲得執行程式碼的機會。當一個物件獲取物件的鎖,然後其他嘗試獲取鎖的物件會處於等待狀態,這種鎖機制的實現方式很大程度限制了多執行緒程式的吞吐量和效能(執行緒阻塞),且會帶來死鎖(執行緒A有a物件鎖,等著獲取b物件鎖,執行緒B有b物件鎖,等待獲取a物件鎖)和優先順序倒置(優先順序低的執行緒獲得鎖,優先順序高的只能等待對方釋放鎖)等問題。

如果能不阻塞執行緒,又能保證多執行緒程式的正確性,就能有更好的效能。

在程式中,對共享變數的使用一般遵循一定的模式,即讀取、修改和寫入三步組成。之前碰到的問題是,這三步執行中可能執行緒執行切換,造成非原子操作。鎖機制是把這三步變成一個原子操作。

目前CPU本身實現 將這三步 合起來 形成一個原子操作,無需執行緒鎖機制干預,常見的指令是“比較和替換”(compare and swap,CAS),這個指令會先比較某個記憶體地址的當前值是不是指定的舊指,如果是,就用新值替換,否則什麼也不做,指令返回的結果是記憶體地址的當前值。通過CAS指令可以實現不依賴鎖機制的非阻塞演算法。一般做法是把CAS指令的呼叫放在一個無限迴圈中,不斷嘗試,知道CAS指令成功完成修改。

java.util.concurrent.atomic包中提供了CAS指令。(不是所有CPU都支援CAS,在某些平臺,java.util.concurrent.atomic的實現仍然是鎖機制)

atomic包中提供的Java類分成三類:

1、支援以原子操作來進行更新的資料型別的Java類(AtomicBoolean、AtomicInteger、AtomicReference),在記憶體模型相關的語義上,這四個類的物件類似於volatile變數。

類中的常用方法:

a、compareAndSet:接受兩個引數,一個是期望的舊值,一個是替換的新值。

b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式讀取和有條件地寫入變數但建立任何 happen-before 排序,但在原始碼中和compareAndSet完全一樣,所以並沒有按JSR實現)

c、get和set:分別用來直接獲取和設定變數的值。

d、lazySet:與set類似,但允許編譯器把lazySet方法的呼叫與後面的指令進行重排,因此對值得設定操作有可能被推遲。

例:

[java] view plaincopyprint?
  1. publicclass AtomicIdGenerator{  
  2.    privatefinal AtomicInter counter = new AtomicInteger(0);  
  3.    publicint getNext(){  
  4.       return counter.getAndIncrement();  
  5.    }  
  6. }  
  7. // getAndIncrement方法的內部實現方式,這也是CAS方法的一般模式,CAS方法不一定成功,所以包裝在一個無限迴圈中,直到成功
  8. publicfinalint getAndIncrement(){  
  9.    for(;;){  
  10.       int current = get();  
  11.       int next = current +1;  
  12.       if(compareAndSet(current,next))  
  13.          return current;  
  14.    }  
  15. }  
public class AtomicIdGenerator{
   private final AtomicInter counter = new AtomicInteger(0);
   public int getNext(){
      return counter.getAndIncrement();
   }
}
// getAndIncrement方法的內部實現方式,這也是CAS方法的一般模式,CAS方法不一定成功,所以包裝在一個無限迴圈中,直到成功
public final int getAndIncrement(){
   for(;;){
      int current = get();
      int next = current +1;
      if(compareAndSet(current,next))
         return current;
   }
}
2、提供對陣列型別的變數進行處理的Java類,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray類。(同上,只是放在類數組裡,呼叫時也只是多了一個操作元素索引的引數)

3、通過反射的方式對任何物件中包含的volatitle變數使用CAS方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他們提供了一種方式把CAS的功能擴充套件到了任何Java類中宣告為volatitle的域上。(靈活,但語義較弱,因為物件的volatitle可能被非atomic的其他方式被修改)

[java] view plaincopyprint?
  1. publicclass TreeNode{  
  2.    privatevolatile TreeNode parent;  
  3. // 靜態工廠方法
  4.    privatestaticfinal AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");  
  5. publicboolean compareAndSetParent(TreeNode expect, TreeNode update){  
  6.       return parentUpdater.compareAndSet(this, expect, update);  
  7. }  
  8. }  
public class TreeNode{
   private volatile TreeNode parent;
// 靜態工廠方法
   private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");
public boolean compareAndSetParent(TreeNode expect, TreeNode update){
      return parentUpdater.compareAndSet(this, expect, update);
}
}
注:java.util.concurrent.atomic包中的Java類屬於比較底層的實現,一般作為java.util.concurrent包中很多非阻塞的資料結構的實現基礎。

比較多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在實現執行緒安全的計數器時,AtomicInteger和AtomicLong類時最佳的選擇。

五、高階同步機制(比synchronized更靈活的加鎖機制)

synchronized和volatile,以及wait、notify等方法抽象層次低,在程式開發中使用比較繁瑣,易出錯。

而多執行緒之間的互動來說,存在某些固定的模式,如生產者-消費者和讀者-寫者模式,把這些模式抽象成高層API,使用起來會非常方便。

java.util.concurrent包為多執行緒提供了高層的API,滿足日常開發中的常見需求。

常用介面

1、Lock介面,表示一個鎖方法:

a、lock(),獲取所,如果無法獲取所鎖,會處於等待狀態

b、unlock(),釋放鎖。(一般放在finally程式碼塊中)

c、lockInterruptibly(),與lock()類似,但允許當前執行緒在等待獲取鎖的過程中被中斷。(所以要處理InterruptedException)

d、tryLock(),以非阻塞方式獲取鎖,如果無法獲取鎖,則返回false。(tryLock()的另一個過載可以指定超時,如果指定超時,當無法獲取鎖,會等待而阻塞,同時執行緒可以被中斷)

2、ReadWriteLock介面,表示兩個鎖,讀取的共享鎖和寫入的排他鎖。(適合常見的讀者--寫者場景)

ReadWriteLock介面的readLock和writeLock方法來獲取對應的鎖的Lock介面的實現。

在多數執行緒讀取,少數執行緒寫入的情況下,可以提高多執行緒的效能,提高使用該資料結構的吞吐量。

如果是相反的情況,較多的執行緒寫入,則介面會降低效能。

3、ReentrantLock類和ReentrantReadWriteLock,分別為上面兩個介面的實現類。

他們具有重入性:即允許一個執行緒多次獲取同一個鎖(他們會記住上次獲取鎖並且未釋放的執行緒物件,和加鎖的次數,getHoldCount())

同一個執行緒每次獲取鎖,加鎖數+1,每次釋放鎖,加鎖數-1,到0,則該鎖被釋放,可以被其他執行緒獲取。

[java] view plaincopyprint?
  1. publicclass LockIdGenrator{  
  2. //new ReentrantLock(true)是過載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的執行緒,避免一些執行緒長期無法獲得鎖
  3.    privateint ReentrantLock lock = ReentrantLock();  
  4.    privafte int value = 0;  
  5.    publicint getNext(){  
  6.       lock.lock();      //進來就加鎖,沒有鎖會等待
  7.       try{  
  8.          return value++;//實際操作
  9.       }finally{  
  10.          lock.unlock();//釋放鎖
  11.       }  
  12.    }  
  13. }  
public class LockIdGenrator{
//new ReentrantLock(true)是過載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的執行緒,避免一些執行緒長期無法獲得鎖
   private int ReentrantLock lock = ReentrantLock();
   privafte int value = 0;
   public int getNext(){
      lock.lock();      //進來就加鎖,沒有鎖會等待
      try{
         return value++;//實際操作
      }finally{
         lock.unlock();//釋放鎖
      }
   }
}
注:重入性減少了鎖在各個執行緒之間的等待,例如便利一個HashMap,每次next()之前加鎖,之後釋放,可以保證一個執行緒一口氣完成便利,而不會每次next()之後釋放鎖,然後和其他執行緒競爭,降低了加鎖的代價, 提供了程式整體的吞吐量。(即,讓一個執行緒一口氣完成任務,再把鎖傳遞給其他執行緒)。
4、Condition介面,Lock介面代替了synchronized,Condition介面替代了object的wait、nofity。

a、await(),使當前執行緒進入等待狀態,知道被喚醒或中斷。過載形式可以指定超時時間。

b、awaitNanos(),以納秒為單位等待。

c、awaitUntil(),指定超時發生的時間點,而不是經過的時間,引數為java.util.Date。

d、awaitUninterruptibly(),前面幾種會響應其他執行緒發出的中斷請求,他會無視,直到被喚醒。

注:與Object類的wait()相同,await()會釋放其所持有的鎖。

e、signal()和signalAll, 相當於 notify和notifyAll

[java] view plaincopyprint?
  1. Lock lock = new ReentrantLock();  
  2. Condition condition = lock.newCondition();  
  3. lock.lock();  
  4. try{  
  5.    while(/*邏輯條件不滿足*/){  
  6.       condition.await();     
  7.    }  
  8. }finally{  
  9.    lock.unlock();  
  10. }  
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try{
   while(/*邏輯條件不滿足*/){
      condition.await();   
   }
}finally{
   lock.unlock();
}

六、底層同步器

多執行緒程式中,執行緒之間存在多種不同的同步方式。除了Java標準庫提供的同步方式之外,程式中特有的同步方式需要由開發人員自己來實現。

常見的一種需求是 對有限個共享資源的訪問,比如多臺個人電腦,2臺印表機,當多個執行緒在等待同一個資源時,從公平角度出發,會用FIFO佇列。

 如果程式中的同步方式可以抽象成對有限個資源的訪問,那麼可以使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer類和AbstractQueuedLongSynchronizer類作為實現的基礎,前者用int型別的變數來維護內部狀態,而後者用long型別。(可以將這個變數理解為共享資源個數)

通過getState、setState、和compareAndSetState3個方法更新內部變數的值。

AbstractQueuedSynchronizer類是abstract的,需要覆蓋其中包含的部分方法,通常做法是把其作為一個Java類的內部類,外部類提供具體的同步方式,內部類則作為實現的基礎。有兩種模式,排他模式和共享模式,分別對應方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在這些方法中,使用getState、setState、compareAndSetState3個方法來修改內部變數的值,以此來反應資源的狀態。

[java] view plaincopyprint?
  1. publicclass SimpleResourceManager{  
  2.    privatefinal InnerSynchronizer synchronizer;  
  3.    privatestaticclass InnerSynchronizer extends AbstractQueuedSynchronizer{  
  4.       InnerSynchronizer(int numOfResources){  
  5.          setState(numOfResources);  
  6.       }  
  7.       protectedint tryAcquireShared(int acquires){  
  8.          for(;;){  
  9.             int available = getState();  
  10.             int remain = available - acquires;  
  11.             if(remain <0 || comapreAndSetState(available, remain){  
  12.                return remain;  
  13.             }  
  14.          }  
  15.       }  
  16.       protectedbooleantry ReleaseShared(int releases){  
  17.          for(;;){  
  18.             int available = getState();   
  19.             int next = available + releases;   
  20.             if(compareAndSetState(available,next){  
  21.                returntrue;  
  22.             }  
  23.          }  
  24.       }  
  25.    }  
  26.    public SimpleResourceManager(int numOfResources){  
  27.       synchronizer = new InnerSynchronizer(numOfResources);  
  28.    }  
  29.    publicvoid acquire() throws InterruptedException{  
  30.       synchronizer.acquireSharedInterruptibly(1);  
  31.    }        
  32.    pubic void release(){      
  33.       synchronizer.releaseShared(1);  
  34.     }  
  35. }  
public class SimpleResourceManager{
   private final InnerSynchronizer synchronizer;
   private static class InnerSynchronizer extends AbstractQueuedSynchronizer{
      InnerSynchronizer(int numOfResources){
         setState(numOfResources);
      }
      protected int tryAcquireShared(int acquires){
         for(;;){
            int available = getState();
            int remain = available - acquires;
            if(remain <0 || comapreAndSetState(available, remain){
               return remain;
            }
         }
      }
      protected boolean try ReleaseShared(int releases){
         for(;;){
            int available = getState(); 
            int next = available + releases; 
            if(compareAndSetState(available,next){
               return true;
            }
         }
      }
   }
   public SimpleResourceManager(int numOfResources){
      synchronizer = new InnerSynchronizer(numOfResources);
   }
   public void acquire() throws InterruptedException{
      synchronizer.acquireSharedInterruptibly(1);
   }      
   pubic void release(){    
      synchronizer.releaseShared(1);
    }
}

七、高階同步物件(提高開發效率)

atomic和locks包提供的Java類可以滿足基本的互斥和同步訪問的需求,但這些Java類的抽象層次較低,使用比較複雜。

更簡單的做法是使用java.util.concurrent包中的高階同步物件。

1、訊號量。

訊號量一般用來數量有限的資源,每類資源有一個物件的訊號量,訊號量的值表示資源的可用數量。

在使用資源時,需要從該訊號量上獲取許可,成功獲取許可,資源的可用數-1;完成對資源的使用,釋放許可,資源可用數+1; 當資源數為0時,需要獲取資源的執行緒以阻塞的方式來等待資源,或過段時間之後再來檢查資源是否可用。(上面的SimpleResourceManager類實際上時訊號量的一個簡單實現)

java.util.concurrent.Semaphore類,在建立Semaphore類的物件時指定資源的可用數

a、acquire(),以阻塞方式獲取許可

b、tryAcquire(),以非阻塞方式獲取許可

c、release(),釋放許可。

d、accquireUninterruptibly(),accquire()方法獲取許可以的過程可以被中斷,如果不希望被中斷,使用此方法。

[java] view plaincopyprint?
  1. publicclass PrinterManager{  
  2.    privatefinal Semphore semaphore;  
  3.    privatefinal List<Printer> printers = new ArrayList<>():  
  4.    public PrinterManager(Collection<? extends Printer> printers){  
  5.       this.printers.addAll(printers);  
  6.       //這裡過載方法,第二個引數為true,以公平競爭模式,防止執行緒飢餓
  7.       this.semaphore = new Semaphore(this.printers.size(),true);  
  8.    }  
  9.    public Printer acquirePrinter() throws InterruptedException{  
  10.       semaphore.acquire();  
  11.       return getAvailablePrinter();  
  12.    }  
  13.    publicvoid releasePrinter(Printer printer){  
  14.       putBackPrinter(pinter);  
  15.       semaphore.release();  
  16.    }  
  17.    privatesynchronized Printer getAvailablePrinter(){  
  18.       printer result = printers.get(0);  
  19.       printers.remove(0);  
  20.       return result;  
  21.    }  
  22.    privatesynchronizedvoid putBackPrinter(Printer printer){  
  23.       printers.add(printer);  
  24.    }  
  25. }  
public class PrinterManager{
   private final Semphore semaphore;
   private final List<Printer> printers = new ArrayList<>():
   public PrinterManager(Collection<? extends Printer> printers){
      this.printers.addAll(printers);
      //這裡過載方法,第二個引數為true,以公平競爭模式,防止執行緒飢餓
      this.semaphore = new Semaphore(this.printers.size(),true);
   }
   public Printer acquirePrinter() throws InterruptedException{
      semaphore.acquire();
      return getAvailablePrinter();
   }
   public void releasePrinter(Printer printer){
      putBackPrinter(pinter);
      semaphore.release();
   }
   private synchronized Printer getAvailablePrinter(){
      printer result = printers.get(0);
      printers.remove(0);
      return result;
   }
   private synchronized void putBackPrinter(Printer printer){
      printers.add(printer);
   }
}
2、倒數閘門

多執行緒協作時,一個執行緒等待另外的執行緒完成任務才能繼續進行。

java.util.concurrent.CountDownLatch類,建立該類時,指定等待完成的任務數;當一個任務完成,呼叫countDonw(),任務數-1。等待任務完成的執行緒通過await(),進入阻塞狀態,直到任務數量為0。CountDownLatch類為一次性,一旦任務數為0,再呼叫await()不再阻塞當前執行緒,直接返回。

例:

[java] view plaincopyprint?
  1. publicclass PageSizeSorter{  
  2.    // 併發效能遠遠優於HashTable的 Map實現,hashTable做任何操作都需要獲得鎖,同一時間只有有個執行緒能使用,而ConcurrentHashMap是分段加鎖,不同執行緒訪問不同的資料段,完全不受影響,忘記HashTable吧。
  3.    privatestaticfinal ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();  
  4.    privatestaticclass GetSizeWorker implements Runnable{  
  5.       privatefinal String urlString;  
  6.       public GetSizeWorker(String urlString , CountDownLatch signal){  
  7.          this.urlString = urlStirng;  
  8.          this.signal = signal;  
  9.       }  
  10.       publicvoid run(){  
  11.          try{  
  12.             InputStream is = new URL(urlString).openStream();  
  13.             int size = IOUtils.toByteArray(is).length;  
  14.             sizeMap.put(urlString, size);  
  15.          }catch(IOException e){  
  16.             sizeMap.put(urlString, -1);  
  17.          }finally{  
  18.             signal.countDown()://完成一個任務 , 任務數-1
  19.          }  
  20.       }  
  21.    }  
  22.    privatevoid sort(){  
  23.       List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());  
  24.       Collections.slort(list, new Comparator<Entry<String,Integer>>(){  
  25.          publicint compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){  
  26.             return Integer.compare(o2.getValue(),o1.getValue());  
  27.       };  
  28.       System.out.println(Arrays.deepToString(list.toArray()));  
  29.    }  
  30.    publicvoid sortPageSize(Collection<String> urls) throws InterruptedException{  
  31.       CountDownLatch sortSignal = new CountDownLatch(urls.size());  
  32.       for(String url: urls){  
  33.          new Thread(new GetSizeWorker(url, sortSignal)).start();  
  34.       }  
  35.       sortSignal.await()://主執行緒在這裡等待,任務數歸0,則繼續執行
  36.       sort();  
  37.    }  
  38. }  
public class PageSizeSorter{
   // 併發效能遠遠優於HashTable的 Map實現,hashTable做任何操作都需要獲得鎖,同一時間只有有個執行緒能使用,而ConcurrentHashMap是分段加鎖,不同執行緒訪問不同的資料段,完全不受影響,忘記HashTable吧。
   private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();
   private static class GetSizeWorker implements Runnable{
      private final String urlString;
      public GetSizeWorker(String urlString , CountDownLatch signal){
         this.urlString = urlStirng;
         this.signal = signal;
      }
      public void run(){
         try{
            InputStream is = new URL(urlString).openStream();
            int size = IOUtils.toByteArray(is).length;
            sizeMap.put(urlString, size);
         }catch(IOException e){
            sizeMap.put(urlString, -1);
         }finally{
            signal.countDown()://完成一個任務 , 任務數-1
         }
      }
   }
   private void sort(){
      List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());
      Collections.slort(list, new Comparator<Entry<String,Integer>>(){
         public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){
            return Integer.compare(o2.getValue(),o1.getValue());
      };
      System.out.println(Arrays.deepToString(list.toArray()));
   }
   public void sortPageSize(Collection<String> urls) throws InterruptedException{
      CountDownLatch sortSignal = new CountDownLatch(urls.size());
      for(String url: urls){
         new Thread(new GetSizeWorker(url, sortSignal)).start();
      }
      sortSignal.await()://主執行緒在這裡等待,任務數歸0,則繼續執行
      sort();
   }
}

3、迴圈屏障

迴圈屏障在作用上類似倒數閘門,不過他不像倒數閘門是一次性的,可以迴圈使用。另外,執行緒之間是互相平等的,彼此都需要等待對方完成,當一個執行緒完成自己的任務之後,等待其他執行緒完成。當所有執行緒都完成任務之後,所有執行緒才可以繼續執行。

當執行緒之間需要再次進行互相等待時,可以複用同一個迴圈屏障。

類java.uti.concurrent.CyclicBarrier用來表示迴圈屏障,建立時指定使用該物件的執行緒數目,還可以指定一個Runnable介面的物件作為每次迴圈後執行的動作。(當最後一個執行緒完成任務之後,所有執行緒繼續執行之前,被執行。如果執行緒之間需要更新一些共享的內部狀態,可以利用這個Runnalbe介面的物件來處理)。

每個執行緒任務完成之後,通過呼叫await方法進行等待,當所有執行緒都呼叫await方法之後,處於等待狀態的執行緒都可以繼續執行。在所有執行緒中,只要有一個在等待中被中斷,超時或是其他錯誤,整個迴圈屏障會失敗,所有等待中的其他執行緒丟擲java.uti.concurrent.BrokenBarrierException。

例:每個執行緒負責找一個數字區間的質數,當所有執行緒完成後,如果質數數目不夠,繼續擴大範圍查詢

[java] view plaincopyprint?
  1. publicclass PrimeNumber{  
  2.    privatestaticfinalint TOTAL_COUTN = 5000;  
  3.    privatestaticfinalint RANGE_LENGTH= 200;  
  4.    privatestaticfinalint WORKER_NUMBER = 5;  
  5.    privatestatic volatitle boolean done = false;  
  6.    privatestaticint rangeCount = 0;  
  7.    privatestaticfinal List<Long> results = new ArrayList<Long>():  
  8.    privatestaticfinal CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){  
  9.       publicvoid run(){  
  10.          if(results.size() >= TOTAL_COUNT){  
  11.             done = true;  
  12.          }  
  13.      }  
  14.    });  
  15.    privatestaticclass PrimeFinder implements Runnable{  
  16.       publicvoid run(){  
  17.          while(!done){// 整個過程在一個 while迴圈下,await()等待,下次迴圈開始,會再次判斷 執行條件
  18.             int range = getNextRange();  
  19.             long start = rang * RANGE_LENGTH;  
  20.             long end = (range + 1) * RANGE_LENGTH;  
  21.             for(long i = start; i<end;i++){  
  22.                if(isPrime(i)){  
  23.                   updateResult(i);  
  24.                }  
  25.             }  
  26.             try{  
  27.                barrier.await();  
  28.             }catch (InterruptedException | BokenBarrierException e){  
  29.                done =  true;  
  30.             }  
  31.          }  
  32.       }  
  33.    }  
  34.    privatesynchronizedstaticvoid updateResult(long value){  
  35.       results.add(value);  
  36.    }  
  37.    privatesynchronizedstaticint getNextRange(){  
  38.       return rangeCount++;  
  39.    }  
  40.    privatestaticboolean isPrime(long number){  
  41.       //找質數的程式碼
  42.    }  
  43.    publicvoid calculate(){  
  44.       for(int i=0;i<WORKER_NUMBER;i++){  
  45.