1. 程式人生 > 其它 >Java中的多執行緒你只要看這一篇就夠了

Java中的多執行緒你只要看這一篇就夠了

如果對什麼是執行緒、什麼是程序仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。

用多執行緒只有一個目的,那就是更好的利用cpu的資源,因為所有的多執行緒程式碼都可以用單執行緒來實現。說這個話其實只有一半對,因為反應“多角色”的程式程式碼,最起碼每個角色要給他一個執行緒吧,否則連實際場景都無法模擬,當然也沒法說能用單執行緒來實現:比如最常見的“生產者,消費者模型”。

很多人都對其中的一些概念不夠明確,如同步、併發等等,讓我們先建立一個數據字典,以免產生誤會。

多執行緒:指的是這個程式(一個程序)執行時產生了不止一個執行緒
並行與併發:
並行:多個cpu例項或者多臺機器同時執行一段處理邏輯,是真正的同時。
併發:通過cpu排程演算法,讓使用者看上去同時執行,實際上從cpu操作層面不是真正的同時。併發往往在場景中有公用的資源,那麼針對這個公用的資源往往產生瓶頸,我們會用TPS或者QPS來反應這個系統的處理能力。更多的Java進階教程微我:hua2021ei

 

 


併發與並行

執行緒安全:經常用來描繪一段程式碼。指在併發的情況之下,該程式碼經過多執行緒使用,執行緒的排程順序不影響任何結果。這個時候使用多執行緒,我們只需要關注系統的記憶體,cpu是不是夠用即可。反過來,執行緒不安全就意味著執行緒的排程順序會影響最終結果,如不加事務的轉賬程式碼:
void transferMoney(User from, User to, float amount){
to.setMoney(to.getBalance() + amount);
from.setMoney(from.getBalance() - amount);
}
同步:Java中的同步指的是通過人為的控制和排程,保證共享資源的多執行緒訪問成為執行緒安全,來保證結果的準確。如上面的程式碼簡單加入@synchronized關鍵字。在保證結果準確的同時,提高效能,才是優秀的程式。執行緒安全的優先順序高於效能。
好了,讓我們開始吧。我準備分成幾部分來總結涉及到多執行緒的內容:

紮好馬步:執行緒的狀態
內功心法:每個物件都有的方法(機制)
太祖長拳:基本執行緒類
九陰真經:高階多執行緒控制類
紮好馬步:執行緒的狀態
先來兩張圖:

 

 

 

執行緒狀態

 

 

 

執行緒狀態轉換

各種狀態一目瞭然,值得一提的是"Blocked"和"Waiting"這兩個狀態的區別:

執行緒在Running的過程中可能會遇到阻塞(Blocked)情況
對Running狀態的執行緒加同步鎖(Synchronized)使其進入(lock blocked pool ),同步鎖被釋放進入可執行狀態(Runnable)。從jdk原始碼註釋來看,blocked指的是對monitor的等待(可以參考下文的圖)即該執行緒位於等待區。

執行緒在Running的過程中可能會遇到等待(Waiting)情況
執行緒可以主動呼叫object.wait或者sleep,或者join(join內部呼叫的是sleep,所以可看成sleep的一種)進入。從jdk原始碼註釋來看,waiting是等待另一個執行緒完成某一個操作,如join等待另一個完成執行,object.wait()等待object.notify()方法執行。

Waiting狀態和Blocked狀態有點費解,我個人的理解是:Blocked其實也是一種wait,等待的是monitor,但是和Waiting狀態不一樣,舉個例子,有三個執行緒進入了同步塊,其中兩個呼叫了object.wait(),進入了waiting狀態,這時第三個呼叫了object.notifyAll(),這時候前兩個執行緒就一個轉移到了Runnable,一個轉移到了Blocked。

從下文的monitor結構圖來區別:每個 Monitor在某個時刻,只能被一個執行緒擁有,該執行緒就是 “Active Thread”,而其它執行緒都是 “Waiting Thread”,分別在兩個佇列 “ Entry Set”和 “Wait Set”裡面等候。在 “Entry Set”中等待的執行緒狀態Blocked,從jstack的dump中來看是 “Waiting for monitor entry”,而在 “Wait Set”中等待的執行緒狀態是Waiting,表現在jstack的dump中是 “in Object.wait()”。

