1. 程式人生 > >Java中多執行緒併發體系知識點彙總

Java中多執行緒併發體系知識點彙總

一、多執行緒

1、作業系統有兩個容易混淆的概念,程序和執行緒。

程序:一個計算機程式的執行例項,包含了需要執行的指令;有自己的獨立地址空間,包含程式內容和資料;不同程序的地址空間是互相隔離的;程序擁有各種資源和狀態資訊,包括開啟的檔案、子程序和訊號處理。

執行緒:表示程式的執行流程,是CPU排程執行的基本單位;執行緒有自己的程式計數器、暫存器、堆疊和幀。同一程序中的執行緒共用相同的地址空間,同時共享進程序鎖擁有的記憶體和其他資源。

2、Java標準庫提供了程序和執行緒相關的API,程序主要包括表示程序的java.lang.Process類和建立程序的java.lang.ProcessBuilder類;

表示執行緒的是java.lang.Thread類,在虛擬機器啟動之後,通常只有Java類的main方法這個普通執行緒執行,執行時可以建立和啟動新的執行緒;還有一類守護執行緒(damon thread),守護執行緒在後臺執行,提供程式執行時所需的服務。當虛擬機器中執行的所有執行緒都是守護執行緒時,虛擬機器終止執行。

3、執行緒間的可見性:一個執行緒對程序中共享的資料的修改,是否對另一個執行緒可見

可見性問題:

a、CPU採用時間片輪轉等不同演算法來對執行緒進行排程

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class IdGenerator{  
  2.    private int value = 0;  
  3.    public int getNext(){  
  4.       return value++;  
  5.    }  
  6. }  

public class IdGenerator{

private int value = 0;

public int getNext(){

return value++;

}

}

對於IdGenerator的getNext()方法,在多執行緒下不能保證返回值是不重複的:各個執行緒之間相互競爭CPU時間來獲取執行機會,CPU切換可能發生在執行間隙。

以上程式碼getNext()的指令序列:CPU切換可能發生在7條指令之間,多個getNext的指令交織在一起。

 在CODE上檢視程式碼片派生到我的程式碼片

  1. aload_0  
  2. dup  
  3. getfield #12  
  4. dup_x1  
  5. iconst_1  
  6. iadd  
  7. putfield #12  

aload_0

dup

getfield #12

dup_x1

iconst_1

iadd

putfield #12

b、CPU快取:

目前CPU一般採用層次結構的多級快取的架構,有的CPU提供了L1、L2和L3三級快取。當CPU需要讀取主存中某個位置的資料時,會一次檢查各級快取中是否存在對應的資料。如果有,直接從快取中讀取,這比從主存中讀取速度快很多。當CPU需要寫入時,資料先被寫入快取中,之後再某個時間點寫回主存。所以某些時間點上,快取中的資料與主存中的資料可能是不一致。

c、指令順序重排

出行效能考慮,編譯器在編譯時可能會對位元組程式碼的指令順序進行重新排列,以優化指令的執行順序,在單執行緒中不會有問題,但在多執行緒可能產生與可見性相關的問題。

二、Java記憶體模型(Java Memory Model)

遮蔽了CPU快取等細節,只關注主存中的共享變數;關注物件的例項域、靜態域和陣列元素;關注執行緒間的動作。

1、volatile關鍵詞:用來對共享變數的訪問進行同步,上一次寫入操作的結果對下一次讀取操作是肯定可見的。(在寫入volatile變數值之後,CPU快取中的內容會被寫回記憶體;在讀取volatile變數時,CPU快取中的對應內容會被置為失效,重新從主存中進行讀取),volatile不使用鎖,效能優於synchronized關鍵詞。

用來確保對一個變數的修改被正確地傳播到其他執行緒中。

例子:A執行緒是Worker,一直跑迴圈,B執行緒呼叫setDone(true),A執行緒即停止任務

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class Worker{  
  2.    private volatile boolean done;  
  3.    public void setDone(boolean done){  
  4.       this.done = done;  
  5.    }  
  6.    public void work(){  
  7.       while(!done){  
  8.          //執行任務;  
  9.       }  
  10.    }  
  11. }  

public class Worker{

private volatile boolean done;

public void setDone(boolean done){

this.done = done;

}

public void work(){

while(!done){

//執行任務;

}

}

}

例子:錯誤使用。因為沒有鎖的支援,volatile的修改不能依賴於當前值,當前值可能在其他執行緒中被修改。(Worker是直接賦新值與當前值無關)

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class Counter {  
  2.     public volatile static int count = 0;  
  3.     public static void inc() {  
  4.         //這裡延遲1毫秒,使得結果明顯  
  5.         try {  
  6.             Thread.sleep(1);  
  7.         } catch (InterruptedException e) {  
  8.         }  
  9.         count++;  
  10.     }  
  11.     public static void main(String[] args) {  
  12.         //同時啟動1000個執行緒,去進行i++計算,看看實際結果  
  13.         for (int i = 0; i < 1000; i++) {  
  14.             new Thread(new Runnable() {  
  15.                 @Override  
  16.                 public void run() {  
  17.                     Counter.inc();  
  18.                 }  
  19.             }).start();  
  20.         }  
  21.         //這裡每次執行的值都有可能不同,可能不為1000  
  22.         System.out.println("執行結果:Counter.count=" + Counter.count);  
  23.     }  
  24. }  

