1. 程式人生 > >java並發基礎

java並發基礎

nts his isf 排序 工具類 call() pre 類庫 空間

  《java並發編程實戰》終於讀完4-7章了,感觸很深,但是有些東西還沒有吃透,先把已經理解的整理一下。《java並發編程實戰》筆記(一)是對前3章的總結。這裏總結一下第5章的東西,為什麽跳過第4章?不告訴你。

一,阻塞隊列和生產者-消費者模式

  java中的阻塞隊列提供了可阻塞的put和take方法,以及支持定時的offer和poll方法。如果隊列已經滿了,那麽put方法將阻塞直到有空間可用;如果隊列為空,那麽take方法將會阻塞直到有元素可用。類庫中包含了BlockingQueue的多種實現,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO隊列(先進先出),PriorityBlockingQueue是一個按優先級排序的隊列。

  阻塞隊列支持生產者-消費者模式,該模式將“找出需要完成的工作”和“執行工作”這兩個過程分離開來;並把工作項放入一個“待完成”列表中以便在隨後處理,而不是找出後立即處理。當數據生成時,生產者把數據放入隊列,而當消費者準備處理數據時,將從隊列中獲取數據。生產者不需要知道消費者的標識和數量,或者它們是否是唯一的生產者,而只需將數據放入隊列即可。同樣,消費者也不需要知道生產者是誰,來自何處。

  舉個例子說明阻塞隊列和生產者-消費者模式如何配合使用:在某個文件層次結構中搜索所有的.java文件。

import java.io.File;
import java.io.FileFilter;
import
java.util.concurrent.BlockingQueue; /** * @描述:生產者 * @author 肖冬 */ public class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue ; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue,FileFilter fileFilter, File root) {
super(); this.fileQueue = fileQueue; this.fileFilter = fileFilter; this.root = root; } @Override public void run() { try { crawl(root); } catch (InterruptedException e) { //使線程恢復中斷狀態,為什麽這麽做:因為線程一旦拋出中斷異常,就會重置中斷狀態 Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException{ File[] entries = root.listFiles(fileFilter); if (entries !=null) { for (File entry : entries) { if (entry.isDirectory()) { crawl(entry); }else{ //生產者將任務放到阻塞隊列 fileQueue.put(entry); } } } } }
import java.io.File;
import java.util.concurrent.BlockingQueue;
 
/**
 * @描述:消費者
 * @author 肖冬
 */
public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;
     
    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }
 
    @Override
    public void run() {
        try{
            while (true) {
                indexFile(queue.take());
            }
        }catch(InterruptedException e){
            Thread.currentThread().interrupt();
        }
         
    }
     
    private void indexFile(File f){
        System.out.println(f.getName());
    }
     
}
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
 
import org.junit.Test;
 
/**
 * @描述:junit測試
 * @author 肖冬
 */
public class TestMain {
     
    @Test
    public void testFileFilter(){
        File root = new File("D://test");
        FileFilter fileFilter = new FileFilter() {
            @Override
            public boolean accept(File f) {
                if (f.isDirectory())  return true;
                if (f.isFile()) {
                    String name = f.getName();
                    if (name.endsWith(".java")) return true;
                    else return false;
                }
                return false;
            }
        };
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>();
        //生產者
        new Thread(new FileCrawler(queue,fileFilter, root)).start();
        //消費者
        new Thread(new Indexer(queue)).start();
    }
}

  生產者將符合條件的文件放入阻塞隊列,消費者處理任務時只從阻塞隊列中去任務就可以了,不必關心生產者。

  生產者-消費者模式同樣能帶來許多性能優勢。生產者和消費者可以並發的執行。如果一個是I/O密集型,另一個是CPU密集型,那麽並發執行的吞吐率要高於串行執行的吞吐率。

二,同步工具類

  同步工具類可以是任何一個對象,只要它根據自身的狀態來協調線程間的控制流,比如阻塞隊列,應為take和put等方法將阻塞,直到隊列達到期望的狀態(隊列即非空,也非滿)。

  除了阻塞隊列,其他類型的同步容器還包括:閉鎖,柵欄,信號量。

  1.閉鎖

  我自己理解閉鎖的意思,它可以讓某些動作一起開始,比如,讓多個線程一起執行,並等待最慢的線程執行完畢。代碼如下:

import java.util.concurrent.CountDownLatch;
 
public class Harness {
    public long timeTasks(int nThreads,final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
         
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(){
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch(InterruptedException e){}
                }
            };
             
