1. 程式人生 > 其它 >【Java】JAVA多執行緒和執行緒池

【Java】JAVA多執行緒和執行緒池

1、執行緒狀態

(1) 新建狀態

使用 new 關鍵字和 Thread 類或其子類建立一個執行緒物件後,該執行緒物件就處於新建狀態。它保持這個狀態直到程式 start() 這個執行緒。

(2) 就緒狀態

當執行緒物件呼叫了start()方法之後,該執行緒就進入就緒狀態。就緒狀態的執行緒處於就緒佇列中,要等待JVM裡執行緒排程器的排程。

(3) 執行狀態

如果就緒狀態的執行緒獲取 CPU 資源,就可以執行 run(),此時執行緒便處於執行狀態。處於執行狀態的執行緒最為複雜,它可以變為阻塞狀態、就緒狀態和死亡狀態。

(4) 阻塞狀態

如果一個執行緒執行了sleep(睡眠)、suspend(掛起)等方法,失去所佔用資源之後,該執行緒就從執行狀態進入阻塞狀態。在睡眠時間已到或獲得裝置資源後可以重新進入就緒狀態。可以分為三種:

  • 等待阻塞,執行狀態中的執行緒執行 wait() 方法,使執行緒進入到等待阻塞狀態。wait()釋放鎖
  • 同步阻塞,執行緒在獲取 synchronized 同步鎖失敗(因為同步鎖被其他執行緒佔用)。
  • 其他阻塞,通過呼叫執行緒的 sleep() 或 join() 發出了 I/O 請求時,執行緒就會進入到阻塞狀態。當sleep() 狀態超時,join() 等待執行緒終止或超時,或者 I/O 處理完畢,執行緒重新轉入就緒狀態。sleep()不釋放鎖

(5) 死亡狀態

一個執行狀態的執行緒完成任務或者其他終止條件發生時,該執行緒就切換到終止狀態。

2、執行緒優先順序

每一個 Java 執行緒都有一個優先順序,這樣有助於作業系統確定執行緒的排程順序。Java 執行緒的優先順序是一個整數,其取值範圍是1(Thread.MIN_PRIORITY)到10(Thread.MAX_PRIORITY)。預設情況下,每一個執行緒都會分配一個優先順序 NORM_PRIORITY(5)。具有較高優先順序的執行緒對程式更重要,並且應該在低優先順序的執行緒之前分配處理器資源。但是,執行緒優先順序不能保證執行緒執行的順序,而且非常依賴於平臺。

3、同步工具synchronized、wait、notify

他們是應用於同步問題的人工執行緒排程工具。講其本質,首先就要明確monitor的概念,Java中的每個物件都有一個監視器,來監測併發程式碼的重入。在非多執行緒編碼時該監視器不發揮作用,反之如果在synchronized 範圍內,監視器發揮作用。

wait/notify必須存在於synchronized塊中。並且,這三個關鍵字針對的是同一個監視器(某物件的監視器)。這意味著wait之後,其他執行緒可以進入同步塊執行。

當某程式碼並不持有監視器的使用權時去wait或notify,會丟擲java.lang.IllegalMonitorStateException。也包括在synchronized塊中去呼叫另一個物件的wait/notify,因為不同物件的監視器不同,同樣會丟擲此異常。

synchronized單獨使用:

  • 程式碼塊:如下,在多執行緒環境下,synchronized塊中的方法獲取了lock例項的monitor,如果例項相同,那麼只有一個執行緒能執行該塊內容

public class Thread1 implements Runnable { 
    Object lock; 
    public void run() { 
        synchronized(lock){ 
            //TODO 
        }
    }
}
  • 直接用於方法: 相當於上面程式碼中用lock來鎖定的效果,實際獲取的是Thread1類的monitor。更進一步,如果修飾的是static方法,則鎖定該類所有例項。

public class Thread1 implements Runnable { 
    public synchronized void run() {
        //TODO 
    }
}