public class Counter {

public volatile static int count = 0;

public static void inc() {

//這裡延遲1毫秒,使得結果明顯

try {

Thread.sleep(1);

} catch (InterruptedException e) {

}

count++;

}

public static void main(String[] args) {

//同時啟動1000個執行緒,去進行i++計算,看看實際結果

for (int i = 0; i < 1000; i++) {

new Thread(new Runnable() {

@Override

public void run() {

Counter.inc();

}

}).start();

}

//這裡每次執行的值都有可能不同,可能不為1000

System.out.println("執行結果:Counter.count=" + Counter.count);

}

}

2、final關鍵詞 final關鍵詞宣告的域的值只能被初始化一次,一般在構造方法中初始化。。(在多執行緒開發中,final域通常用來實現不可變物件)

當物件中的共享變數的值不可能發生變化時,在多執行緒中也就不需要同步機制來進行處理,故在多執行緒開發中應儘可能使用不可變物件

另外,在程式碼執行時,final域的值可以被儲存在暫存器中,而不用從主存中頻繁重新讀取。

3、java基本型別的原子操作

1)基本型別,引用型別的複製引用是原子操作;(即一條指令完成)

2)long與double的賦值,引用是可以分割的,非原子操作;

3)要線上程間共享long或double的欄位時,必須在synchronized中操作,或是宣告成volatile

三、Java提供的執行緒同步方式

1、synchronized關鍵字

方法或程式碼塊的互斥性來完成實際上的一個原子操作。(方法或程式碼塊在被一個執行緒呼叫時,其他執行緒處於等待狀態)

所有的Java物件都有一個與synchronzied關聯的監視器物件(monitor),允許執行緒在該監視器物件上進行加鎖和解鎖操作。

a、靜態方法:Java類對應的Class類的物件所關聯的監視器物件。

b、例項方法:當前物件例項所關聯的監視器物件。

c、程式碼塊:程式碼塊宣告中的物件所關聯的監視器物件。

注:當鎖被釋放,對共享變數的修改會寫入主存;當活得鎖,CPU快取中的內容被置為無效。編譯器在處理synchronized方法或程式碼塊,不會把其中包含的程式碼移動到synchronized方法或程式碼塊之外,從而避免了由於程式碼重排而造成的問題。

例:以下方法getNext()和getNextV2() 都獲得了當前例項所關聯的監視器物件

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class SynchronizedIdGenerator{  
  2.    private int value = 0;  
  3.    public synchronized int getNext(){  
  4.       return value++;  
  5.    }  
  6.    public int getNextV2(){  
  7.       synchronized(this){  
  8.          return value++;  
  9.       }  
  10.    }  
  11. }  

public class SynchronizedIdGenerator{

private int value = 0;

public synchronized int getNext(){

return value++;

}

public int getNextV2(){

synchronized(this){

return value++;

}

}

}

2、Object類的wait、notify和notifyAll方法

生產者和消費者模式,判斷緩衝區是否滿來消費,緩衝區是否空來生產的邏輯。如果用while 和 volatile也可以做,不過本質上會讓執行緒處於忙等待,佔用CPU時間,對效能造成影響。

wait: 將當前執行緒放入,該物件的等待池中,執行緒A呼叫了B物件的wait()方法,執行緒A進入B物件的等待池,並且釋放B的鎖。(這裡,執行緒A必須持有B的鎖,所以呼叫的程式碼必須在synchronized修飾下,否則直接丟擲java.lang.IllegalMonitorStateException異常)。

notify:將該物件中等待池中的執行緒,隨機選取一個放入物件的鎖池,噹噹前執行緒結束後釋放掉鎖, 鎖池中的執行緒即可競爭物件的鎖來獲得執行機會。

notifyAll:將物件中等待池中的執行緒,全部放入鎖池。

(notify鎖喚醒的執行緒選擇由虛擬機器實現來決定,不能保證一個物件鎖關聯的等待集合中的執行緒按照所期望的順序被喚醒,很可能一個執行緒被喚醒之後,發現他所要求的條件並沒有滿足,而重新進入等待池。因為當等待池中包含多個執行緒時,一般使用notifyAll方法,不過該方法會導致執行緒在沒有必要的情況下被喚醒,之後又馬上進入等待池,對效能有影響,不過能保證程式的正確性)

工作流程:

a、Consumer執行緒A 來 看產品,發現產品為空,呼叫產品物件的wait(),執行緒A進入產品物件的等待池並釋放產品的鎖。

b、Producer執行緒B獲得產品的鎖,執行產品的notifyAll(),Consumer執行緒A從產品的等待池進入鎖池,Producer執行緒B生產產品,然後退出釋放鎖。

c、Consumer執行緒A獲得產品鎖,進入執行,發現有產品,消費產品,然後退出。

