併發基礎知識補全和CAS基本原理
併發基礎知識補全
Callable、Future和FutureTask
在前文(執行緒基礎、執行緒之間的共享與協作)中提到過中,新啟執行緒的方式只有兩種,一種就是擴充套件自Thread
類,然後重寫run()
方法,另一種就是實現Runnable
介面,實現run()
方法。
那麼Callable
介面這種方式,又是怎麼回事呢。我們先來觀察Thread類中的構造方法,並沒有可以接受一個callable這種引數的構造方法。我們使用Callable
的時候,首先要把它包裝成FutureTask
,而它又實現了RunnableFuture
介面。
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public void run() {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
}catch (Throwable ex) {}
}
}
}
而RunnableFuture
介面實際上又是繼承了Runnable
和Future
介面。也就是說到底,Callable
Runnable
交給執行緒去執行。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
阻塞方法sleep()和wait()的區別
之前提到執行緒生命週期的時候,無論是sleep
還是wait()
,都會進入阻塞狀態,但是如果細分的話,雖然兩者都會暫停執行緒的執行,實際上兩者進入的執行緒狀態是不一樣的。
我們先來看sleep()的原始碼:
/**
* Causes the currently executing thread to sleep (temporarily cease
* execution) for the specified number of milliseconds, subject to
* the precision and accuracy of system timers and schedulers. The thread
* does not lose ownership of any monitors.
*/
public static native void sleep(long millis) throws InterruptedException;
sleep()方法是Thread類中的一個native方法,它會在指定的時間內阻塞執行緒的執行。而且從其註釋中可知,並不會失去對任何監視器(monitors)的所有權,也就是說不會釋放鎖,僅僅會讓出cpu的執行權。
我們再來看wait()的原始碼:
無論是wait()
,還是wait(long timeout, int nanos)
走的都是native的wait()
方法。
/**
* This method should only be called by a thread that is the owner
* of this object's monitor. See the {@code notify} method for a
* description of the ways in which a thread can become the owner of
* a monitor.
*/
public final native void wait(long timeout) throws InterruptedException;
根據註釋可以看出,此方法呼叫的前提是當前執行緒已經獲取了物件監視器monitor的所有權。
該方法會呼叫後不僅會讓出cpu的執行權,還會釋放鎖(即monitor的所有權),並且進入wait set中,直到其他執行緒呼叫notify()
或者notifyall()
方法,或者指定的timeout到了,才會從wait set中出來,並重新競爭鎖。
區別
兩者最主要的區別就是釋放鎖(monitor的所有權)與否,但是兩個方法都會丟擲InterruptedException。
執行緒阻塞BLOCKED和等待WAITING的區別
阻塞BLOCKED:
阻塞表示執行緒在等待物件的monitor鎖,試圖通過synchronized去獲取某個鎖,但是此時其他執行緒已經獨佔了monitor鎖,那麼當前執行緒就會進入等待狀態。
等待WAITING
當前執行緒等待其他執行緒執行某些操作,典型場景就是生產者消費者模式,在任務條件不滿足時,等待其他執行緒的操作從而使得條件滿足。可以通過wait()
方法或者Thread.join()
方法都會使執行緒進入等待狀態。
實際上不用可以區分兩者, 因為兩者都會暫停執行緒的執行。兩者的區別是: 進入WAITING狀態是執行緒主動的, 而進入BLOCKED狀態是被動的。更進一步的說, 進入BLOCKED狀態是在同步(synchronized程式碼之外), 而進入WAITING狀態是在同步程式碼之內。
例如:
synchronized(obj){
obj.wait()
}
在這個同步程式碼塊中,我們通過synchronize關鍵字去獲取obj物件的同步鎖,如果沒有獲取到,這時候被動就會進入BLOCKED狀態。直到獲取到了鎖,從阻塞狀態進入就緒/執行狀態,然後呼叫obj.wait()
,主動進入WAITING狀態進如狀態,直到其他執行緒在同步程式碼塊中呼叫了obj.notify()/obj.notifyAll()
,又會從WAITING狀態進入進入就緒/執行狀態。
死鎖
死鎖是指兩個或兩個以上的程序在執行過程中,由於競爭資源或者由於彼此通訊而造成的一種阻塞的現象,若無外力的作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖。
來看這個例子:
小明和小王都去買餃子皮和餡料,但是它們都各自只有一份,這時候小明搶到了餃子皮,小王搶到了餡料,但他們都各自不肯鬆手,但是隻有一份原料,又都包不成餃子。所以兩個人都只能僵持著,這就是所謂的死鎖。
所以,死鎖有這些特點:
- 多個操作者(M>=2),爭奪多個資源(N>=2),且N<=M
- 爭奪資源的順序不對
- 拿到資源後不放手
用凝練一點的語言來描述,那就是:
- 互斥:一個時間同一個資源只能由一個程序持有
- 持有並等待:程序保持至少一個資源,並等待其他程序持有的額外資源
- 不剝奪:程序持有資源之後不會被其他程序剝奪
- 迴圈等待:程序互相等待各自的資源
我們用一段程式碼來進行死鎖的演示:
/**
* 類說明:演示死鎖的產生
*/
public class DeadLock {
private static Object lock1 = new Object();//第一個鎖
private static Object lock2 = new Object();//第二個鎖
//第一個拿鎖的方法
private static void firstDo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (lock1) {
System.out.println(threadName + " get lock-1");
Thread.sleep(100);
synchronized (lock2) {
System.out.println(threadName + " get lock-2");
}
}
}
//第二個拿鎖的方法
private static void secondDo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (lock2) {
System.out.println(threadName + " get lock-2");
Thread.sleep(100);
synchronized (lock1) {
System.out.println(threadName + " get lock-1");
}
}
}
private static class TestThread extends Thread {
private String name;
public TestThread(String name) {
this.name = name;
}
@Override
public void run() {
Thread.currentThread().setName(name);
try {
firstDo();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread.currentThread().setName("Thread A");
TestThread testThread = new TestThread("Thread 2");
testThread.start();
secondDo();
}
}
在實際開發過程中,我們應避免死鎖的產生,因為死鎖會進入一個等待的狀態,並不會丟擲異常,不利於我們查詢錯誤。那麼怎麼來避免死鎖呢?
由於多個操作者來爭搶多個資源,這是由業務邏輯來決定的,所以我們一般從後者入手,比如以正確的順序去爭奪資源,或者拿到資源後允許放手,都可以解決死鎖。
ReentrantLock中的tryLock()
就可以用於死鎖的解決。它可以對顯式鎖嘗試獲取,並返回boolean值。程式碼段可以這麼寫
if (lock.tryLock()) {
try {
//TODO
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
活鎖
在上文中,我們使用tryLock()
來解決死鎖,但是不正當使用的,有可能會造成活鎖的產生。來看這麼一段程式碼:
/**
*類說明:演示嘗試拿鎖解決死鎖
*/
public class TryLock {
private static Lock lock1 = new ReentrantLock();//第一個鎖
private static Lock lock2 = new ReentrantLock();//第二個鎖
//先嚐試拿lock1 鎖,再嘗試拿lock2鎖,lock1鎖沒拿到,連同lock2鎖一起釋放掉
private static void firstToSecond() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random r = new Random();
while(true){
if(lock1.tryLock()){
System.out.println(threadName +" get lock1");
try{
if(lock2.tryLock()){
try{
System.out.println(threadName +" get lock2");
System.out.println("firstToSecond do work------------");
break;
}finally{
System.out.println(threadName +" release lock2");
lock2.unlock();
}
}
}finally {
System.out.println(threadName +" release lock1");
lock1.unlock();
}
}
//Thread.sleep(r.nextInt(3));
}
}
//先嚐試拿lock2 鎖,再嘗試拿lock1鎖,lock2鎖沒拿到,連同lock1鎖一起釋放掉
private static void SecondToFirst() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random r = new Random();
while(true){
if(lock2.tryLock()){
System.out.println(threadName +" get lock2");
try{
if(lock1.tryLock()){
try{
System.out.println(threadName +" get lock1");
System.out.println("SecondToFirst do work------------");
break;
}finally{
System.out.println(threadName +" release lock1");
lock1.unlock();
}
}
}finally {
System.out.println(threadName +" release lock2");
lock2.unlock();
}
}
//Thread.sleep(r.nextInt(3));
}
}
private static class TestThread extends Thread{
private String name;
public TestThread(String name) {
this.name = name;
}
public void run(){
Thread.currentThread().setName(name);
try {
SecondToFirst();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Thread.currentThread().setName("Thread A");
TestThread testThread = new TestThread("Thread B");
testThread.start();
try {
firstToSecond();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在我們沒有加上Thread.sleep(r.nextInt(3));
之前,執行緒之間的相互等待的過程可能會被急劇拉長。為什麼呢?比如馬路中間有條小橋,只能容納一輛車經過,橋兩
頭開來兩輛車A和B,A比較禮貌,示意B先過,B也比較禮貌,示意A先過,結果兩人一直謙讓誰也過不去。
在上面的程式碼中,有執行緒A、B,分別去獲取鎖1和2
- A先競爭到1,然後嘗試去競爭2
- B先競爭到2,然後嘗試去競爭1
- A釋放鎖1,B釋放鎖2
- A再競爭到1,然後嘗試去競爭2
- B再競爭到2,然後嘗試去競爭1
這樣一來,A和B執行緒就一直在相互競爭中迴圈等待著,但是跟死活不一樣,進入了死鎖的執行緒,是不進行工作的,而進入活鎖的執行緒,是在忙碌的工作著。而我們加上休眠,可以讓兩個執行緒在競爭鎖的時候,時間錯開一點,避免了活鎖。
CAS基本操作(Compare And Swap)
原子操作
我們都知道,在沒有發現電子、原子核之前,科學界所認識到物質的最小單位就是原子,原子就是不可再分的。那麼反映併發程式設計中,所謂原子操作是指不會被執行緒排程機制打斷的操作;這種操作一旦開始,就一直執行到結束,不會發生上下文的切換。
如何實現原子操作
被synchronized
包圍的程式碼塊,其實就是一個原子操作。但是,使用synchronized
是一個很消耗效能的操作,因為會涉及到執行緒的狀態變化,沒有搶到內建鎖的執行緒,會進入等待佇列裡面並等待。但假如我們的同步程式碼塊裡面只有簡單的i++
,那麼使用synchronized
是不是就太大題小做了,有沒有一種更輕量級的同步機制呢?
為了解決這個問題,在現代CPU裡面,提供了一種Compare And Swap指令,簡稱CAS
CAS指令原理
Compare And Swap,從名字來看,就是包含比較並交換兩個步驟,但是在CPU提供的CAS指令已經包含這兩個操作了,由CPU去保證這兩個步驟是原子的。意思就是說,比較和交換兩個動作,要麼全都完成,要麼就全都不執行。
那麼CAS指令是如何保證執行緒的同步的呢,我們就拿簡單的i++
來看。假如使用synchronized
,那就是誰搶到了鎖,誰就去執行i++
。那麼在CAS指令中,是怎麼操作的呢。
假如i初始值為0,有A~D四個執行緒,都要執行i++
這個操作。首先,四個執行緒都從記憶體裡面取出i=0,然後在自己的方法棧上,進行i++,i變為1,這時候要重新寫回記憶體的時候,同一時間只允許一個執行緒進行操作。假設A執行緒拿到了這個許可權,這時候它會再次從記憶體中取出i的值,如果i==0,則把計算得到的值寫回去,這個執行緒就執行完了,輪到其他執行緒來執行。這時候B執行緒從記憶體中取出i,悲催地發現i已經等於1了,說明i的值已經被人讀寫過了,那麼就應該重新執行i++
。其他執行緒也一樣。
所以說,CAS其實就是不斷重複這個指令(自旋),直到成功為止。
在這裡涉及到了悲觀鎖跟樂觀鎖的概念。在使用synchronized
同步關鍵字的時候,執行緒會悲觀的認為,總有其他執行緒想來害它自己,不如先用鎖把程式碼塊鎖起來,i++
這個過程只有自己能做,直到我自己做完了,才讓別人去接著幹這個事情。而對於CAS來說,執行緒會樂觀的認為,沒人會來改自己的東西,我先把值取出來,先改了再說。但它也不傻,還是會去檢查結果,發現已經被改過,那也沒辦法,只能再來一遍。
CAS指令問題
既然CAS指令在執行效率要高於synchronized
,那麼是不是可以替代synchronized
呢?
答案是不可以,因為CAS存在以下問題:
- ABA問題
- 開銷問題
- 只能保證一個共享變數的原子操作
我們來看以下思考:
假設有執行緒1和執行緒2,並且有一個變數A,對於1來說,它要把A改為B,假設執行緒2跑的更快,它先把A改為C,再改回A。那麼對於執行緒1來說,在執行CAS指令的時候,發現A的值沒有變化吧,並沒有修改過,然後放心的將它改寫為B。
但實際上,這個A已經不是原來的A了,已經被執行緒2修改過。但是CAS操作並沒有辦法去發現。拿一個實際的例子來說,你的水杯裡面裝滿了你最喜歡的可樂,這時候你有事去打了一個電話,你的女朋友口渴了,突然喝了一口,然後她怕被你發現,又將水杯給重新倒滿了。你打完電話回來,一看水杯是滿的,並沒有人喝過,然後繼續高興的吃你的炸雞。這個就是ABA問題。
那麼要怎麼解決ABA問題呢,我們可以加上一個版本戳,每次修改,都會對更新當前變數的版本。
而開銷問題就是說,因為CAS指令是基於自旋來實現的,但是執行緒如果長時間不能成功執行,會給CPU帶來非常大的執行開銷。
CAS指令是通過比較記憶體中某個變數的值,來決定是否能進行交換操作,對於計算機來說,一個地址只能存放一個變數。所以CAS指令一次只能保證一個共享變數的原子操作。假如有ABC三個變數,需要保證它們的讀寫是一個原子操作,那麼CAS就不能實現了,而使用synchronized
就能很好的執行。從Java 1.5開始,JDK提供了AtomicReference類來保證引用物件之間的原子性,就可以把多個變數放在一個物件裡來進行CAS操作。
Java中的原子操作類
Jdk中為我們提供瞭如下相關原子操作類:
- 更新基本型別類:AtomicBoolean,AtomicInteger,AtomicLong
- 更新陣列類:AtomicIngerArray,AtomicLongArray,AtomicReferenceArray
- 更新引用型別:AtomicReference,AtomicMarkableReference,AtomicStampedReference