多執行緒的記憶體模型:main memory(主存)、working memory(執行緒棧),在處理資料時,執行緒會把值從主存load到本地棧,完成操作後再save回去(volatile關鍵詞的作用:每次針對該變數的操作都激發一次load and save)。

針對多執行緒使用的變數如果不是volatile或者final修飾的,很有可能產生不可預知的結果(另一個執行緒修改了這個值,但是之後在某執行緒看到的是修改之前的值)。其實道理上講同一例項的同一屬性本身只有一個副本。但是多執行緒是會快取值的,本質上,volatile就是不去快取,直接取值。線上程安全的情況下加volatile會犧牲效能。

4、建立執行緒

Java 提供了三種建立執行緒的方法:實現 Runnable 介面、繼承 Thread 類、通過 Callable 和 Future 建立執行緒。

(1) 實現 Runnable 介面

public class Test {
    public static void main(String[] args) { 
        MyRunnable runnable = new MyRunnable(); 
        Thread thread = new Thread(runnable); 
        thread.start();
    } 
} 
class MyRunnable implements Runnable{ 
    public MyRunnable() {
        //TODO
    } 
    @Override public void run() { 
        //TODO
    } 
}

(2) 繼承 Thread 類

public class Test { 
    public static void main(String[] args) { 
        MyThread thread = new MyThread(); 
        thread.start(); 
    } 
} 
class MyThread extends Thread{ 
    public MyThread(){ 
        //TODO
    } 
    @Override public void run() { 
        //TODO
    } 
}

Thread類相關方法:

//當前執行緒可轉讓CPU控制權,讓別的就緒狀態執行緒執行(切換)
public static Thread.yield()
 
//暫停一段時間
public static Thread.slepp()
 
//在一個項城中呼叫other.join(),將等待other執行緒執行完後才繼續本執行緒
public join()
 
//後兩個函式皆可以被打斷
public interrupte()

關於中斷:

它並不像stop方法那樣會中斷一個正在執行的執行緒。執行緒會不時地檢測中斷標識位,以判斷執行緒是否應該被中斷(中斷標識值是否為true)。中斷只會影響到wait狀態、sleep狀態和join狀態。被打斷的執行緒會丟擲InterruptedException。Thread.interrupted()檢查當前執行緒是否發生中斷,返回boolean。synchronized在獲鎖的過程中是不能被中斷的。

中斷是一個狀態!interrupt()方法只是將這個狀態置為true而已。所以說正常執行的程式不去檢測狀態,就不會終止,而wait等阻塞方法會去檢查並丟擲異常。

interrupt():設定當前中斷標誌位為true;

interrupted():檢查當前執行緒是否發生中斷(即中斷標誌位是否為true)

設定中斷標誌位後,只能通過wait()、sleep()、join()判斷標誌位,若標誌位為true,會丟擲InterruptedException異常,捕獲異常後,手動中斷執行緒或進行其他操作。

不能使用try/catch來捕獲異常!需使用自定義異常處理器捕獲異常,步驟如下:

1.定義異常處理器。實現 Thread.UncaughtExceptionHandler的uncaughtException方法

//第一步:定義符合執行緒異常處理器規範的"異常處理器",實現Thread.UncaughtExceptionHandler規範
 
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
    //Thread.UncaughtExceptionHandler.uncaughtException()會線上程因未捕獲的異常而臨近死亡時被呼叫
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught:"+e);
    }
}

2.定義使用該異常處理器的執行緒工廠

//第二步:定義執行緒工廠。執行緒工廠用來將任務附著給執行緒,並給該執行緒繫結一個異常處理器
 
class HanlderThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this+"creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created "+t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); //設定執行緒工廠的異常處理器
        System.out.println("eh="+t.getUncaughtExceptionHandler());
        return t;
    }
}

3.定義一個任務,讓其丟擲一個異常

//第三步:我們的任務可能會丟擲異常。顯示的丟擲一個exception
 