例子:

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public synchronized String pop(){  
  2.   this.notifyAll();// 喚醒物件等待池中的所有執行緒,可能喚醒的就是 生產者(當生產者發現產品滿,就會進入物件的等待池,這裡程式碼省略,基本略同)  
  3.    while(index == -1){//如果發現沒產品,就釋放鎖,進入物件等待池  
  4.       this.wait();  
  5.    }//當生產者生產完後,消費者從this.wait()方法再開始執行,第一次還會執行迴圈,萬一產品還是為空,則再等待,所以這裡必須用while迴圈,不能用if  
  6.    String good = buffer[index];  
  7.    buffer[index] = null;  
  8.    index--;  
  9.    return good;// 消費完產品,退出。  
  10. }  

public synchronized String pop(){

this.notifyAll();// 喚醒物件等待池中的所有執行緒,可能喚醒的就是 生產者(當生產者發現產品滿,就會進入物件的等待池,這裡程式碼省略,基本略同)

while(index == -1){//如果發現沒產品,就釋放鎖,進入物件等待池

this.wait();

}//當生產者生產完後,消費者從this.wait()方法再開始執行,第一次還會執行迴圈,萬一產品還是為空,則再等待,所以這裡必須用while迴圈,不能用if

String good = buffer[index];

buffer[index] = null;

index--;

return good;// 消費完產品,退出。

}

注:wait()方法有超時和不超時之分,超時的在經過一段時間,執行緒還在物件的等待池中,那麼執行緒也會推出等待狀態。

3、執行緒狀態轉換:

已經廢棄的方法:stop、suspend、resume、destroy,這些方法在實現上時不安全的。

執行緒的狀態:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超時的等待)、TERMINATED。

a、方法sleep()進入的阻塞狀態,不會釋放物件的鎖(即大家一起睡,誰也別想執行程式碼),所以不要讓sleep方法處在synchronized方法或程式碼塊中,否則造成其他等待獲取鎖的執行緒長時間處於等待。

b、方法join()則是主執行緒等待子執行緒完成,再往下執行。例如main方法新建兩個執行緒A和B

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public static void main(String[] args) throws InterruptedException {    
  2. Thread t1 = new Thread(new ThreadTesterA());    
  3. Thread t2 = new Thread(new ThreadTesterB());    
  4. t1.start();    
  5. t1.join(); // 等t1執行完再往下執行  
  6. t2.start();    
  7. t2.join(); // 在虛擬機器執行中,這句可能被忽略  
  8. }  

public static void main(String[] args) throws InterruptedException {

Thread t1 = new Thread(new ThreadTesterA());

Thread t2 = new Thread(new ThreadTesterB());

t1.start();

t1.join(); // 等t1執行完再往下執行

t2.start();

t2.join(); // 在虛擬機器執行中,這句可能被忽略

}

c、方法interrupt(),向被呼叫的物件執行緒發起中斷請求。如執行緒A通過呼叫執行緒B的d的interrupt方法來發出中斷請求,執行緒B來處理這個請求,當然也可以忽略,這不是必須的。Object類的wait()、Thread類的join()和sleep方法都會丟擲受檢異常java.lang.InterruptedException,通過interrupt方法中斷該執行緒會導致執行緒離開等待狀態。對於wait()呼叫來說,執行緒需要重新獲取監視器物件上的鎖之後才能丟擲InterruptedException異常,並致以異常的處理邏輯。

可以通過Thread類的isInterrupted方法來判斷是否有中斷請求發生,通常可以利用這個方法來判斷是否退出執行緒(類似上面的volatitle修飾符的例子);

Thread類還有個方法Interrupted(),該方法不但可以判斷當前執行緒是否被中斷,還會清楚執行緒內部的中斷標記,如果返回true,即曾被請求中斷,同時呼叫完後,清除中斷標記。

如果一個執行緒在某個物件的等待池,那麼notify和interrupt 都可以使該執行緒從等待池中被移除。如果同時發生,那麼看實際發生順序。如果是notify先,那照常喚醒,沒影響。如果是interrupt先,並且虛擬機器選擇讓該執行緒中斷,那麼即使nofity,也會忽略該執行緒,而喚醒等待池中的另一個執行緒。

e、yield(),嘗試讓出所佔有的CPU資源,讓其他執行緒獲取執行機會,對作業系統上的排程器來說是一個訊號,不一定立即切換執行緒。(在實際開發中,測試階段頻繁呼叫yeid方法使執行緒切換更頻繁,從而讓一些多執行緒相關的錯誤更容易暴露出來)。

四、非阻塞方式

執行緒之間同步機制的核心是監視物件上的鎖,競爭鎖來獲得執行程式碼的機會。當一個物件獲取物件的鎖,然後其他嘗試獲取鎖的物件會處於等待狀態,這種鎖機制的實現方式很大程度限制了多執行緒程式的吞吐量和效能(執行緒阻塞),且會帶來死鎖(執行緒A有a物件鎖,等著獲取b物件鎖,執行緒B有b物件鎖,等待獲取a物件鎖)和優先順序倒置(優先順序低的執行緒獲得鎖,優先順序高的只能等待對方釋放鎖)等問題。

如果能不阻塞執行緒,又能保證多執行緒程式的正確性,就能有更好的效能。

在程式中,對共享變數的使用一般遵循一定的模式,即讀取、修改和寫入三步組成。之前碰到的問題是,這三步執行中可能執行緒執行切換,造成非原子操作。鎖機制是把這三步變成一個原子操作。

