1. 程式人生 > >java並行程式開發及優化

java並行程式開發及優化

一、執行緒池
1. 無限制執行緒的缺陷

 - 執行緒的建立和關閉需要花費時間。
 - 執行緒本身也會佔用記憶體空間,大量的執行緒回收加重GC的壓力。
 - 不能有效的控制和管理正在執行的執行緒。
因此在生產環境需要用執行緒池對執行緒進行控制和管理。

2.JDK執行緒池框架
jdk提供的executor執行緒池框架

 - ThreadPoolExecutor表示一個執行緒池
 - Executors表示一個執行緒池工廠
 - ThreadPoolExecutor實現了Executor介面,因此任何Runnable物件都可以被它排程

2.1.Executors執行緒工廠

 - newFixedThreadPool():返回一個固定執行緒數量的執行緒池,若有空閒執行緒,優先使用空閒執行緒,否則加入等待佇列等待空閒執行緒。
 - newSingleThreadExecutor():只有一個執行緒的執行緒池。
 - newCachedThreadPool():根據實際情況調整執行緒數量的執行緒池,若有空閒執行緒,優先使用空閒執行緒,否則建立執行緒執行任務。
 - newScheduledThreadPool():執行定時任務或週期任務的執行緒池。

2.2.ThreadPoolExecutor執行緒池
2.2.1執行緒池構造方法引數描述

這裡寫圖片描述

 - corePoolSize:池中常規執行緒數量
 - maximumPoolSize:池中的最大執行緒數量
 - keepAliveTime:池中執行緒數量大於corePoolSize時,多餘的空閒執行緒存活時間
 - unit:keepAliveTime的單位,比如秒
 - workQueue:任務佇列,儲存被提交但未被執行的任務
 - threadFactory:執行緒工廠,用於建立執行緒
 - handler:拒絕策略,當任務太多來不及處理時,如何拒絕任務

2.2.2執行緒池的任務佇列

任務佇列,儲存被提交但未被執行的任務,是一個BlockingQueue介面物件。

直接提交佇列:SynchronousQueue,沒有容量,不儲存任務,總是將任
務提交給執行緒,如果沒有空閒執行緒,則建立執行緒,如果執行緒數量達到最大值則執行拒絕策略。

有界任務佇列:ArrayBlockingQueue,其構造方法引數表示佇列最大容量。
如果池中執行緒數量小於corePoolSize則建立新執行緒,若等於corePoolSize則放入佇列,當佇列已滿時,
則在匯流排程小於池的最大容量的前提下建立新執行緒;若匯流排程大於池的最大容量則執行拒絕策略。

無界任務佇列:LinkedBlockingQueue,如果池中執行緒數量小於corePoolSize則建立新執行緒,
若大於corePoolSize則放入佇列。當池中執行緒數量等於corePoolSize後,不會再建立執行緒。

優先任務佇列:PriorityBlockingQueue,根據任務的優先順序順序先後執行。

2.3.執行緒池的拒絕策略

執行緒池的拒絕策略,當任務數量超過系統實際承載能力時,改如何處理。jdk內建拒絕策略(RejectedExecutionHandler):

 - AbortPolicy:直接丟擲執行時異常,阻止程式執行。
 - CallerRunsPolicy:只要執行緒池未關閉,讓呼叫者執行緒執行當前被丟棄的任務,然後提交任務。這種方式不會有任務被丟棄。
 - DiscardOldestPolicy:只要執行緒池未關閉,丟棄最老的一個任務,並提交當前任務。
 - DiscardPolicy:丟棄無法處理的任務,不予任何處理,即丟棄當前需要提交的任務。

2.4ThreadPoolExecutor執行緒池的擴充套件

執行緒池提供瞭如下3個方法,通過擴充套件這3個方法,可以實現對執行緒池執行狀態的跟蹤。

 - beforeExecute():執行緒池中某執行緒執行前執行
 - afterExecute():執行緒池中某執行緒執行後執行
 - terminated():執行緒池關閉時呼叫執行

二、併發資料結構