此外,在runnable狀態的執行緒是處於被排程的執行緒,此時的排程順序是不一定的。Thread類中的yield方法可以讓一個running狀態的執行緒轉入runnable。

內功心法:每個物件都有的方法(機制)
synchronized, wait, notify 是任何物件都具有的同步工具。讓我們先來了解他們

 

monitor

他們是應用於同步問題的人工執行緒排程工具。講其本質,首先就要明確monitor的概念,Java中的每個物件都有一個監視器,來監測併發程式碼的重入。在非多執行緒編碼時該監視器不發揮作用,反之如果在synchronized 範圍內,監視器發揮作用。

wait/notify必須存在於synchronized塊中。並且,這三個關鍵字針對的是同一個監視器(某物件的監視器)。這意味著wait之後,其他執行緒可以進入同步塊執行。

當某程式碼並不持有監視器的使用權時(如圖中5的狀態,即脫離同步塊)去wait或notify,會丟擲java.lang.IllegalMonitorStateException。也包括在synchronized塊中去呼叫另一個物件的wait/notify,因為不同物件的監視器不同,同樣會丟擲此異常。

再講用法:

synchronized單獨使用:
程式碼塊:如下,在多執行緒環境下,synchronized塊中的方法獲取了lock例項的monitor,如果例項相同,那麼只有一個執行緒能執行該塊內容
public class Thread1 implements Runnable {
Object lock;
public void run() {
synchronized(lock){
..do something
}
}
}
直接用於方法: 相當於上面程式碼中用lock來鎖定的效果,實際獲取的是Thread1類的monitor。更進一步,如果修飾的是static方法,則鎖定該類所有例項。
public class Thread1 implements Runnable {
public synchronized void run() {
..do something
}
}
synchronized, wait, notify結合:典型場景生產者消費者問題
/**
* 生產者生產出來的產品交給店員
*/
public synchronized void produce()
{
if(this.product >= MAX_PRODUCT)
{
try
{
wait();
System.out.println("產品已滿,請稍候再生產");
}
catch(InterruptedException e)
{
e.printStackTrace();
}
return;
}

this.product++;
System.out.println("生產者生產第" + this.product + "個產品.");
notifyAll(); //通知等待區的消費者可以取出產品了
}