目前CPU本身實現 將這三步 合起來 形成一個原子操作,無需執行緒鎖機制干預,常見的指令是“比較和替換”(compare and swap,CAS),這個指令會先比較某個記憶體地址的當前值是不是指定的舊指,如果是,就用新值替換,否則什麼也不做,指令返回的結果是記憶體地址的當前值。通過CAS指令可以實現不依賴鎖機制的非阻塞演算法。一般做法是把CAS指令的呼叫放在一個無限迴圈中,不斷嘗試,知道CAS指令成功完成修改。

java.util.concurrent.atomic包中提供了CAS指令。(不是所有CPU都支援CAS,在某些平臺,java.util.concurrent.atomic的實現仍然是鎖機制)

atomic包中提供的Java類分成三類:

1、支援以原子操作來進行更新的資料型別的Java類(AtomicBoolean、AtomicInteger、AtomicReference),在記憶體模型相關的語義上,這四個類的物件類似於volatile變數。

類中的常用方法:

a、compareAndSet:接受兩個引數,一個是期望的舊值,一個是替換的新值。

b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式讀取和有條件地寫入變數但不建立任何 happen-before 排序,但在原始碼中和compareAndSet完全一樣,所以並沒有按JSR實現)

c、get和set:分別用來直接獲取和設定變數的值。

d、lazySet:與set類似,但允許編譯器把lazySet方法的呼叫與後面的指令進行重排,因此對值得設定操作有可能被推遲。

例:

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class AtomicIdGenerator{  
  2.    private final AtomicInter counter = new AtomicInteger(0);  
  3.    public int getNext(){  
  4.       return counter.getAndIncrement();  
  5.    }  
  6. }  
  7. // getAndIncrement方法的內部實現方式,這也是CAS方法的一般模式,CAS方法不一定成功,所以包裝在一個無限迴圈中,直到成功  
  8. public final int getAndIncrement(){  
  9.    for(;;){  
  10.       int current = get();  
  11.       int next = current +1;  
  12.       if(compareAndSet(current,next))  
  13.          return current;  
  14.    }  
  15. }  

public class AtomicIdGenerator{

private final AtomicInter counter = new AtomicInteger(0);

public int getNext(){

return counter.getAndIncrement();

}

}

// getAndIncrement方法的內部實現方式,這也是CAS方法的一般模式,CAS方法不一定成功,所以包裝在一個無限迴圈中,直到成功

public final int getAndIncrement(){

for(;;){

int current = get();

int next = current +1;

if(compareAndSet(current,next))

return current;

}

}

2、提供對陣列型別的變數進行處理的Java類,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray類。(同上,只是放在類數組裡,呼叫時也只是多了一個操作元素索引的引數)

3、通過反射的方式對任何物件中包含的volatitle變數使用CAS方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他們提供了一種方式把CAS的功能擴充套件到了任何Java類中宣告為volatitle的域上。(靈活,但語義較弱,因為物件的volatitle可能被非atomic的其他方式被修改)

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class TreeNode{  
  2.    private volatile TreeNode parent;  
  3. // 靜態工廠方法  
  4.    private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");  
  5. public boolean compareAndSetParent(TreeNode expect, TreeNode update){  
  6.       return parentUpdater.compareAndSet(this, expect, update);  
  7. }  
  8. }  

public class TreeNode{

private volatile TreeNode parent;

// 靜態工廠方法

private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");

public boolean compareAndSetParent(TreeNode expect, TreeNode update){

return parentUpdater.compareAndSet(this, expect, update);

}

}

注:java.util.concurrent.atomic包中的Java類屬於比較底層的實現,一般作為java.util.concurrent包中很多非阻塞的資料結構的實現基礎。

比較多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在實現執行緒安全的計數器時,AtomicInteger和AtomicLong類時最佳的選擇。  

五、高階同步機制(比synchronized更靈活的加鎖機制)

synchronized和volatile,以及wait、notify等方法抽象層次低,在程式開發中使用比較繁瑣,易出錯。

而多執行緒之間的互動來說,存在某些固定的模式,如生產者-消費者和讀者-寫者模式,把這些模式抽象成高層API,使用起來會非常方便。

java.util.concurrent包為多執行緒提供了高層的API,滿足日常開發中的常見需求。

常用介面

1、Lock介面,表示一個鎖方法:

a、lock(),獲取所,如果無法獲取所鎖,會處於等待狀態

b、unlock(),釋放鎖。(一般放在finally程式碼塊中)

c、lockInterruptibly(),與lock()類似,但允許當前執行緒在等待獲取鎖的過程中被中斷。(所以要處理InterruptedException)

d、tryLock(),以非阻塞方式獲取鎖,如果無法獲取鎖,則返回false。(tryLock()的另一個過載可以指定超時,如果指定超時,當無法獲取鎖,會等待而阻塞,同時執行緒可以被中斷)

2、ReadWriteLock介面,表示兩個鎖,讀取的共享鎖和寫入的排他鎖。(適合常見的讀者--寫者場景)

ReadWriteLock介面的readLock和writeLock方法來獲取對應的鎖的Lock介面的實現。

在多數執行緒讀取,少數執行緒寫入的情況下,可以提高多執行緒的效能,提高使用該資料結構的吞吐量。

如果是相反的情況,較多的執行緒寫入,則介面會降低效能。

