漫談並發編程(五):線程之間的協作
wait()與notifyAll()
wait()使你能夠等待某個條件發生變化,wait()會在等待外部世界產生變化的時候將任務掛起,而且僅僅有在notify()或notifyAll()發生時,即表示發生了某些感興趣的事物,這個任務才會被喚醒並去檢查所產生的變化。 調用sleep()的時候鎖並沒有被釋放,調用yield()也屬於這樣的情況。理解這一點非常關鍵。還有一方面。當一個任務在方法裏遇到了對wait()的調用的時候。線程的運行被掛起,對象上的鎖被釋放。因此wait()釋放鎖。這就意味著還有一個任務能夠獲得這個鎖。因此在該對象中的其它synchronized方法能夠在wait()期間被調用,而其它的方法通常將會產生改變,而這樣的改變正是使被掛起的任務又一次喚醒所感興趣的變化。
有兩種形式的wait()。第一種版本號接受毫秒數為參數,含義與sleep()方法裏的參數的意思同樣,都是指"在此期間暫停"。可是與sleep()不同的是,對於wait()而言:- 在wait()期間對象鎖是釋放的
- 能夠通過notify()、notifyAll(),或者令時間到期。從wait()中恢復運行。
這樣的wait()將無限等待下去,直到線程接收到notify()或者notifyAll()消息。
能夠想象,wait()、notify()、notifyAll()一定是基於某個"東西",把自身狀態附加上去,來實現這樣的通知及狀態的變化。考慮設計方式:1. 這樣的東西能夠單獨被定義出來。 2. 在Object中提供該"東西"的實現。 明顯另外一種方式要輕松方便很多。遷移性更強。其次。這樣的東西可能不是線程安全的,所以須要鎖來支持。
使用synchronized來進行同步的保護是理所應當,由於"東西"的實現就在Object中,其次使用synchronized的優點是一定程度能夠避免由於鎖不一致的情況下產生的wait()及notifyAll的不正確應。wait()在一把鎖中釋放了鎖,和notifyAll在還有一把鎖進行操作毫無相關。
class Car { private boolean waxOn = false; public synchronized void waxed() { waxOn = true; notifyAll( ); } public synchronized void buffed( ) { waxOn = false; notifyAll( ); } public synchronized void waitForWaxing( ) throws InterruptedException{ while(waxOn == false) wait( ); } public synchronized void waitForBuffing( ) throws InterruptedException { while(waxOn == true) wait( ); } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c;} public void run() { try { while(!Thread.interrupted()) { System.out.print(" Wax on!"); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) {car = c;} public void run( ) { try { while(!Thread.interrupted()) { car.waitForWaxing(); System.out.print("Wax Off"); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch(InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic { public static void main(String[] args) throws Exception{ Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }前面的演示樣例強調你必須用一個檢查感興趣的條件的while循環包圍wait()。
這非常重要,由於:
- 你可能有多個任務出於同樣的原因在等待一個鎖,而第一個喚醒任務可能已經改變這樣的狀況(即使你沒有這麽做,有人也會通過繼承你的類去這麽做)。假設屬於這樣的情況,那麽這個任務應該被再次掛起,直至其感興趣的條件發生變化。
- 也有可能某些任務處於不同的原因在等待你的對象上鎖(在這樣的情況下必須使用(notifyAll))。在這樣的情況下,你須要檢查是否已經由正確的原因喚醒,假設不是,就再次調用wait()。
notify()與notifyAll() 由於在技術上,可能會有多個任務在單個Car對象上處於wait()狀態。因此調用notifyAll()比調用notify()要更安全。可是,上面程序的結構僅僅會有一個任務處於wait()狀態,因此你能夠使用notify()來取代notifyAll()。 使用notify()而不是notifyAll()是一種優化。
使用notify()時。在眾多等待同一個鎖的任務中僅僅有一個會被喚醒,因此假設你希望使用notify()就必須保證被喚醒的是恰當的任務。
另外,為了使用notify()。全部任務必須等待同樣的條件,由於假設你有多個任務在等待不同的條件。那麽你就不會知道是否喚醒的恰當的任務。假設使用notify(),當條件發生變化時,必須僅僅有一個任務能從中受益。最後,這些限制對全部可能存在的子類都必須總是起作用的。
假設這些規則中有不論什麽一條不滿足。那麽你就必須使用notifyAll()而不是notify()。
用wait()和notifyAll()實現生產者消費者問題
使用wait()和notifyAll()時一定要註意不能兩層嵌套synchronized,假設使用了兩層,則外層的sycnhronized加的鎖無法釋放。並且須要註意的是不能使用Lock來限制資源的訪問。由於wait時無法釋放該鎖。假設還要限制在notifyAll時不能notifyAll到同類。那麽實現這個問題還是有難度的。
以下貼上一個自己一個粗陋的實現。各位朋友有美麗代碼的也能夠貼上來交流下。class Meal { } class WaitPerson implements Runnable { private String name; private Restaurant restaurant; public WaitPerson(String name, Restaurant res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (restaurant.waitPersons) { while (restaurant.meals.size() < 1) { restaurant.waitPersons.wait(); } } synchronized (restaurant.chefs) { if (restaurant.meals.size() >= 1) { restaurant.meals.poll(); restaurant.chefs.notifyAll(); System.out.println(name + " consumed a meal !"); } } } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } class Chef implements Runnable { private String name; private Restaurant restaurant; public Chef(String name, Restaurant res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (restaurant.chefs) { while (restaurant.meals.size() > 10) { restaurant.chefs.wait(); } } synchronized (restaurant.waitPersons) { if (restaurant.meals.size() <= 10) { restaurant.meals.add(new Meal()); restaurant.waitPersons.notifyAll(); System.out.println(name + " produced a meal !"); } } } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } public class Restaurant { public Queue<Meal> meals = new ConcurrentLinkedQueue<Meal>(); public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>(); public List<Chef> chefs = new ArrayList<Chef>(); public static void main(String[] args) throws InterruptedException { Restaurant res = new Restaurant(); ExecutorService exec = Executors.newCachedThreadPool(); Chef chef1 = new Chef("chef1", res); Chef chef2 = new Chef("chef2", res); res.chefs.add(chef1); res.chefs.add(chef2); exec.execute(chef1); exec.execute(chef2); WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res); WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res); res.waitPersons.add(waitPerson1); res.waitPersons.add(waitPerson2); exec.execute(waitPerson1); exec.execute(waitPerson2); // TimeUnit.MILLISECONDS.sleep(3000); // exec.shutdownNow(); } }上面這個程序能夠證明出來是線程安全的。
只是使用這樣的方式實在是太晦澀了。生產者消費者問題的機制須要我們去控制,實際上,java並發類庫為我們提供了這樣的模型的實現,我們待會會用堵塞隊列來重寫這個問題。
使用顯式的Lock和Condition對象
我們能夠顯式的使用Condition對象來替代我前面提到的"東西",使用這樣的方式將更加靈活,且有更清晰的辯識度,但會添加程序中對象的數量。你能夠通過在Condition上調用await()來掛起一個任務。當外部條件發生變化。意味著某個任務應該繼續運行時。你能夠通過調用signal()來通知這個任務,從而喚醒一個任務,或者調用signalAll()來喚醒全部在這個Condition上被其自身掛起的任務。 以下我們利用此工具重寫前面樣例中的Car類。class Car { private boolean waxOn = false; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void waxed() { lock.lock(); try { waxOn = true; condition.signalAll(); } finally { lock.unlock(); } } public void buffed( ) { lock.lock(); try { waxOn = false; condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing( ) throws InterruptedException{ lock.lock(); try{ while(waxOn == false) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing( ) throws InterruptedException { lock.lock(); try { while(waxOn == true) condition.await( ); } finally { lock.unlock(); } } }
使用BlockingQueue來解決生產者消費者問題
java幫我們抽象了生產者消費者問題。我們能夠使用同步隊列來解決任務協作的問題。同步隊列在不論什麽時刻都僅僅同意一個任務插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了這個隊列,這個接口有大量的實現。你通常能夠使用LinkedBlockingQueue。它是一個無界隊列,還能夠使用ArrayBlockingQueue,它具有固定的尺寸,因此你能夠在它被堵塞之前,向當中放置有限數量的元素。
假設消費者任務試圖從隊列中獲取對象,而該隊列此時為空,那麽這些隊列還能夠掛起消費者任務,而且當有很多其它的元素可用時恢復消費者任務。堵塞隊列能夠解決很大量的問題。而其方式與wait()和notifyAll()相比,則簡單並可靠太多。 以下利用堵塞隊列實現了上面的餐廳問題。class Meal { } class WaitPerson implements Runnable { private String name; private RestaurantBlookingQueue restaurant; public WaitPerson(String name, RestaurantBlookingQueue res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { restaurant.meals.take(); System.out.println(name + "taked a Meal"); Thread.sleep(100); } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } class Chef implements Runnable { private String name; private RestaurantBlookingQueue restaurant; public Chef(String name, RestaurantBlookingQueue res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { restaurant.meals.put(new Meal()); System.out.println(this.name + "made a meal"); Thread.sleep(100); } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } public class RestaurantBlookingQueue { public BlockingQueue<Meal> meals = new ArrayBlockingQueue<Meal>(10); public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>(); public List<Chef> chefs = new ArrayList<Chef>(); public static void main(String[] args) throws InterruptedException { RestaurantBlookingQueue res = new RestaurantBlookingQueue(); ExecutorService exec = Executors.newCachedThreadPool(); Chef chef1 = new Chef("chef1", res); Chef chef2 = new Chef("chef2", res); res.chefs.add(chef1); res.chefs.add(chef2); exec.execute(chef1); exec.execute(chef2); WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res); WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res); res.waitPersons.add(waitPerson1); res.waitPersons.add(waitPerson2); exec.execute(waitPerson1); exec.execute(waitPerson2); // TimeUnit.MILLISECONDS.sleep(3000); // exec.shutdownNow(); } }
任務間使用管道進行輸入/輸出
通過輸入/輸出在線程間進行通信通常非常實用。提供線程功能的類庫以"管道"的形式對線程的輸入/輸出提供了支持。
它們在Java輸入/輸出類庫中的相應物就是PipedWriter類(同意任務向管道寫)和PipedReader類(同意不同任務從同一個管道讀)。這個模型能夠看成是"生產者-消費者"問題的變體。
管道基本是一個堵塞隊列,存在於多個引入BlookingQueue之前的Java版本號。
class Sender implements Runnable { private Random rand = new Random(47); private PipedWriter out = new PipedWriter(); public PipedWriter getPipedWriter( ) {return out;} public void run( ) { try { while(true) { for(char c = ‘A‘ ; c <= ‘z‘; c++) { out.write(c); TimeUnit.MILLISECONDS.sleep( rand.nextInt(500)); } } } catch (IOException e) { System.out.println(e + " Sender write exception"); } catch (InterruptedException e) { System.out.println(e + " Sender sleep exception"); } } } class Receiver implements Runnable { private PipedReader in; public Receiver(Sender sender) throws IOException { in = new PipedReader(sender.getPipedWriter()); } public void run( ) { try { while(true) { System.out.print("Read: "+(char)in.read() + ", "); } } catch (IOException e) { System.out.println(e + " Receiver read exception"); } } } public class PipedIO { public static void main(String []args) throws Exception { Sender sender = new Sender( ); Receiver receiver = new Receiver( sender ); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(sender); exec.execute(receiver); TimeUnit.SECONDS.sleep( 4 ); exec.shutdownNow(); } }
死鎖
死鎖本是操作系統的中概念,由於操作系統中會遇到非常多可能發生死鎖的狀況。但我們在並發程序常常也須要預防死鎖。特別是多個線程在並發的訪問多個對象的時候。首先,我們須要從邏輯上避免死鎖發生的可能性,比如哲學家進餐問題。一般在程序中的解決方案是一次性將資源全然分配給它,為了提供並發度,須要我們進一步縮小並發鎖的範圍。除了邏輯上預防並發。我們還須要處理意外情況,比如獲取到資源的線程中途掛掉。我們須要釋放資源。在程序中即釋放鎖。在程序中能夠通過try-catch實現。
漫談並發編程(五):線程之間的協作