/**
* 消費者從店員取產品
*/
public synchronized void consume()
{
if(this.product <= MIN_PRODUCT)
{
try
{
wait();
System.out.println("缺貨,稍候再取");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return;
}

System.out.println("消費者取走了第" + this.product + "個產品.");
this.product--;
notifyAll(); //通知等待去的生產者可以生產產品了
}

volatile

多執行緒的記憶體模型:main memory(主存)、working memory(執行緒棧),在處理資料時,執行緒會把值從主存load到本地棧,完成操作後再save回去(volatile關鍵詞的作用:每次針對該變數的操作都激發一次load and save)。

 

volatile

針對多執行緒使用的變數如果不是volatile或者final修飾的,很有可能產生不可預知的結果(另一個執行緒修改了這個值,但是之後在某執行緒看到的是修改之前的值)。其實道理上講同一例項的同一屬性本身只有一個副本。但是多執行緒是會快取值的,本質上,volatile就是不去快取,直接取值。線上程安全的情況下加volatile會犧牲效能。

太祖長拳:基本執行緒類
基本執行緒類指的是Thread類,Runnable介面,Callable介面
Thread 類實現了Runnable介面,啟動一個執行緒的方法:

  MyThread my = new MyThread();
  my.start();
Thread類相關方法:

//當前執行緒可轉讓cpu控制權,讓別的就緒狀態執行緒執行(切換)
public static Thread.yield()
//暫停一段時間
public static Thread.sleep()
//在一個執行緒中呼叫other.join(),將等待other執行完後才繼續本執行緒。    
public join()
//後兩個函式皆可以被打斷
public interrupte()

關於中斷:它並不像stop方法那樣會中斷一個正在執行的執行緒。執行緒會不時地檢測中斷標識位,以判斷執行緒是否應該被中斷(中斷標識值是否為true)。終端只會影響到wait狀態、sleep狀態和join狀態。被打斷的執行緒會丟擲InterruptedException。
Thread.interrupted()檢查當前執行緒是否發生中斷,返回boolean
synchronized在獲鎖的過程中是不能被中斷的。

中斷是一個狀態!interrupt()方法只是將這個狀態置為true而已。所以說正常執行的程式不去檢測狀態,就不會終止,而wait等阻塞方法會去檢查並丟擲異常。如果在正常執行的程式中新增while(!Thread.interrupted()) ,則同樣可以在中斷後離開程式碼體

Thread類最佳實踐:
寫的時候最好要設定執行緒名稱 Thread.name,並設定執行緒組 ThreadGroup,目的是方便管理。在出現問題的時候,列印執行緒棧 (jstack -pid) 一眼就可以看出是哪個執行緒出的問題,這個執行緒是幹什麼的。

如何獲取執行緒中的異常

 

不能用try,catch來獲取執行緒中的異常

Runnable
與Thread類似

Callable
future模式:併發模式的一種,可以有兩種形式,即無阻塞和阻塞,分別是isDone和get。其中Future物件用來存放該執行緒的返回值以及狀態

ExecutorService e = Executors.newFixedThreadPool(3);
//submit方法有多重引數版本,及支援callable也能夠支援runnable介面型別.
Future future = e.submit(new myCallable());
future.isDone() //return true,false 無阻塞
future.get() // return 返回值,阻塞直到該執行緒執行結束
九陰真經:高階多執行緒控制類
以上都屬於內功心法,接下來是實際專案中常用到的工具了,Java1.5提供了一個非常高效實用的多執行緒包:java.util.concurrent, 提供了大量高階工具,可以幫助開發者編寫高效、易維護、結構清晰的Java多執行緒程式。

1.ThreadLocal類
用處:儲存執行緒的獨立變數。對一個執行緒類(繼承自Thread)
當使用ThreadLocal維護變數時,ThreadLocal為每個使用該變數的執行緒提供獨立的變數副本,所以每一個執行緒都可以獨立地改變自己的副本,而不會影響其它執行緒所對應的副本。常用於使用者登入控制,如記錄session資訊。

實現:每個Thread都持有一個TreadLocalMap型別的變數(該類是一個輕量級的Map,功能與map一樣,區別是桶裡放的是entry而不是entry的連結串列。功能還是一個map。)以本身為key,以目標為value。
主要方法是get()和set(T a),set之後在map裡維護一個threadLocal -> a,get時將a返回。ThreadLocal是一個特殊的容器。

2.原子類(AtomicInteger、AtomicBoolean……)
如果使用atomic wrapper class如atomicInteger,或者使用自己保證原子的操作,則等同於synchronized

//返回值為boolean
AtomicInteger.compareAndSet(int expect,int update)

該方法可用於實現樂觀鎖,考慮文中最初提到的如下場景:a給b付款10元,a扣了10元,b要加10元。此時c給b2元,但是b的加十元程式碼約為:

if(b.value.compareAndSet(old, value)){
return ;
}else{
//try again
// if that fails, rollback and log
}
AtomicReference
對於AtomicReference 來講,也許物件會出現,屬性丟失的情況,即oldObject == current,但是oldObject.getPropertyA != current.getPropertyA。
這時候,AtomicStampedReference就派上用場了。這也是一個很常用的思路,即加上版本號

3.Lock類
lock: 在java.util.concurrent包內。共有三個實現:

ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock
主要目的是和synchronized一樣, 兩者都是為了解決同步問題,處理資源爭端而產生的技術。功能類似但有一些區別。

區別如下:

lock更靈活,可以自由定義多把鎖的枷鎖解鎖順序(synchronized要按照先加的後解順序)
提供多種加鎖方案,lock 阻塞式, trylock 無阻塞式, lockInterruptily 可打斷式, 還有trylock的帶超時時間版本。
本質上和監視器鎖(即synchronized是一樣的)
能力越大,責任越大,必須控制好加鎖和解鎖,否則會導致災難。
和Condition類的結合。
效能更高,對比如下圖:


synchronized和Lock效能對比

ReentrantLock    
可重入的意義在於持有鎖的執行緒可以繼續持有,並且要釋放對等的次數後才真正釋放該鎖。
使用方法是:

1.先new一個例項

static ReentrantLock r=new ReentrantLock();
2.加鎖

r.lock()或r.lockInterruptibly();
此處也是個不同,後者可被打斷。當a執行緒lock後,b執行緒阻塞,此時如果是lockInterruptibly,那麼在呼叫b.interrupt()之後,b執行緒退出阻塞,並放棄對資源的爭搶,進入catch塊。(如果使用後者,必須throw interruptable exception 或catch)

3.釋放鎖

r.unlock()
必須做!何為必須做呢,要放在finally裡面。以防止異常跳出了正常流程,導致災難。這裡補充一個小知識點,finally是可以信任的:經過測試,哪怕是發生了OutofMemoryError,finally塊中的語句執行也能夠得到保證。

ReentrantReadWriteLock

可重入讀寫鎖(讀寫鎖的一個實現)

  ReentrantReadWriteLock lock = new ReentrantReadWriteLock()
  ReadLock r = lock.readLock();
  WriteLock w = lock.writeLock();
兩者都有lock,unlock方法。寫寫,寫讀互斥;讀讀不互斥。可以實現併發讀的高效執行緒安全程式碼

4.容器類
這裡就討論比較常用的兩個:

BlockingQueue
ConcurrentHashMap
BlockingQueue
阻塞佇列。該類是java.util.concurrent包下的重要類,通過對Queue的學習可以得知,這個queue是單向佇列,可以在佇列頭新增元素和在隊尾刪除或取出元素。類似於一個管  道,特別適用於先進先出策略的一些應用場景。普通的queue介面主要實現有PriorityQueue(優先佇列),有興趣可以研究

BlockingQueue在佇列的基礎上添加了多執行緒協作的功能:

 

BlockingQueue

除了傳統的queue功能(表格左邊的兩列)之外,還提供了阻塞介面put和take,帶超時功能的阻塞介面offer和poll。put會在佇列滿的時候阻塞,直到有空間時被喚醒;take在隊 列空的時候阻塞,直到有東西拿的時候才被喚醒。用於生產者-消費者模型尤其好用,堪稱神器。

常見的阻塞佇列有:

ArrayListBlockingQueue
LinkedListBlockingQueue
DelayQueue
SynchronousQueue
** ConcurrentHashMap**
高效的執行緒安全雜湊map。請對比hashTable , concurrentHashMap, HashMap

5.管理類
管理類的概念比較泛,用於管理執行緒,本身不是多執行緒的,但提供了一些機制來利用上述的工具做一些封裝。
瞭解到的值得一提的管理類:ThreadPoolExecutor和 JMX框架下的系統級管理類 ThreadMXBean
ThreadPoolExecutor
如果不瞭解這個類,應該瞭解前面提到的ExecutorService,開一個自己的執行緒池非常方便:

ExecutorService e = Executors.newCachedThreadPool();
ExecutorService e = Executors.newSingleThreadExecutor();
ExecutorService e = Executors.newFixedThreadPool(3);
// 第一種是可變大小執行緒池,按照任務數來分配執行緒,
// 第二種是單執行緒池,相當於FixedThreadPool(1)
// 第三種是固定大小執行緒池。
// 然後執行
e.execute(new MyRunnableImpl());
該類內部是通過ThreadPoolExecutor實現的,掌握該類有助於理解執行緒池的管理,本質上,他們都是ThreadPoolExecutor類的各種實現版本。請參見javadoc:

 

ThreadPoolExecutor引數解釋
翻譯一下:
corePoolSize:池內執行緒初始值與最小值,就算是空閒狀態,也會保持該數量執行緒。
maximumPoolSize:執行緒最大值,執行緒的增長始終不會超過該值。
keepAliveTime:當池內執行緒數高於corePoolSize時,經過多少時間多餘的空閒執行緒才會被回收。回收前處於wait狀態
unit:
時間單位,可以使用TimeUnit的例項,如TimeUnit.MILLISECONDS 
workQueue:待入任務(Runnable)的等待場所,該引數主要影響排程策略,如公平與否,是否產生餓死(starving)
threadFactory:執行緒工廠類,有預設實現,如果有自定義的需要則需要自己實現ThreadFactory介面並作為引數傳入。