3、ReentrantLock類和ReentrantReadWriteLock,分別為上面兩個介面的實現類。

他們具有重入性:即允許一個執行緒多次獲取同一個鎖(他們會記住上次獲取鎖並且未釋放的執行緒物件,和加鎖的次數,getHoldCount())

同一個執行緒每次獲取鎖,加鎖數+1,每次釋放鎖,加鎖數-1,到0,則該鎖被釋放,可以被其他執行緒獲取。

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class LockIdGenrator{  
  2. //new ReentrantLock(true)是過載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的執行緒,避免一些執行緒長期無法獲得鎖  
  3.    private int ReentrantLock lock = ReentrantLock();  
  4.    privafte int value = 0;  
  5.    public int getNext(){  
  6.       lock.lock();      //進來就加鎖,沒有鎖會等待  
  7.       try{  
  8.          return value++;//實際操作  
  9.       }finally{  
  10.          lock.unlock();//釋放鎖  
  11.       }  
  12.    }  
  13. }  

public class LockIdGenrator{

//new ReentrantLock(true)是過載,使用更加公平的加鎖機制,在鎖被釋放後,會優先給等待時間最長的執行緒,避免一些執行緒長期無法獲得鎖

private int ReentrantLock lock = ReentrantLock();

privafte int value = 0;

public int getNext(){

lock.lock(); //進來就加鎖,沒有鎖會等待

try{

return value++;//實際操作

}finally{

lock.unlock();//釋放鎖

}

}

}

注:重入性減少了鎖在各個執行緒之間的等待,例如便利一個HashMap,每次next()之前加鎖,之後釋放,可以保證一個執行緒一口氣完成便利,而不會每次next()之後釋放鎖,然後和其他執行緒競爭,降低了加鎖的代價, 提供了程式整體的吞吐量。(即,讓一個執行緒一口氣完成任務,再把鎖傳遞給其他執行緒)。 4、Condition介面,Lock介面代替了synchronized,Condition介面替代了object的wait、nofity。

a、await(),使當前執行緒進入等待狀態,知道被喚醒或中斷。過載形式可以指定超時時間。

b、awaitNanos(),以納秒為單位等待。

c、awaitUntil(),指定超時發生的時間點,而不是經過的時間,引數為java.util.Date。

d、awaitUninterruptibly(),前面幾種會響應其他執行緒發出的中斷請求,他會無視,直到被喚醒。

注:與Object類的wait()相同,await()會釋放其所持有的鎖。

e、signal()和signalAll, 相當於 notify和notifyAll

 在CODE上檢視程式碼片派生到我的程式碼片

  1. Lock lock = new ReentrantLock();  
  2. Condition condition = lock.newCondition();  
  3. lock.lock();  
  4. try{  
  5.    while(/*邏輯條件不滿足*/){  
  6.       condition.await();     
  7.    }  
  8. }finally{  
  9.    lock.unlock();  
  10. }  

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

lock.lock();

try{

while(/*邏輯條件不滿足*/){

condition.await();

}

}finally{

lock.unlock();

}

六、底層同步器

多執行緒程式中,執行緒之間存在多種不同的同步方式。除了Java標準庫提供的同步方式之外,程式中特有的同步方式需要由開發人員自己來實現。

常見的一種需求是 對有限個共享資源的訪問,比如多臺個人電腦,2臺印表機,當多個執行緒在等待同一個資源時,從公平角度出發,會用FIFO佇列。

  如果程式中的同步方式可以抽象成對有限個資源的訪問,那麼可以使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer類和AbstractQueuedLongSynchronizer類作為實現的基礎,前者用int型別的變數來維護內部狀態,而後者用long型別。(可以將這個變數理解為共享資源個數)

通過getState、setState、和compareAndSetState3個方法更新內部變數的值。