class ExceptionThread implements Runnable{
    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by "+t);
        System.out.println("eh = "+t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

4.呼叫實驗

//第四步:使用執行緒工廠建立執行緒池,並呼叫其execute方法
 
public class ThreadExceptionUncaughtExceptionHandler{
    public static void main(String[] args){
        ExecutorService exec = Executors.newCachedThreadPool(new HanlderThreadFactory());
        exec.execute(new ExceptionThread());
    }
}

(3) 通過 Callable 和 Future 建立執行緒

上述兩種建立執行緒的方法,在執行完任務之後無法獲取執行結果。如果需要獲取執行結果,就必須通過共享變數或者使用執行緒通訊的方式來達到效果。而Callable和Future可以在任務執行完畢之後得到任務執行結果。通過以下四種方法建立執行緒:

建立 Callable 介面的實現類,並實現 call() 方法,該 call() 方法將作為執行緒執行體,並且有返回值。

建立 Callable 實現類的例項,使用 FutureTask 類來包裝 Callable 物件,該 FutureTask 物件封裝了該 Callable 物件的 call() 方法的返回值。

使用 FutureTask 物件作為 Thread 物件的 target 建立並啟動新執行緒。

呼叫 FutureTask 物件的 get() 方法來獲得子執行緒執行結束後的返回值。

5、三種建立方式的區別

(1) 繼承Thread類建立的執行緒可以擁有自己獨立的類成員變數,但是實現Runnable介面建立執行緒共享實現介面類的成員變數。兩中方式建立執行緒都要重寫run方法,run方法是執行緒的執行體。(Thread和runnable均可以實現單獨資源和共享資源)

Eg.

a、Thread類:

啟動兩個執行緒,每個執行緒擁有單獨的成員變數

class MyThread extends Thread{
    //TODO 
} 
new MyThread().start(); 
new MyThread().start();

啟動兩個執行緒,兩個執行緒共享成員變數

MyThread m = new MyThread(); 
new Thread(m).start(); 
new Thread(m).start();

b、Runnable介面:

啟動兩個執行緒,共同享有成員變數

class MyThread implements Runnable { 
    //TODO
}
 
MyThread m = new MyThread(); 
new Thread(m).start(); 
new Thread(m).start();

啟動兩個執行緒,每個執行緒擁有單獨的成員變數

MyThread myThread = new MyThread(); 
MyThread myThread2 = new MyThread(); 
new Thread(myThread).start(); 
new Thread(myThread2).start();

(2) 在繼承Thread類建立執行緒中可以通過使用this獲得當前執行緒的物件,但是在實現Runnable介面建立執行緒的途徑中可以使用Thread.currentThread()方式來獲得當前執行緒。

(3) 第三種方式是較為複雜的一種。Callable介面是一個與Runnable介面十分相似的介面。在Runnable中run方法為執行緒的執行體,但是在Callable介面中call方法是執行緒的執行體。下面是兩個介面實現執行體的不同:

call方法有返回值,但是run方法沒有

call方法可以宣告丟擲異常

所以可以說Callable介面是Runnable介面的增強版本。

(4) FutureTask類實現了Runnable和Future介面。和Callable一樣都是泛型。

(5) Future介面是對Callable任務的執行結果進行取消,查詢是否完成,獲取結果的。下面是這個介面的幾個重要方法:

boolean cancel(boolean myInterruptRunning),試圖取消Future與Callable關聯的任務

V get(), 返回Callable任務中call方法的返回值,呼叫該方法會導致程式阻塞,必須等到子執行緒結束才會有返回值。這裡V表示泛型

V get(long timeout, TimeUnit unit), 返回Callable中call方法的返回值,該方法讓程式最多阻塞timeout毫秒的時間,或者直到unit時間點。如果在指定的時間Callable的任務沒有完成就會丟擲異常TimeoutEexception

boolean isCancelled(), 如果Callable中的任務被取消,則返回true,否則返回false

boolean isDone(),如果Callable中的任務被完成,則返回true,否則返回false

6、執行緒池

(1) 什麼是執行緒池

執行緒池,其實就是一個容納多個執行緒的容器,其中的執行緒可以反覆使用,省去了頻繁建立執行緒物件的操作,無需反覆建立執行緒而消耗過多資源

(2) 為什麼要有執行緒池

在java中,如果每個請求到達就建立一個新執行緒,開銷是相當大的。在實際使用中,建立和銷燬執行緒花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的使用者請求的時間和資源要多的多。除了建立和銷燬執行緒的開銷之外,活動的執行緒也需要消耗系統資源。如果在一個jvm裡建立太多的執行緒,可能會使系統由於過度消耗記憶體或"切換過度"而導致系統資源不足。為了防止資源不足,需要採取一些辦法來限制任何給定時刻處理的請求數目,儘可能減少建立和銷燬執行緒的次數,特別是一些資源耗費比較大的執行緒的建立和銷燬,儘量利用已有物件來進行服務。

(3) 執行緒池可以幹什麼

執行緒池主要用來解決執行緒生命週期開銷問題和資源不足問題。通過對多個任務重複使用執行緒,執行緒建立的開銷就被分攤到了多個任務上了,而且由於在請求到達時執行緒已經存在,所以消除了執行緒建立所帶來的延遲。這樣,就可以立即為請求服務,使用應用程式響應更快;另外,通過適當的調整執行緒中的執行緒數目可以防止出現資源不足的情況。

(4) 執行緒池的建立

執行緒池都是通過執行緒池工廠建立,再呼叫執行緒池中的方法獲取執行緒,再通過執行緒去執行任務方法。

Executors:執行緒池建立工廠類

public static ExecutorService newFixedThreadPool(int nThreads):返回執行緒池物件

ExecutorService:執行緒池類

Future<?> submit(Runnable task):獲取執行緒池中的某一個執行緒物件,並執行

Future 介面:用來記錄執行緒任務執行完畢後產生的結果。執行緒池建立與使用

a、使用Runnable介面建立執行緒池

  • 建立執行緒池物件

  • 關閉執行緒池

  • 提交 Runnable 介面子類物件

  • 建立 Runnable 介面子類物件

public static void main(String[] args) {
    //建立執行緒池物件  引數5,代表有5個執行緒的執行緒池
    ExecutorService service = Executors.newFixedThreadPool(5);
 
    //建立Runnable執行緒任務物件
    TaskRunnable task = new TaskRunnable();
        
    //從執行緒池中獲取執行緒物件
    service.submit(task);
    System.out.println("----------------------");
        
    //再獲取一個執行緒物件
    service.submit(task);
        
    //關閉執行緒池
    service.shutdown();
}

ExecutorService:執行緒池類

<T> Future<T> submit(Callable<T> task):獲取執行緒池中的某一個執行緒物件,並執行執行緒中的 call() 方法

Future 介面:用來記錄執行緒任務執行完畢後產生的結果。

b、使用Callable介面建立執行緒池

執行緒池建立與使用

  • 建立執行緒池物件

  • 建立 Callable介面子類物件

  • 提交 Callable介面子類物件

  • 關閉執行緒池

public static void main(String[] args) {
       
    ExecutorService service = Executors.newFixedThreadPool(3);
    TaskCallable c = new TaskCallable();
        
    //執行緒池中獲取執行緒物件,呼叫run方法
    service.submit(c);
        
    //再獲取一個
    service.submit(c);
        
    //關閉執行緒池
    service.shutdown();
}

7、執行緒池的實現類(ThreadPoolExecutor)

ThreadPoolExecutor 類繼承了 AbstractExecutorService 類,而 AbstractExecutorService 類實現了 ExecutorService 介面。所以上述執行緒池建立的方法可以將建立的執行緒池(例如newFixedThreadPool)賦給 ExecutorService。

ThreadPoolExecutor 建構函式如下:

ThreadPoolExecutor ( int corePoolSize, // 執行緒池中的執行緒數量
 
                    int maximumPoolSize, // 執行緒池中的最大執行緒數量
 
                    long keepAliveTime, // 當執行緒池執行緒數量超過corePoolsize時,多餘的空閒執行緒的存活時間,即超過corePoolSize的空閒執行緒,在keepAliveTime時間內會被銷燬
                    
                    TimeUnit unit, // keepAliveTime的單位
 
                    BlockingQueue<Runnable> workQueue, // 任務佇列,被提交但尚未被執行的任務
 
                    ThreadFactory threadFactory, // 執行緒工廠,用於建立執行緒,一般用預設的執行緒工廠即可
 
                    RejectedExecutionHandler handler) // 拒絕策略。當任務太多來不及處理時,如何拒絕任務

關鍵引數:workQueue

(1) 直接提交佇列

由 SynchronousQueue 實現,一種無緩衝的等待佇列。

SynchronousQueue 沒有容量,即沒有等待佇列,總是將新任務提交給執行緒去執行,當沒有空閒執行緒時,就新增一個執行緒,當執行緒數達到最大值maximumPoolSize時,即無法再新增執行緒時,則執行拒絕策略。

(2) 有界的任務佇列

由 ArrayBlockingQueue 實現,其內部維護了一個定長陣列,用於儲存佇列,其內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。ArrayBlockingQueue 在生產者放入資料和消費者獲取資料時,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於 LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue 完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。之所以沒這樣去做,也許是因為 ArrayBlockingQueue 的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue 和LinkedBlockingQueue 間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的 Node 物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue 時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

當有新任務需要執行時,如果執行緒池的實際執行緒數小於 corePoolSize,則會新增一個執行緒,若大於 corePoolSize,則會將新任務加入等待佇列。若佇列已滿,則在匯流排程數不大於maximumPoolSize 的前提下,新增一個執行緒,若大於 maximumPoolSize,則執行拒絕策略。也就是說,只有當等待佇列滿了的時候,才可能將執行緒數增加到 corePoolSize 以上。也就是說,除非系統非常繁忙,否則執行緒數量基本維持在 corePoolSize。

(3) 無界的任務佇列

由 LinkedBlockingQueue 實現,其內部維護了一個連結串列(如果沒有指定長度,則預設容量為無窮大),LinkedBlockingQueue 之所以能夠高效的處理併發資料,是因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。作為開發者,我們需要注意的是,如果構造一個 LinkedBlockingQueue 物件,而沒有指定其容量大小,LinkedBlockingQueue 會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

與有界佇列相比,除非系統資源耗盡,否則無界佇列不存在任務入隊失敗的情況,即無界佇列的長度是無窮大。當有新的任務需要執行時,若執行緒池的實際數量小於corePoolSize,則會新增一個執行緒,且執行緒池的最大執行緒數為corePoolSize。若生產者的速度遠小於消費者的速度,則等待佇列會快速增長,直至系統資源耗盡。

(4) 優先任務佇列

由 PriorityBlockingQueue 實現,其內部維護了一個數組,優先順序的判斷通過建構函式傳入的 Comparator 物件來決定,需要注意的是 PriorityBlockingQueue 並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現 PriorityBlockingQueue 時,內部控制執行緒同步的鎖採用的是公平鎖。

這是一個有優先順序的無界佇列。

(5) 幾種常見的包裝執行緒池類

newFixedThreadPool:設定 corePoolSize 和 maximumPoolSize 相等,使用無界的任務佇列(LinkedBlockingQueue)

newSignalThreadExecutor:newFixedThreadPool 的一種特殊形式,即 corePoolSize為1

newCachedThreadPool:設定 corePoolSize 為0,maximumPoolSize 為無窮大,使用直接提交佇列(SynchronousQueue)

(6) 拒絕策略

AbortPolicy:直接丟擲異常,阻止系統正常工作

CallerRunsPolicy:由呼叫執行緒直接執行當前任務,可能會造成任務提交執行緒(即呼叫執行緒)的效能急劇下降

DiscardOldestPolicy:丟棄等待佇列頭的一個任務,並再次提交該任務

DiscardPolicy:丟棄提交任務,即什麼都不做