並發編程(三):從AQS到CountDownLatch與ReentrantLock
阿新 • • 發佈:2017-07-01
splay public 繼續 for admin font 通信 html integer
以RenentrantLock為例,如何知道共享資源是否有線程正在被訪問呢?其實,它有一個state變量初始值為0,表示未鎖定狀態。當線程A訪問的時候,state+1,就代表該線程鎖定了共享資源,其他線程將無法訪問,而當線程A訪問完共享資源以後,state-1,直到state等於0,就將釋放對共享變量的鎖定,其他線程將可以搶占式或者公平式爭奪。當然,它支持可重入,那什麽是可重入呢?同一線程可以重復鎖定共享資源,每鎖定一次state+1,也就是鎖定多次。說明:鎖定多少次就要釋放多少次。
什麽是共享式呢?
以CountDownLatch為例,共享資源可以被N個線程訪問,也就是初始化的時候,state就被指定為N(N與線程個數相等),線程countDown()一次,state會CAS減1,直到所有線程執行完(state=0),那些await()的線程將被喚醒去執行執行剩余動作。
什麽是CAS?CAS的定義為Compare-And-Swap,語義為比較並且交換。在深入理解JVM書中,談到自旋鎖,因為鎖的堵塞釋放對於cpu資源的損害很高,那麽自旋鎖就是當線程A訪問共享資源的時候,其他線程並不放棄對鎖的持有,它們在不停循環,不斷嘗試的獲取鎖,直到獲得鎖就停止循環,自旋鎖是對於資源共享的一種優化手段,但是它適用於對鎖持有時間比較短的情況。
獨占式lock流程(unlock同理):
一、目錄
1、AQS簡要分析 2、談CountDownLatch 3、談ReentrantLock 4、談消費者與生產者模式(notfiyAll/wait、signAll/await、condition)二、AQS簡要分析
問題:AQS是什麽?有什麽用? AQS是什麽? 字面上看,它被稱為抽象隊列式的同步器(AbstractQueuedSynchronizer)。簡單說,它就是一個同步隊列容器。 AQS有什麽用?- 為什麽會產生ArrayList、LinkedList、HashMap這些容器?它們底層實現無非都是對數組、鏈表、樹的操作,至於它們的產生,就是因為對編程人員對於數組、鏈表、樹的增刪改查操作非常繁瑣而提出的解決方案。
- 那為什麽會產生AQS呢?談到同步,大家最容易想到的就是在多線程中如何確保安全的資源共享。那同步隊列就是為了解決資源共享的同步容器。像上述容器一樣,在頂層就設計好,編程人員只需要調用接口就能輕易實現復雜的資源共享問題。
- 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功就返回。
- 沒成功,則addWaiter()將線程加入等待隊列的尾部,並標記為獨享模式。
- acquireQueued()使線程在等待隊列中休息,有機會時會去嘗試獲得資源。獲得資源後返回。如果整個過程有中斷過返回true,否則返回false。
- 如果線程在等待過程中中斷過,它是不響應的。只是獲得資源後才再進行自我中斷selfInterrupt(),將中斷補上。
- tryAcquireShared()嘗試獲取資源,成功則直接返回。
- 失敗則通過 doAcquireShared()進入等待隊列,直到被喚醒或者中斷並且成功獲取資源才返回。
- 不同:獨占式是只喚醒後繼節點。共享式是喚醒後繼,後繼還會去喚醒它的後繼,從而實現共享。
以上是核心的關於CountDownLatch、ReentrantLock的分析。由於博主研究程度有限,想更深層次研究,請參考:Java並發AQS詳解
三、淺談CountDownLatch
CountDownLatch是什麽? 有什麽用? CountDownLatch是一個同步容器,但是有人叫它發令槍,也有人叫它門閂。初始化設定線程的個數,調用countDownLatch.await()阻塞所有線程,直到countDownLatch.countDown()為0,那麽將繼續執行剩余的操作。例如,跑步比賽,所有線程都await()在起跑線,當所有人告訴裁判準備好了,裁判發令槍一響,運動員開炮。門閂道理一樣,門不開全給我等著! 作用:為了實現同步共享數據的一種更加高效的解決辦法。/** * CountDownLatch相當於指令槍或者門閂,所有線程都awit()阻塞在起跑線,只有countDown到state為0,其他線程才能往下運行。 * @author qiuyongAaron */ public class CountDownLatchDemo { private static final int PLAYER_NUM=5; public static void main(String[] args) { CountDownLatch start=new CountDownLatch(1); CountDownLatch end =new CountDownLatch(PLAYER_NUM); Player [] players=new Player[PLAYER_NUM]; for(int i=0;i<PLAYER_NUM;i++) players[i]=new Player(start, end, i); //指定線程個數的線程池! ExecutorService exe=Executors.newFixedThreadPool(PLAYER_NUM); for(Player player:players) exe.execute(player); System.out.println("比賽開始!"); //比賽開始! start.countDown(); try { end.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("比賽結束!"); exe.shutdown(); } } } class Player implements Runnable{ private CountDownLatch start; private CountDownLatch end; private int id; Random random=new Random(); public Player(CountDownLatch start,CountDownLatch end,int id) { this.start=start; this.end=end; this.id=id; } @Override public void run() { try { //等待比賽開始。 start.await(); TimeUnit.SECONDS.sleep(random.nextInt(10)); System.out.println("Player-"+id+":arrived"); } catch (InterruptedException e) { e.printStackTrace(); }finally{ //選手-id到達終點,end計數為0結束比賽! end.countDown(); } } } //運行結果: 比賽開始! Player-3:arrived Player-4:arrived Player-0:arrived Player-1:arrived Player-2:arrived 比賽結束!
三、談ReentrantLock
1、ReentrantLock是什麽?有什麽用? ReentrantLock跟synchronized作用差不多,是在於synchronized基礎上的一種簡易同步容器,並沒有深層次的原理剖析。 2、ReentrantLock的基礎用法 2.1 回顧synchronized如何實現線程同步。/** * 示例一:同步鎖的使用 * reentrantlock用於替代synchronized * 本例中由於m1鎖定this,只有m1執行完畢的時候,m2才能執行 * @author qiuyongAaron */ public class ReentrantLockOne { public synchronized void m1(){ for(int i=0;i<10;i++){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i); } } public synchronized void m2(){ System.out.println("hello m2!"); } public static void main(String[] args) { ReentrantLockOne lock=new ReentrantLockOne(); new Thread(()->lock.m1(),"t1").start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->lock.m2(),"t2").start(); } }Synchronized實現線程同步
2.2 ReentrantLock實現線程同步-與synchronized作用一致!
/** * 示例二:等價於同步鎖 * 使用reentrantlock可以完成同樣的功能 * 需要註意的是,必須要必須要必須要手動釋放鎖(重要的事情說三遍) * 使用syn鎖定的話如果遇到異常,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經常在finally中進行鎖的釋放 * @author qiuyongAaron */ public class ReentrantLockTwo { ReentrantLock lock =new ReentrantLock(); public void m1(){ try { lock.lock(); for(int i=0;i<10;i++){ TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } public synchronized void m2(){ lock.lock(); System.out.println("hello m2!"); lock.unlock(); } public static void main(String[] args) { ReentrantLockTwo lock=new ReentrantLockTwo(); new Thread(()->lock.m1(),"t1").start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->lock.m2(),"t2").start(); } }ReentrantLock同步互斥 2.3 ReentrantLock嘗試獲取鎖,若指定時間無法獲取鎖放棄等待!
/** * 示例三:tryLock * 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,線程可以決定是否繼續等待 * @author qiuyongAaron */ public class ReentrantLockThree { ReentrantLock lock=new ReentrantLock(); public void m1(){ try { lock.lock(); for(int i=0;i<10;i++){ TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (Exception e) { e.printStackTrace(); }finally{ lock.unlock(); } } boolean locked=false; public void m2(){ try { lock.tryLock(5,TimeUnit.SECONDS); System.out.println("m2:"+locked); } catch (Exception e) { e.printStackTrace(); }finally{ if(locked) lock.unlock(); } } public static void main(String[] args) { ReentrantLockThree lock=new ReentrantLockThree(); new Thread(()->lock.m1(),"t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->lock.m2(),"t2").start(); } }ReentrantLock嘗試獲取鎖
2.4 指定公平鎖或者搶占式鎖
/** * ReentrantLock還可以指定為公平鎖 * @author qiuyongAaron */ public class ReentrantLockFive extends Thread{ //默認false:為非公平鎖 true:公平鎖 ReentrantLock lock=new ReentrantLock(); @Override public void run() { for(int i=0;i<100;i++){ lock.lock(); try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"獲得鎖"+"-"+i); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } } public static void main(String[] args) { ReentrantLockFive lock=new ReentrantLockFive(); new Thread(lock,"t1").start(); new Thread(lock,"t2").start(); } } 運行結果: //非公平鎖 t2獲得鎖-0 t2獲得鎖-1 t1獲得鎖-0 t1獲得鎖-1 t1獲得鎖-2 t2獲得鎖-2 //公平鎖 t1獲得鎖-0 t2獲得鎖-0 t1獲得鎖-1 t2獲得鎖-1 t1獲得鎖-2 t2獲得鎖-2ReentrantLock公平鎖 3、ReentrantLock實現線程通信
/** * 模擬生產者消費者模式-線程之間通信 synchronized-notifyAll/wait * @author qiuyongAaron */ public class MyContainerOne { LinkedList<Integer> list=new LinkedList<Integer>(); static final int MAX=10; int count=0; //生產者線程 public synchronized void put(int i){ while(list.size()==MAX){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(i); ++count; this.notifyAll();//通知消費者來消費 } //消費者線程 public synchronized int get(){ while(list.size()==0){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int num=list.removeFirst(); count--; this.notifyAll();//通知生產者生產 return num; } public static void main(String[] args) { MyContainerOne container=new MyContainerOne(); //制造10個消費者 for(int i=0;i<10;i++){ new Thread(()->{ for(int j=0;j<5;j++) System.out.println(container.get()); }, "c"+i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //制造2個生產者 for(int i=0;i<2;i++){ new Thread(()->{ for(int j=0;j<25;j++) container.put(j); }, "p"+i).start(); } } }
/** * 模擬生產者消費者模式-reentrantLock-awit/signAll * @author qiuyongAaron */ public class MyContainerTwo { LinkedList<Integer> list=new LinkedList<Integer>(); static final int MAX=10; int count=0; ReentrantLock lock=new ReentrantLock(); Condition producer=lock.newCondition(); Condition consumer=lock.newCondition(); //生產者線程 public void put(int i){ try { lock.lock(); while(list.size()==MAX){ producer.await(); } list.add(i); ++count; consumer.signalAll();//通知消費者來消費 } catch (InterruptedException e){ e.printStackTrace(); }finally{ lock.unlock(); } } //消費者線程 public int get(){ try{ lock.lock(); while(list.size()==0){ consumer.await(); } int num=list.removeFirst(); count--; producer.signalAll();//通知生產者生產 return num; }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } return 0; } public static void main(String[] args) { MyContainerTwo container=new MyContainerTwo(); //制造10個消費者 for(int i=0;i<10;i++){ new Thread(()->{ for(int j=0;j<5;j++) System.out.println(container.get()); }, "c"+i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //制造2個生產者 for(int i=0;i<2;i++){ new Thread(()->{ for(int j=0;j<25;j++) container.put(j); }, "p"+i).start(); } } }總結:synchronized實現線程的消費者-生產者模式是通過wait/notifyAll實現,ReentrantLock是通過condition+await/signAll。那他們有什麽區別呢?synchronized要麽通過notify隨機喚醒一個,或者notifyAll喚醒所有不管你是消費者還是生產者、而ReentrantLock是喚醒指定的線程的,更加精確效率更高。
四、版權聲明
作者:邱勇Aaron
出處:http://www.cnblogs.com/qiuyong/
您的支持是對博主深入思考總結的最大鼓勵。
本文版權歸作者所有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,尊重作者的勞動成果。
參考:馬士兵並發編程、並發編程實踐
AQS詳解:http://www.cnblogs.com/waterystone/p/4920797.html
CountDownLatch詳解:http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315652.html並發編程(三):從AQS到CountDownLatch與ReentrantLock