AbstractQueuedSynchronizer類是abstract的,需要覆蓋其中包含的部分方法,通常做法是把其作為一個Java類的內部類,外部類提供具體的同步方式,內部類則作為實現的基礎。有兩種模式,排他模式和共享模式,分別對應方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在這些方法中,使用getState、setState、compareAndSetState3個方法來修改內部變數的值,以此來反應資源的狀態。

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class SimpleResourceManager{  
  2.    private final InnerSynchronizer synchronizer;  
  3.    private static class InnerSynchronizer extends AbstractQueuedSynchronizer{  
  4.       InnerSynchronizer(int numOfResources){  
  5.          setState(numOfResources);  
  6.       }  
  7.       protected int tryAcquireShared(int acquires){  
  8.          for(;;){  
  9.             int available = getState();  
  10.             int remain = available - acquires;  
  11.             if(remain <0 || comapreAndSetState(available, remain){  
  12.                return remain;  
  13.             }  
  14.          }  
  15.       }  
  16.       protected boolean try ReleaseShared(int releases){  
  17.          for(;;){  
  18.             int available = getState();   
  19.             int next = available + releases;   
  20.             if(compareAndSetState(available,next){  
  21.                return true;  
  22.             }  
  23.          }  
  24.       }  
  25.    }  
  26.    public SimpleResourceManager(int numOfResources){  
  27.       synchronizer = new InnerSynchronizer(numOfResources);  
  28.    }  
  29.    public void acquire() throws InterruptedException{  
  30.       synchronizer.acquireSharedInterruptibly(1);  
  31.    }        
  32.    pubic void release(){      
  33.       synchronizer.releaseShared(1);  
  34.     }  
  35. }  

public class SimpleResourceManager{

private final InnerSynchronizer synchronizer;

private static class InnerSynchronizer extends AbstractQueuedSynchronizer{

InnerSynchronizer(int numOfResources){

setState(numOfResources);

}

protected int tryAcquireShared(int acquires){

for(;;){

int available = getState();

int remain = available - acquires;

if(remain <0 || comapreAndSetState(available, remain){

return remain;

}

}

}

protected boolean try ReleaseShared(int releases){

for(;;){

int available = getState();

int next = available + releases;

if(compareAndSetState(available,next){

return true;

}

}

}

}

public SimpleResourceManager(int numOfResources){

synchronizer = new InnerSynchronizer(numOfResources);

}

public void acquire() throws InterruptedException{

synchronizer.acquireSharedInterruptibly(1);

}

pubic void release(){

synchronizer.releaseShared(1);

}

}

七、高階同步物件(提高開發效率)

atomic和locks包提供的Java類可以滿足基本的互斥和同步訪問的需求,但這些Java類的抽象層次較低,使用比較複雜。

更簡單的做法是使用java.util.concurrent包中的高階同步物件。

1、訊號量。

訊號量一般用來數量有限的資源,每類資源有一個物件的訊號量,訊號量的值表示資源的可用數量。

在使用資源時,需要從該訊號量上獲取許可,成功獲取許可,資源的可用數-1;完成對資源的使用,釋放許可,資源可用數+1; 當資源數為0時,需要獲取資源的執行緒以阻塞的方式來等待資源,或過段時間之後再來檢查資源是否可用。(上面的SimpleResourceManager類實際上時訊號量的一個簡單實現)

java.util.concurrent.Semaphore類,在建立Semaphore類的物件時指定資源的可用數

a、acquire(),以阻塞方式獲取許可

b、tryAcquire(),以非阻塞方式獲取許可

c、release(),釋放許可。

d、accquireUninterruptibly(),accquire()方法獲取許可以的過程可以被中斷,如果不希望被中斷,使用此方法。

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class PrinterManager{  
  2.    private final Semphore semaphore;  
  3.    private final List<Printer> printers = new ArrayList<>():  
  4.    public PrinterManager(Collection<? extends Printer> printers){  
  5.       this.printers.addAll(printers);  
  6.       //這裡過載方法,第二個引數為true,以公平競爭模式,防止執行緒飢餓  
  7.       this.semaphore = new Semaphore(this.printers.size(),true);  
  8.    }  
  9.    public Printer acquirePrinter() throws InterruptedException{  
  10.       semaphore.acquire();  
  11.       return getAvailablePrinter();  
  12.    }  
  13.    public void releasePrinter(Printer printer){  
  14.       putBackPrinter(pinter);  
  15.       semaphore.release();  
  16.    }  
  17.    private synchronized Printer getAvailablePrinter(){  
  18.       printer result = printers.get(0);  
  19.       printers.remove(0);  
  20.       return result;  
  21.    }  
  22.    private synchronized void putBackPrinter(Printer printer){  
  23.       printers.add(printer);  
  24.    }  
  25. }  

public class PrinterManager{

private final Semphore semaphore;

private final List<Printer> printers = new ArrayList<>():

public PrinterManager(Collection<? extends Printer> printers){

this.printers.addAll(printers);

//這裡過載方法,第二個引數為true,以公平競爭模式,防止執行緒飢餓

this.semaphore = new Semaphore(this.printers.size(),true);

}

public Printer acquirePrinter() throws InterruptedException{

semaphore.acquire();

return getAvailablePrinter();

}

public void releasePrinter(Printer printer){

putBackPrinter(pinter);

semaphore.release();

}

private synchronized Printer getAvailablePrinter(){

printer result = printers.get(0);

printers.remove(0);

return result;

}

private synchronized void putBackPrinter(Printer printer){

printers.add(printer);

}

}

2、倒數閘門

多執行緒協作時,一個執行緒等待另外的執行緒完成任務才能繼續進行。

java.util.concurrent.CountDownLatch類,建立該類時,指定等待完成的任務數;當一個任務完成,呼叫countDonw(),任務數-1。等待任務完成的執行緒通過await(),進入阻塞狀態,直到任務數量為0。CountDownLatch類為一次性,一旦任務數為0,再呼叫await()不再阻塞當前執行緒,直接返回。

例:

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class PageSizeSorter{  
  2.    // 併發效能遠遠優於HashTable的 Map實現,hashTable做任何操作都需要獲得鎖,同一時間只有有個執行緒能使用,而ConcurrentHashMap是分段加鎖,不同執行緒訪問不同的資料段,完全不受影響,忘記HashTable吧。  
  3.    private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();  
  4.    private static class GetSizeWorker implements Runnable{  
  5.       private final String urlString;  
  6.       public GetSizeWorker(String urlString , CountDownLatch signal){  
  7.          this.urlString = urlStirng;  
  8.          this.signal = signal;  
  9.       }  
  10.       public void run(){  
  11.          try{  
  12.             InputStream is = new URL(urlString).openStream();  
  13.             int size = IOUtils.toByteArray(is).length;  
  14.             sizeMap.put(urlString, size);  
  15.          }catch(IOException e){  
  16.             sizeMap.put(urlString, -1);  
  17.          }finally{  
  18.             signal.countDown()://完成一個任務 , 任務數-1  
  19.          }  
  20.       }  
  21.    }  
  22.    private void sort(){  
  23.       List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());  
  24.       Collections.slort(list, new Comparator<Entry<String,Integer>>(){  
  25.          public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){  
  26.             return Integer.compare(o2.getValue(),o1.getValue());  
  27.       };  
  28.       System.out.println(Arrays.deepToString(list.toArray()));  
  29.    }  
  30.    public void sortPageSize(Collection<String> urls) throws InterruptedException{  
  31.       CountDownLatch sortSignal = new CountDownLatch(urls.size());  
  32.       for(String url: urls){  
  33.          new Thread(new GetSizeWorker(url, sortSignal)).start();  
  34.       }  
  35.       sortSignal.await()://主執行緒在這裡等待,任務數歸0,則繼續執行  
  36.       sort();  
  37.    }  
  38. }  

public class PageSizeSorter{

// 併發效能遠遠優於HashTable的 Map實現,hashTable做任何操作都需要獲得鎖,同一時間只有有個執行緒能使用,而ConcurrentHashMap是分段加鎖,不同執行緒訪問不同的資料段,完全不受影響,忘記HashTable吧。

private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();

private static class GetSizeWorker implements Runnable{

private final String urlString;

public GetSizeWorker(String urlString , CountDownLatch signal){

this.urlString = urlStirng;

this.signal = signal;

}

public void run(){

try{

InputStream is = new URL(urlString).openStream();

int size = IOUtils.toByteArray(is).length;

sizeMap.put(urlString, size);

}catch(IOException e){

sizeMap.put(urlString, -1);

}finally{

signal.countDown()://完成一個任務 , 任務數-1

}

}

}

private void sort(){

List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());

Collections.slort(list, new Comparator<Entry<String,Integer>>(){

public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){

return Integer.compare(o2.getValue(),o1.getValue());

};

System.out.println(Arrays.deepToString(list.toArray()));

}

public void sortPageSize(Collection<String> urls) throws InterruptedException{

CountDownLatch sortSignal = new CountDownLatch(urls.size());

for(String url: urls){

new Thread(new GetSizeWorker(url, sortSignal)).start();

}

sortSignal.await()://主執行緒在這裡等待,任務數歸0,則繼續執行

sort();

}

}

3、迴圈屏障

迴圈屏障在作用上類似倒數閘門,不過他不像倒數閘門是一次性的,可以迴圈使用。另外,執行緒之間是互相平等的,彼此都需要等待對方完成,當一個執行緒完成自己的任務之後,等待其他執行緒完成。當所有執行緒都完成任務之後,所有執行緒才可以繼續執行。

當執行緒之間需要再次進行互相等待時,可以複用同一個迴圈屏障。

類java.uti.concurrent.CyclicBarrier用來表示迴圈屏障,建立時指定使用該物件的執行緒數目,還可以指定一個Runnable介面的物件作為每次迴圈後執行的動作。(當最後一個執行緒完成任務之後,所有執行緒繼續執行之前,被執行。如果執行緒之間需要更新一些共享的內部狀態,可以利用這個Runnalbe介面的物件來處理)。

每個執行緒任務完成之後,通過呼叫await方法進行等待,當所有執行緒都呼叫await方法之後,處於等待狀態的執行緒都可以繼續執行。在所有執行緒中,只要有一個在等待中被中斷,超時或是其他錯誤,整個迴圈屏障會失敗,所有等待中的其他執行緒丟擲java.uti.concurrent.BrokenBarrierException。

例:每個執行緒負責找一個數字區間的質數,當所有執行緒完成後,如果質數數目不夠,繼續擴大範圍查詢

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class PrimeNumber{  
  2.    private static final int TOTAL_COUTN = 5000;  
  3.    private static final int RANGE_LENGTH= 200;  
  4.    private static final int WORKER_NUMBER = 5;  
  5.    private static volatitle boolean done = false;  
  6.    private static int rangeCount = 0;  
  7.    private static final List<Long> results = new ArrayList<Long>():  
  8.    private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){  
  9.       public void run(){  
  10.          if(results.size() >= TOTAL_COUNT){  
  11.             done = true;  
  12.          }  
  13.      }  
  14.    });  
  15.    private static class PrimeFinder implements Runnable{  
  16.       public void run(){  
  17.          while(!done){// 整個過程在一個 while迴圈下,await()等待,下次迴圈開始,會再次判斷 執行條件  
  18.             int range = getNextRange();  
  19.             long start = rang * RANGE_LENGTH;  
  20.             long end = (range + 1) * RANGE_LENGTH;  
  21.             for(long i = start; i<end;i++){  
  22.                if(isPrime(i)){  
  23.                   updateResult(i);  
  24.                }  
  25.             }  
  26.             try{  
  27.                barrier.await();  
  28.             }catch (InterruptedException | BokenBarrierException e){  
  29.                done =  true;  
  30.             }  
  31.          }  
  32.       }  
  33.    }  
  34.    private synchronized static void updateResult(long value){  
  35.       results.add(value);  
  36.    }  
  37.    private synchronized static int getNextRange(){  
  38.       return rangeCount++;  
  39.    }  
  40.    private static boolean isPrime(long number){  
  41.       //找質數的程式碼  
  42.    }  
  43.    public void calculate(){  
  44.       for(int i=0;i<WORKER_NUMBER;i++){  
  45.          new Thread(new PrimeFinder()).start();  
  46.       }  
  47.       while(!done){  
  48.       }  
  49.       //計算完成  
  50.    }  
  51. }  

public class PrimeNumber{

private static final int TOTAL_COUTN = 5000;

private static final int RANGE_LENGTH= 200;

private static final int WORKER_NUMBER = 5;

private static volatitle boolean done = false;

private static int rangeCount = 0;

private static final List<Long> results = new ArrayList<Long>():

private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){

public void run(){

if(results.size() >= TOTAL_COUNT){

done = true;

}

}

});

private static class PrimeFinder implements Runnable{

public void run(){

while(!done){// 整個過程在一個 while迴圈下,await()等待,下次迴圈開始,會再次判斷 執行條件

int range = getNextRange();

long start = rang * RANGE_LENGTH;

long end = (range + 1) * RANGE_LENGTH;

for(long i = start; i<end;i++){

if(isPrime(i)){

updateResult(i);

}

}

try{

barrier.await();

}catch (InterruptedException | BokenBarrierException e){

done = true;

}

}

}

}

private synchronized static void updateResult(long value){

results.add(value);

}

private synchronized static int getNextRange(){

return rangeCount++;

}

private static boolean isPrime(long number){

//找質數的程式碼

}

public void calculate(){

for(int i=0;i<WORKER_NUMBER;i++){

new Thread(new PrimeFinder()).start();

}

while(!done){

}

//計算完成

}

}