1、併發list

 - Vector:相關方法都是同步方法
 - Collections.synchronizedList(List list):相關方法都採用同步塊方式
 - CopyOnWriteArrayList:讀沒有加鎖,寫操作時,先加鎖,再獲取內部陣列的副本,
 在副本上新增元素,然後把副本寫回,最後釋放鎖;但在構造方法中,沒有使用鎖,因此可以將一個
 執行緒非安全的list構造進來。適用於讀多寫少的場景。
 內部陣列是由關鍵字volatile修飾,volatile關鍵字的作用見後面章節。

2、併發set

 - Collections.synchronizedSet(Set set):相關方法都採用同步塊方式
 - CopyOnWriteArraySet:內部採用CopyOnWriteArrayList實現

3、併發map

 - HashTable:相關方法都是同步方法
 - Collections.synchronizedMap(Map map):相關方法都採用同步塊方式
 - ConcurrentHashMap:讀不加鎖;內部將整個集合分成N(預設16)個小集合(段),
 每個段一把鎖。當向集合寫資料的時候,首先判斷該key的hash值在那個段中,再獲得該段的鎖,
 最後插入資料。多個put同時操作時,只要其key的hash值不在同一個段中,就可以併發寫入資料。
 如果是size方法,則要依次對所有段加鎖。

4、併發queue

- ConcurrentLinkedQueue:內部採用無鎖的方式實現高併發狀態下的高效能。
- BlockingQueue:阻塞式佇列,主要是簡化多執行緒間的資料共享。如生產者消費者模式下的佇列。
常用的BlockingQueue主要有ArrayBlockingQueue和LinkedBlockingQueue。

這裡寫圖片描述
5、併發deque
LinkedBlockingDeque:內部使用連結串列實現,每個節點都有一個前驅和後驅節點,並且沒有進行讀寫鎖的分離,因此同時只有一個執行緒對其操作,效能低於LinkedBlockingQueue,更低於ConcurrentLinkedQueue。

三、併發控制方法

1、volatile
在Java中,每個執行緒有一塊工作記憶體區,儲存主記憶體中的變數的值的拷貝。線上程執行過程中,對變數的修改實際上是對工作記憶體區中變數的修改,修改完成後再寫入主記憶體中。每個執行緒改變工作記憶體區的資料時,對其他執行緒來說是不可見的。可能有些朋友會問這樣做多此一舉,其實不是,這樣做能提高系統性能。因為這裡的工作記憶體區不是JVM中的堆疊,而可能是cpu的快取記憶體。
volatile,使所有執行緒均讀寫主記憶體中的對應變數,其他執行緒對變數的修改,可以及時反映在當前執行緒中,當前執行緒對變數的修改,可以及時寫回主記憶體中,並被其他執行緒所見,使用volatile的變數,jvm會保證其的有序性。
volatile解決了執行緒間共享變數的可見性問題,而不能解決執行緒同步問題,並且使用volatile會增加效能開銷。
最佳實踐:使用atomic包下的原子型別實現執行緒間變數的共享。

2、synchronized內部鎖
synchronized方法:則呼叫物件必須獲得當前物件(this)的鎖。如果一個物件有多個synchronized方法,只要一個執行緒訪問了其中的一個synchronized方法,其它執行緒不能同時訪問這個物件中任何一個synchronized方法。不同的物件例項的synchronized方法是不相干擾的。
synchronized(ob){}:呼叫物件必須獲得ob物件的鎖,原理和synchronized方法一致。
synchronized靜態方法:則呼叫物件必須獲得當前Class物件鎖。對類的所有物件例項起作用。
synchronized關鍵字不能被繼承。多執行緒間的互動:wait(),notify()。

    打個比方:一個object就像一個大房子,大門永遠開啟。
    房子裡有 很多房間(也就是方法)。這些房間有上鎖的(synchronized方法), 和不上鎖之分(普通方法)。
    房門口放著一把鑰匙(key),這把鑰匙可以開啟所有上鎖的房間。
    另外我把所有想呼叫該物件方法的執行緒比喻成想進入這房子某個房間的人。
    一個人想進入某間上了鎖的房間,他來到房子門口,看見鑰匙在那兒(說明暫時還沒有其他人要使用上鎖的房間)。
    於是他走上去拿到了鑰匙,並且按照自己的計劃使用那些房間。注意一點,他每次使用完一次上鎖的房間後會馬上把鑰匙還回去。
    即使他要連續使用兩間上鎖的房間,中間他也要把鑰匙還回去,再取回來。因此,鑰匙的使用原則是:“隨用隨借,用完即還。”
    這時其他人可以不受限制的使用那些不上鎖的房間,一個人用一間可以,兩個人用一間也可以,沒限制。
    但是如果當某個人想要進入上鎖的房間,他就要跑到大門口去看看了。有鑰匙當然拿了就走,沒有的話,就只能等了。
    要是很多人在等這把鑰匙,等鑰匙還回來以後,誰會優先得到鑰匙?這個就不確定了

