java並發基礎
《java並發編程實戰》終於讀完4-7章了,感觸很深,但是有些東西還沒有吃透,先把已經理解的整理一下。《java並發編程實戰》筆記(一)是對前3章的總結。這裏總結一下第5章的東西,為什麽跳過第4章?不告訴你。
一,阻塞隊列和生產者-消費者模式
java中的阻塞隊列提供了可阻塞的put和take方法,以及支持定時的offer和poll方法。如果隊列已經滿了,那麽put方法將阻塞直到有空間可用;如果隊列為空,那麽take方法將會阻塞直到有元素可用。類庫中包含了BlockingQueue的多種實現,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO隊列(先進先出),PriorityBlockingQueue是一個按優先級排序的隊列。
阻塞隊列支持生產者-消費者模式,該模式將“找出需要完成的工作”和“執行工作”這兩個過程分離開來;並把工作項放入一個“待完成”列表中以便在隨後處理,而不是找出後立即處理。當數據生成時,生產者把數據放入隊列,而當消費者準備處理數據時,將從隊列中獲取數據。生產者不需要知道消費者的標識和數量,或者它們是否是唯一的生產者,而只需將數據放入隊列即可。同樣,消費者也不需要知道生產者是誰,來自何處。
舉個例子說明阻塞隊列和生產者-消費者模式如何配合使用:在某個文件層次結構中搜索所有的.java文件。
import java.io.File; import java.io.FileFilter; importjava.util.concurrent.BlockingQueue; /** * @描述:生產者 * @author 肖冬 */ public class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue ; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue,FileFilter fileFilter, File root) {super(); this.fileQueue = fileQueue; this.fileFilter = fileFilter; this.root = root; } @Override public void run() { try { crawl(root); } catch (InterruptedException e) { //使線程恢復中斷狀態,為什麽這麽做:因為線程一旦拋出中斷異常,就會重置中斷狀態 Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException{ File[] entries = root.listFiles(fileFilter); if (entries !=null) { for (File entry : entries) { if (entry.isDirectory()) { crawl(entry); }else{ //生產者將任務放到阻塞隊列 fileQueue.put(entry); } } } } }
import java.io.File; import java.util.concurrent.BlockingQueue; /** * @描述:消費者 * @author 肖冬 */ public class Indexer implements Runnable { private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { this.queue = queue; } @Override public void run() { try{ while (true) { indexFile(queue.take()); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } private void indexFile(File f){ System.out.println(f.getName()); } }
import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.junit.Test; /** * @描述:junit測試 * @author 肖冬 */ public class TestMain { @Test public void testFileFilter(){ File root = new File("D://test"); FileFilter fileFilter = new FileFilter() { @Override public boolean accept(File f) { if (f.isDirectory()) return true; if (f.isFile()) { String name = f.getName(); if (name.endsWith(".java")) return true; else return false; } return false; } }; BlockingQueue<File> queue = new LinkedBlockingQueue<File>(); //生產者 new Thread(new FileCrawler(queue,fileFilter, root)).start(); //消費者 new Thread(new Indexer(queue)).start(); } }
生產者將符合條件的文件放入阻塞隊列,消費者處理任務時只從阻塞隊列中去任務就可以了,不必關心生產者。
生產者-消費者模式同樣能帶來許多性能優勢。生產者和消費者可以並發的執行。如果一個是I/O密集型,另一個是CPU密集型,那麽並發執行的吞吐率要高於串行執行的吞吐率。
二,同步工具類
同步工具類可以是任何一個對象,只要它根據自身的狀態來協調線程間的控制流,比如阻塞隊列,應為take和put等方法將阻塞,直到隊列達到期望的狀態(隊列即非空,也非滿)。
除了阻塞隊列,其他類型的同步容器還包括:閉鎖,柵欄,信號量。
1.閉鎖
我自己理解閉鎖的意思,它可以讓某些動作一起開始,比如,讓多個線程一起執行,並等待最慢的線程執行完畢。代碼如下:
import java.util.concurrent.CountDownLatch; public class Harness { public long timeTasks(int nThreads,final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread(){ @Override public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch(InterruptedException e){} } }; t.start(); } long startTime = System.nanoTime(); startGate.countDown(); endGate.await(); long endTime = System.nanoTime(); return endTime - startTime; } }
例子中的CountDownLatch是一種靈活的閉鎖實現,它可以使一個或者多個線程等待一組事件發生。閉鎖包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生了,而await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那麽await會一直阻塞直到計數器為零,或等待中的線程中斷,或等待超時。countDown和await方法是配合使用的。
Harness創建一定數量的線程,利用它們並發的執行指定任務,它使用兩個閉鎖,分別表示起始門和結束門,起始門的初始值為1,而結束門的計數器初始值為工作線程的數量,每個線程首先要做的事就是在啟動門上等待,確保所有線程同時執行。而每個線程要做的最後一件事是調用結束門的countDown方法減1,使主線程高效的等待直到所有線程都執行完成,然後統計耗時。
2.信號量
計數信號量用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量。
例子:有固定大小的HashSet,如果容器滿了,容器將處於阻塞狀態。
import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Semaphore; public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException{ sem.acquire();//獲得一個許可 boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally{ if (!wasAdded) { sem.release();//將許可返回給信號量 } } } public boolean remove(T o){ boolean wasRemoved = set.remove(o); if (wasRemoved) { sem.release();//將許可返回給信號量 } return wasRemoved; } }
Semaphore中管理著一組虛擬的許可,許可的虛擬數量可通過構造函數指定,在執行操作時可以首先獲得許可(只要還有剩余的許可),並在使用後返還許可。如果沒有許可,那麽acquire方法將阻塞,直到有許可為止(或者直到被中斷或者操作超時),release方法將返還一個許可給信號量。所以例子中的如果容器滿了,再add元素的時候出現的阻塞狀態,實際上是Semaphore的acquire處於阻塞狀態,將線程掛起,直到信號量中有新的許可為止。
三,構建高效且可伸縮的結果緩存
幾乎所有的服務器應用程序都會使用某種形式的緩存,重用之前的計算結果能降低延遲,提高吞吐量,但卻需要消耗更多的內存。我們從簡單的HashMap開始,然後分析它的並發性缺陷,並討論如何修復它們。
public interface Computable<A, V> { V compute(A arg) throws InterruptedException; }
import java.math.BigInteger; public class ExpensiveFunction implements Computable<String, BigInteger> { @Override public BigInteger compute(String arg) throws InterruptedException { //這裏模擬耗時的計算 return new BigInteger(arg); } }
import java.util.HashMap; import java.util.Map; public class Memoizer1<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<A, V>(); private Computable<A, V> c; public Memoizer1(Computable<A, V> c) { super(); this.c = c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg,result); } return result; } }
Memoizer1給出了第一種嘗試:使用HashMap保存之前的結果,compute方法將首先檢查需要的結果是否已經在緩存中,如果存在則返回之前計算的值,否則,計算結果並保存到緩存中,再返回。
HashMap不是線程安全的,因此要確保兩個線程不會同時訪問HashMap,Memoizer1用了一種保守的方法,即對整個compute方法進行同步,這種方法能確保線程安全性,但會帶來一個明顯的可伸縮性的問題:每次只有一個線程能執行compute方法。如果另一個線程正在計算結果,那麽其他調用compute的線程可能會被阻塞很長時間。如果有多個線程在排隊等待還未計算出的結果,那麽,compute方法的計算時間可能比沒有使用緩存的計算時間更長。
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class Memoizer2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private Computable<A, V> c; public Memoizer2(Computable<A, V> c) { super(); this.c = c; } @Override public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg,result); } return result; } }
Memoizer2用ConcurrentHashMap替換HashMap,用於解決多線程訪問緩存的安全性。為什麽用ConcurrentHashMap而不是HashTable,因為ConcurrentHashMap的並發能力比HashTable強。
但是還有一個問題:如果某個線程啟動了一個開銷很大的計算,而其他線程並不知道這個計算正在進行。那麽很可能會重復這個計算。我們希望通過某種方式來表達“線程X正在進行計算f(27)”這種情況,這樣當另一個線程查找f(27)時,它能夠直到最高效的方法是等待線程X計算結束,然後去緩存中直接拿結果,怎麽解決這個問題呢?有一個類能基本實現這個功能:FutureTask。
FutureTask表示一個計算的過程,這個過程可能已經計算完成,也可能正在進行。如果有結果可用,那麽FutureTask.get將立即返回結果,否則它會一直阻塞,直到結果計算出來再將其返回。
import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; public class Memoizer3<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private Computable<A, V> c; public Memoizer3(Computable<A, V> c) { super(); this.c = c; } @Override public V compute(final A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = ft; cache.put(arg, f); ft.run();//在這裏調用c.compute } try { return f.get(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }
Memoizer3解決了上面提到的問題。若結果已經計算出來,那麽將立即返回,如果其他線程正在計算該結果,那麽新的線程將一直等待這個結果被計算出來。它只有一個缺陷,即仍然可能存在兩個線程同時計算一個相同結果的情況。但是概率已經小了很多了。之所以還會發生,是因為put以後沒有進行再判斷,每個線程只要判斷f==null就立刻開始創建任務。
mport java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; public class Memoizer4<A, V> implements Computable<A, V> { private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private Computable<A, V> c; public Memoizer4(Computable<A, V> c) { super(); this.c = c; } @Override public V compute(final A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, f); if (f== null) { f = ft; ft.run();//在這裏調用c.compute } } try { return f.get(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }
Memoizer4解決了這個問題。至此,支持並發的耗時計算結果的緩存寫完了,當然Memoizer4還有其他的問題,比如緩存清理的問題,但是已經可以用了。
java並發基礎