4、物件交換器 適合於兩個執行緒需要進行資料交換的場景。(一個執行緒完成後,把結果交給另一個執行緒繼續處理)

java.util.concurrent.Exchanger類,提供了這種物件交換能力,兩個執行緒共享一個Exchanger類的物件,一個執行緒完成對資料的處理之後,呼叫Exchanger類的exchange()方法把處理之後的資料作為引數傳送給另外一個執行緒。而exchange方法的返回結果是另外一個執行緒鎖提供的相同型別的物件。如果另外一個執行緒未完成對資料的處理,那麼exchange()會使當前執行緒進入等待狀態,直到另外一個執行緒也呼叫了exchange方法來進行資料交換。

例:

 在CODE上檢視程式碼片派生到我的程式碼片

  1. public class SendAndReceiver{  
  2.    private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();  
  3.    private class Sender implements Runnable{  
  4.       public void run(){  
  5.          try{  
  6.             StringBuilder content = new StringBuilder("Hello");  
  7.             content = exchanger.exchange(content);  
  8.          }catch(InterruptedException e){  
  9.             Thread.currentThread().interrupt();  
  10.          }  
  11.       }  
  12.    }  
  13.    private class Receiver implements Runnable{  
  14.       public void run(){  
  15.          try{  
  16.             StringBuilder content = new StringBuilder("World");  
  17.             content = exchanger.exchange(content);  
  18.          }catch(InterruptedException e){  
  19.             Thread.currentThread().interrupt();  
  20.          }  
  21.       }  
  22.    }  
  23.    public void exchange(){  
  24.       new Thread(new Sender()).start();  
  25.       new Thread(new Receiver()).start();  
  26.    }  
  27. }  