3、ReentrantLock重入鎖
synchronized缺點:不能被中斷、不能定時(萬一方法內部是死迴圈,那麼就沒辦法讓其中斷)。
ReentrantLock還提供了公平和非公平兩種鎖。公平鎖可以保證在鎖的等待佇列裡的執行緒是公平的,不會出現插隊的情況,對鎖的獲取總是先進先出,而非公平鎖不做這個保證,可能會存在插入的情況。公平鎖的實現代價比非公平鎖要高,因此從效能角度而言,非公平鎖要好,因此在沒有特殊要求的情況下,應該使用非公平鎖。即在構造方法的引數裡,用false來建立非公平鎖。
在使用ReentrantLock鎖時,一定要注意在最後釋放鎖,釋放鎖一般寫在finally裡。而synchronized,JVM在最後會自動釋放鎖。
ReentrantLock相關方法:

 - lock():獲得鎖,如果鎖被使用,則等待
 - lockInterruptibly():獲得鎖,但優先響應中斷。
 - tryLock():嘗試獲得鎖,如果獲得鎖返回true,否則返回false
 - tryLock(long timeout,TimeUnit unit):在給定的時間範圍內嘗試獲得鎖。
 - unlock():釋放鎖,一般寫在finally中

4、ReadWriteLock讀寫鎖
內部鎖或者重入鎖不管是讀還是寫,都需要獲得鎖,所有讀寫都是序列。而ReadWriteLock允許多個執行緒同時讀,寫寫、讀寫直接需要加鎖。ReadWriteLock在讀多寫少的情況下,比重入鎖效能更高。讀寫鎖示例如下:

 private Object object;  
    private ReadWriteLock lock = new ReentrantReadWriteLock();  
    private Lock readlock= lock.readLock(); 
    private Lock writelock=lock.writeLock();

    public void get() {  
        readlock.lock();  
        System.out.println(Thread.currentThread().getName() + "準備讀資料!!");  

        try {  
            Thread.sleep(new Random().nextInt(1000));  
            System.out.println(Thread.currentThread().getName() + "讀資料為:" + this.object);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } finally {  
            readlock.unlock();  
        }  
    }  

    public void put(Object object) {  
        writelock.lock();  
        System.out.println(Thread.currentThread().getName() + "準備寫資料");  

        try {  
            Thread.sleep(new Random().nextInt(1000));  
            this.object = object;  
            System.out.println(Thread.currentThread().getName() + "寫資料為" + this.object);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } finally {  
            writelock.unlock();  
        }  
    }  

5、Condition
Condition是與鎖相關聯的,通過Lock介面的newCondition方法生成一個與鎖繫結的Condition示例。重入鎖、讀寫鎖都可以使用。Condition物件與鎖的關係,就像wait、notify和synchronized的關係。Condition相關方法:

 - await():當前執行緒等待,並釋放鎖。其他執行緒signal時,當前執行緒獲得鎖繼續執行。若當前執行緒中斷則跳出等待。
 - awaitUninterruptibly():不會在等待過程中響應中斷
 - signal():喚醒一個在等待的執行緒,signalAll()喚醒所有等待的執行緒。

示例程式碼:

private  final Lock lock=new ReentrantLock();
private  final Condition notFull=lock.newCondition();
private  final Condition notEmpty=lock.newCondition();
private int maxSize;
private List<Date> storage;

