線程通信之生產者消費者模型
線程通信,是指線程之間的消息傳遞。
多個線程在操作同一個資源時,它們對共享資源的操作動作可能不同;它們共享同一個資源,互為條件,相互依賴,相互通信,從而讓任務向前推進。
另外,在線程的同步策略中,雖然可以解決並發更新同一個資源,保障資源的安全,但不能用來實現線程間的消息傳遞。因此,線程通信與線程同步往往會融合使用。
生產者消費者模型堪稱是線程通信中的一個典型案例,我們接下來通過生產者消費者模式來進一步認識線程通信。在此,我們先對若幹概念進行了解。
生產者:沒有生產之前通知消費者等待,生產產品結束之後,馬上通知消費者消費
消費者:沒有消費之前通知生產者等待,消費產品結束之後,通知生產者繼續生產產品以供消費
線程通信:使用java中超類Object中提供的一些方法:
1 public final void wait(); //註:long timeout=0 表示線程一直等待,直到其它線程通知 2 public final native void wait(long timeout); //線程等待指定毫秒參數的時間,超過該時間則不再等待 3 public final void wait(long timeout, int nanos); //線程等待指定毫秒、微妙的時間,timeout最大等待時間,以毫秒為單位,nanos額外的時間,在納秒範圍0-9999994 public final native voidnotify(); //喚醒一個處於等待狀態的線程 5 public final native void notifyAll(); //喚醒同一個對象上所有調用wait()方法的線程,優先級別高的線程優先運行
需要註意的是,上述方法只能在同步方法或者同步代碼塊中使用,否則會拋出異常。
接下來,我們以生產A-D個產品,放入倉庫,待消費者消費後,生產者再進行生產為例,看下生產者消費者模式的運行流程。
1 /** 2 * 1.共享資源緩存和操作類 3 */ 4 public class SharedCache { 5 //產品,此處使用char字符,作為存儲共享數據的數據類型6 private char cache; 7 //產品消費標識,是線程間通信的信號,為true表示未消費(生產),false表示未生產(消費) 8 private boolean flag=false; 9 /* 10 生產操作(生產者):向倉庫中添加共享數據 11 */ 12 public synchronized void addSharedCacheData(char data){ 13 //產品未消費,則生產者的生產操作等待 14 if(flag){ 15 System.out.println("產品未消費,生產者的生產操作等待"); 16 try { 17 //生產者等待 18 wait(); 19 } catch (InterruptedException e) { 20 System.out.println("Thread interrupted Exception:"+e.getMessage()); 21 } 22 } 23 //產品已消費,則生產者繼續生產 24 this.cache=data; 25 //標記已生產 26 flag=true; 27 //通知消費者已生產 28 notify(); 29 System.out.println("生產者--->產品:"+data+"已生產,等待消費者消費"); 30 } 31 /* 32 消費操作(消費者):向倉庫中獲取共享數據 33 */ 34 public synchronized char getSharedCacheData(){ 35 //如果產品未生產,則消費者等待 36 if(!flag){ 37 System.out.println("產品未生產,消費者的消費操作等待"); 38 try { 39 wait(); 40 } catch (InterruptedException e) { 41 System.out.println("Thread interrupted Exception:"+e.getMessage()); 42 } 43 } 44 //標記已消費 45 flag=false; 46 //通知生產者已消費 47 notify(); 48 System.out.println("消費者--->產品:"+this.cache+"已消費,通知生產者生產"); 49 return this.cache; 50 } 51 } 52 /** 53 * 2.生產者線程類 54 */ 55 public class Producer extends Thread{ 56 //共享緩存資源類的對象 57 private SharedCache cache; 58 //構造器,傳入共享資源類的對象 59 public Producer(SharedCache cache){ 60 this.cache=cache; 61 } 62 /* 63 生產者生產產品,放入共享資源緩存類(相當於將生產的產品放入倉庫裏) 64 生產A-D類型的產品 65 */ 66 @Override 67 public void run() { 68 for(char product=‘A‘;product<=‘D‘;product++){ 69 try { 70 sleep((int)(Math.random()*3000)); 71 } catch (InterruptedException e) { 72 System.out.println("Thread interrupted Exception:"+e.getMessage()); 73 } 74 //生產產品,放入共享緩存數據類的對象裏(相當於把生產的產品放到倉庫裏) 75 cache.addSharedCacheData(product); 76 } 77 } 78 } 79 /** 80 * 3.消費者線程類 81 */ 82 public class Consumer extends Thread{ 83 //共享緩存資源類的對象 84 private SharedCache cache; 85 //構造器,傳入共享資源類的對象 86 public Consumer(SharedCache cache){ 87 this.cache=cache; 88 } 89 /* 90 消費者消費產品,獲取共享緩存類的對象裏的數據(相當於從倉庫裏提取產品) 91 當消費到D類型的產品時即停止消費 92 */ 93 @Override 94 public void run() { 95 char product=‘a‘; 96 do{ 97 try { 98 Thread.sleep((int)(Math.random()*3000)); 99 } catch (InterruptedException e) { 100 System.out.println("Thread interrupted Exception:"+e.getMessage()); 101 } 102 //消費,從倉庫取走商品 103 product=cache.getSharedCacheData(); 104 }while (product!=‘D‘); 105 } 106 } 107 /** 108 * 4.線程通信測試類 109 */ 110 public class Test { 111 public static void main(String[] args) { 112 //生產者與消費者共享同一個資源 113 SharedCache cache = new SharedCache(); 114 //啟動消費者線程 115 new Consumer(cache).start(); 116 //啟動生產者線程 117 new Producer(cache).start(); 118 } 119 }
運行上述的測試類後,執行結果如下:
產品未生產,消費者的消費操作等待 生產者--->產品:A已生產,等待消費者消費 消費者--->產品:A已消費,通知生產者生產 生產者--->產品:B已生產,等待消費者消費 消費者--->產品:B已消費,通知生產者生產 生產者--->產品:C已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者--->產品:C已消費,通知生產者生產 生產者--->產品:D已生產,等待消費者消費 消費者--->產品:D已消費,通知生產者生產
我們在上面完成的生產者消費者模型,在處理線程同步問題時,主要是用了synchronized同步方法,JDK 1.5提供了多線程升級方案,將同步synchronized替換成了顯示的Lock操作,可以實現喚醒、凍結指定的線程。
接口Lock 實現提供了比使用 synchronized 方法和語句可獲得的更廣泛的鎖定操作。Lock 可以支持多個相關的 Condition 對象,從而在使用中更加靈活。
接口Condition可以替代傳統的線程間通信,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()。該對象可以通過Lock鎖進行獲取。可以說,傳統線程的通信方式,Condition都可以實現。
需要註意的是,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。
Java.util.concurrent.lock 中的Lock 框架是鎖定的一個抽象,它允許把鎖定的實現作為 Java 類,從而為Lock 的多種實現留下了空間,各種實現可能有不同的調度算法、性能特性或者鎖定語義。
其中,ReentrantLock 類實現了Lock ,它擁有與synchronized 相同的並發性和內存語義,還添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的性能。
我們接下來通過ReentrantLock 類和Condition接口的實現類來完成一個生產者消費者模型。為此,我們需要創建一個ReentrantLock類的多態對象,即建立一把鎖,然後將這把鎖與兩個Condition對象關聯。我們接下來就用Lock與Condition實現一個生產者消費者模型,實現與上述例子相似的效果,代碼具體如下:
1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 /** 5 * 共享的資源 6 */ 7 public class Resource { 8 private char product; 9 // private int count = 1; 10 //產品消費標識,是線程間通信的信號,為true表示未消費(生產),false表示未生產(消費) 11 private boolean flag = false; 12 //定義一個實現Lock接口的ReentrantLock類對象 13 private Lock lock = new ReentrantLock(); 14 /* 15 Condition是被綁定到Lock上的, 16 要創建一個Lock的Condition, 17 必須用Lock對象的newCondition()方法 18 */ 19 private Condition cond_pro = lock.newCondition(); 20 //一個lock可以有多個相關的condition 21 private Condition cond_con = lock.newCondition(); 22 /* 23 定義生產方法 24 */ 25 public void produce(char product) throws InterruptedException { 26 lock.lock();//手動加同步鎖 27 try { 28 while (flag) {//此時若生產完一個以後喚醒了另一個生產者,則再次判斷,避免兩個生產者同時生產 29 System.out.println("產品未消費,生產者的生產操作等待"); 30 cond_pro.await(); 31 } 32 this.product = product; 33 //標記已生產 34 flag = true; 35 //通知消費者已生產 36 cond_con.signal();//喚醒消費方法,利用了condition的signal()指定喚醒對象 37 System.out.println("生產者"+Thread.currentThread().getName()+"--->產品:"+product+"已生產,等待消費者消費"); 38 } finally { 39 lock.unlock();//釋放鎖 40 } 41 } 42 /* 43 定義消費方法 44 */ 45 public void consume() throws InterruptedException { 46 lock.lock(); 47 try { 48 while (!flag) { 49 System.out.println("產品未生產,消費者的消費操作等待"); 50 cond_con.await(); 51 } 52 //標記已消費 53 flag = false; 54 //通知生產者已消費 55 cond_pro.signal(); 56 System.out.println("消費者"+Thread.currentThread().getName()+"--->產品:"+this.product+"已消費,通知生產者生產"); 57 } finally { 58 lock.unlock(); 59 } 60 } 61 } 62 /** 63 * 生產者 64 */ 65 public class Producer implements Runnable{ 66 private Resource res; 67 public Producer(Resource res){ 68 this.res=res; 69 } 70 @Override 71 public void run() { 72 char product=‘A‘; 73 while(product<‘E‘){ 74 try { 75 res.produce(product); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 product++; 80 } 81 } 82 } 83 /** 84 * 消費者 85 */ 86 public class Consumer implements Runnable{ 87 private Resource res; 88 public Consumer(Resource res){ 89 this.res=res; 90 } 91 @Override 92 public void run() { 93 char product=‘A‘; 94 while(product<‘E‘){ 95 try { 96 res.consume(); 97 } catch (InterruptedException e) { 98 e.printStackTrace(); 99 } 100 product++; 101 } 102 } 103 } 104 /** 105 * 用ReentrantLock和Condition實現生產者消費者模型 106 */ 107 public class Test { 108 //入口方法 109 public static void main(String[] args) { 110 Resource res = new Resource();//生產者與消費者共享的資源 111 Producer producer = new Producer(res);//生產者 112 Consumer consumer = new Consumer(res);//消費者 113 //生產者線程與消費者線程各創建兩個 114 Thread p1 = new Thread(producer); 115 Thread p2 = new Thread(producer); 116 Thread c1 = new Thread(consumer); 117 Thread c2 = new Thread(consumer); 118 p1.start(); 119 p2.start(); 120 c1.start(); 121 c2.start(); 122 } 123 }
上述代碼執行結果如下:
生產者Thread-0--->產品:A已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者Thread-2--->產品:A已消費,通知生產者生產 產品未生產,消費者的消費操作等待 生產者Thread-1--->產品:A已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者Thread-2--->產品:A已消費,通知生產者生產 產品未生產,消費者的消費操作等待 生產者Thread-0--->產品:B已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者Thread-3--->產品:B已消費,通知生產者生產 產品未生產,消費者的消費操作等待 生產者Thread-1--->產品:B已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者Thread-2--->產品:B已消費,通知生產者生產 產品未生產,消費者的消費操作等待 生產者Thread-0--->產品:C已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者Thread-3--->產品:C已消費,通知生產者生產 產品未生產,消費者的消費操作等待 生產者Thread-1--->產品:C已生產,等待消費者消費 產品未消費,生產者的生產操作等待 消費者Thread-2--->產品:C已消費,通知生產者生產 生產者Thread-0--->產品:D已生產,等待消費者消費 消費者Thread-3--->產品:D已消費,通知生產者生產 產品未生產,消費者的消費操作等待 生產者Thread-1--->產品:D已生產,等待消費者消費 消費者Thread-3--->產品:D已消費,通知生產者生產
線程通信之生產者消費者模型