public class SendAndReceiver{

private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();

private class Sender implements Runnable{

public void run(){

try{

StringBuilder content = new StringBuilder("Hello");

content = exchanger.exchange(content);

}catch(InterruptedException e){

Thread.currentThread().interrupt();

}

}

}

private class Receiver implements Runnable{

public void run(){

try{

StringBuilder content = new StringBuilder("World");

content = exchanger.exchange(content);

}catch(InterruptedException e){

Thread.currentThread().interrupt();

}

}

}

public void exchange(){

new Thread(new Sender()).start();

new Thread(new Receiver()).start();

}

}

八、資料結構(多執行緒程式使用的高效能資料結構)

java.util.concurrent包中提供了一些適合多執行緒程式使用的高效能資料結構,包括佇列和集合類物件等。

1、佇列

a、BlockingQueue介面:執行緒安全的阻塞式佇列;當佇列已滿時,想佇列新增會阻塞;當佇列空時,取資料會阻塞。(非常適合消費者-生產者模式)

阻塞方式:put()、take()。

非阻塞方式:offer()、poll()。

實現類:基於陣列的固定元素個數的ArrayBolockingQueue和基於連結串列結構的不固定元素個數的LinkedBlockQueue類。

b、BlockingDeque介面: 與BlockingQueue相似,但可以對頭尾進行新增和刪除操作的雙向佇列;方法分為兩類,分別在隊首和對尾進行操作。

實現類:標準庫值提供了一個基於連結串列的實現,LinkedBlockgingDeque

2、集合類

在多執行緒程式中,如果共享變數時集合類的物件,則不適合直接使用java.util包中的集合類。這些類要麼不是執行緒安全,要麼在多執行緒下效能比較差。

應該