public void put()  {
    lock.lock();
    try {   
        while (storage.size() ==maxSize ){//如果佇列滿了
            System.out.print(Thread.currentThread().getName()+": wait \n");;
            notFull.await();//阻塞生產執行緒
        }
        storage.add(new Date());
        System.out.print(Thread.currentThread().getName()+": put:"+storage.size()+ "\n");
        Thread.sleep(1000);         
        notEmpty.signalAll();//喚醒消費執行緒
    } catch (InterruptedException e) {

    }finally{   
        lock.unlock();
    }
}

public  void take() {       
    lock.lock();
    try {  
        while (storage.size() ==0 ){//如果佇列滿了
            System.out.print(Thread.currentThread().getName()+": wait \n");;
            notEmpty.await();//阻塞消費執行緒
        }
        Date d=((LinkedList<Date>)storage).poll();
        System.out.print(Thread.currentThread().getName()+": take:"+storage.size()+ "\n");
        Thread.sleep(1000);         
        notFull.signalAll();//喚醒生產執行緒

    } catch (InterruptedException e) {

    }finally{
        lock.unlock();
    }
} 

6、Semaphore訊號量
java鎖的缺點:一次都只允許一個執行緒訪問一個資源。
Semaphore:是對鎖的擴充套件,可以指定多個執行緒同時訪問某個資源。一般用它和鎖(內部鎖或重入鎖)來構建物件池。

Semaphore相關方法:

 - Semaphore(int permits, boolean fair):同時能申請多少個許可,指定是否公平。
 - acquire():申請一個許可,若無法獲得,則執行緒等待,直到有執行緒釋放許可或當前執行緒被中斷。
 - tryAcquire():嘗試獲得許可。
 - release():是否一個許可。

示例程式碼:

  private static final int SEM_MAX = 10;
public static void main(String[] args) { 
    Semaphore sem = new Semaphore(SEM_MAX);
    //建立執行緒池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    //線上程池中執行任務
    threadPool.execute(new MyThread(sem, 5));
    threadPool.execute(new MyThread(sem, 4));
    threadPool.execute(new MyThread(sem, 7));
    //關閉池
    threadPool.shutdown();
    }
  }
  class MyThread extends Thread {
    private volatile Semaphore sem;    // 訊號量
    private int count;        // 申請訊號量的大小 

    MyThread(Semaphore sem, int count) {
        this.sem = sem;
        this.count = count;
    }

    public void run() {
        try {
            // 從訊號量中獲取count個許可
            sem.acquire(count);

            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " acquire count="+count);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 釋放給定數目的許可,將其返回到訊號量。
            sem.release(count);
            System.out.println(Thread.currentThread().getName() + " release " + count + "");
    }
}

7、ThreadLocal執行緒區域性變數

ThreadLocal為變數在每個執行緒中都建立了一個副本,那麼每個執行緒可以訪問自己內部的副本變數。一般線上程的某個階段,將值放入ThreadLocal中,則線上程的整個生命週期中,都可以獲得ThreadLocal對應的值,但需要注意的是,如果是執行緒池中的執行緒,則線上程run方法結束時,一般在finally中將其設定為空。

四、鎖優化、無鎖的使用

1、鎖優化

- 減小鎖持有的時間:鎖程式碼塊
- 減小鎖的粒度:ConcurrentHashmap內部分為N個段,每個段再用鎖進行控制
- 讀寫分離鎖代替獨佔鎖,如利用讀寫鎖而不是內部鎖或重入鎖
- 鎖分離:LinkedBlockingQueue中的讀和取分別用了2個重入鎖,一個控制讀一個控制寫,
這是因為對佇列的操作,讀和寫都會改變佇列裡的資料。這樣讀和取就可以併發執行

2、無鎖的使用
在jdk的java.util.concurrent.atomic包下有一組使用無鎖演算法實現的原子操作類,如:AtomicInteger。它採用CAS演算法實現,是執行緒安全的類,可放心使用。
CAS演算法:CAS(V,E,N),V表示要更新的值,E表示預期值,N表示新值。僅當V=E時,才將V的值設為E,最後返回當前V的真實值。