            t.start();
        }
         
        long startTime = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long endTime = System.nanoTime();
         
        return endTime - startTime;
         
    }
}

  例子中的CountDownLatch是一種靈活的閉鎖實現,它可以使一個或者多個線程等待一組事件發生。閉鎖包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生了,而await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那麽await會一直阻塞直到計數器為零,或等待中的線程中斷,或等待超時。countDown和await方法是配合使用的。

  Harness創建一定數量的線程,利用它們並發的執行指定任務,它使用兩個閉鎖,分別表示起始門和結束門,起始門的初始值為1,而結束門的計數器初始值為工作線程的數量,每個線程首先要做的事就是在啟動門上等待,確保所有線程同時執行。而每個線程要做的最後一件事是調用結束門的countDown方法減1,使主線程高效的等待直到所有線程都執行完成,然後統計耗時。

  2.信號量

  計數信號量用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量。

  例子:有固定大小的HashSet,如果容器滿了,容器將處於阻塞狀態。

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
 
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;
     
    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }
     
    public boolean add(T o) throws InterruptedException{
        sem.acquire();//獲得一個許可
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally{
            if (!wasAdded) {
                sem.release();//將許可返回給信號量
            }
        }
    }
     
    public boolean remove(T o){
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            sem.release();//將許可返回給信號量
        }
        return wasRemoved;
    }
     
}

  Semaphore中管理著一組虛擬的許可,許可的虛擬數量可通過構造函數指定,在執行操作時可以首先獲得許可(只要還有剩余的許可),並在使用後返還許可。如果沒有許可,那麽acquire方法將阻塞,直到有許可為止(或者直到被中斷或者操作超時),release方法將返還一個許可給信號量。所以例子中的如果容器滿了,再add元素的時候出現的阻塞狀態,實際上是Semaphore的acquire處於阻塞狀態,將線程掛起,直到信號量中有新的許可為止。

三,構建高效且可伸縮的結果緩存

  幾乎所有的服務器應用程序都會使用某種形式的緩存,重用之前的計算結果能降低延遲,提高吞吐量,但卻需要消耗更多的內存。我們從簡單的HashMap開始,然後分析它的並發性缺陷,並討論如何修復它們。

public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}
import java.math.BigInteger;
 
public class ExpensiveFunction implements Computable<String, BigInteger> {
    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        //這裏模擬耗時的計算
        return new BigInteger(arg);
    }
}
import java.util.HashMap;
import java.util.Map;
 
public class Memoizer1<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new HashMap<A, V>();
    private Computable<A, V> c;
     
     
    public Memoizer1(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg,result);
        }
         
        return result;
         
    }
}

  Memoizer1給出了第一種嘗試:使用HashMap保存之前的結果,compute方法將首先檢查需要的結果是否已經在緩存中,如果存在則返回之前計算的值,否則,計算結果並保存到緩存中,再返回。

  HashMap不是線程安全的,因此要確保兩個線程不會同時訪問HashMap,Memoizer1用了一種保守的方法,即對整個compute方法進行同步,這種方法能確保線程安全性,但會帶來一個明顯的可伸縮性的問題:每次只有一個線程能執行compute方法。如果另一個線程正在計算結果,那麽其他調用compute的線程可能會被阻塞很長時間。如果有多個線程在排隊等待還未計算出的結果,那麽,compute方法的計算時間可能比沒有使用緩存的計算時間更長。

  

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
public class Memoizer2<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
    private Computable<A, V> c;
     
     
    public Memoizer2(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public  V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg,result);
        }
         
        return result;
         
    }
}

  Memoizer2用ConcurrentHashMap替換HashMap,用於解決多線程訪問緩存的安全性。為什麽用ConcurrentHashMap而不是HashTable,因為ConcurrentHashMap的並發能力比HashTable強。

但是還有一個問題:如果某個線程啟動了一個開銷很大的計算,而其他線程並不知道這個計算正在進行。那麽很可能會重復這個計算。我們希望通過某種方式來表達“線程X正在進行計算f(27)”這種情況,這樣當另一個線程查找f(27)時,它能夠直到最高效的方法是等待線程X計算結束,然後去緩存中直接拿結果,怎麽解決這個問題呢?有一個類能基本實現這個功能:FutureTask。

  FutureTask表示一個計算的過程,這個過程可能已經計算完成,也可能正在進行。如果有結果可用,那麽FutureTask.get將立即返回結果,否則它會一直阻塞,直到結果計算出來再將其返回。

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private Computable<A, V> c;
     
     
    public Memoizer3(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public  V compute(final A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws InterruptedException {
                    return c.compute(arg);
                }
            };
             
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = ft;
            cache.put(arg, f);
            ft.run();//在這裏調用c.compute
        }
         
        try {
            return f.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        return null;
    }
}

  Memoizer3解決了上面提到的問題。若結果已經計算出來,那麽將立即返回,如果其他線程正在計算該結果,那麽新的線程將一直等待這個結果被計算出來。它只有一個缺陷,即仍然可能存在兩個線程同時計算一個相同結果的情況。但是概率已經小了很多了。之所以還會發生,是因為put以後沒有進行再判斷,每個線程只要判斷f==null就立刻開始創建任務。

mport java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
public class Memoizer4<A, V> implements Computable<A, V> {
    private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private Computable<A, V> c;
     
     
    public Memoizer4(Computable<A, V> c) {
        super();
        this.c = c;
    }
 
 
    @Override
    public  V compute(final A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws InterruptedException {
                    return c.compute(arg);
                }
            };
             
            FutureTask<V> ft = new FutureTask<V>(eval);
             
            f = cache.putIfAbsent(arg, f);
            if (f== null) {
                f = ft;
                ft.run();//在這裏調用c.compute
            }
        }
         
        try {
            return f.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        return null;
    }
}

  Memoizer4解決了這個問題。至此,支持並發的耗時計算結果的緩存寫完了,當然Memoizer4還有其他的問題,比如緩存清理的問題,但是已經可以用了。

java並發基礎