1. 程式人生 > >java高併發程式設計總結三:JDK併發包之ReentrantLock重入鎖

java高併發程式設計總結三:JDK併發包之ReentrantLock重入鎖

為了更好的支援併發程式,jdk內部提供了大量實用的API和框架,重入鎖就是一種對同步的擴充套件

ReentrantLock起源

在1.5的時候,synchronized關鍵的效能不是很好,這也是concurrent併發包出現的一種潛在原因,而新出現的ReentrantLock重入鎖的效能那時比synchronized好太多,也提供了更加靈活、細粒度的同步操作。(在jdk1.6開始,jdk在synchronized上做了大量的優化,使得兩者差距並不大,並且併發包出錯性高,通常使用synchronized即可)

lock/unlock

重入鎖是synchronized功能的一種擴充套件,它和synchronized一樣能同步執行方法或者程式碼塊,除此之外,它還能指定釋放鎖物件。
通過lock方法來加鎖,通過unlock來解鎖,這兩個方法簽名如下:
lock
public void lock()
獲取鎖。
如果該鎖沒有被另一個執行緒保持,則獲取該鎖並立即返回,將鎖的保持計數設定為 1。
如果當前執行緒已經保持該鎖,則將保持計數加 1,並且該方法立即返回。
如果該鎖被另一個執行緒保持,則出於執行緒排程的目的,禁用當前執行緒,並且在獲得鎖之前,該執行緒將一
直處於休眠狀態,此時鎖保持計數被設定為 1。



public void unlock();
釋放重入鎖,並將保持計數減1

lockInterruptibly相應中斷

對於synchronized關鍵字來說,如果一個執行緒在等待鎖,那麼它就只有兩種情況:獲得鎖繼續執行/保持等待。而對於重入鎖來說,它還有另外一種可能,就是被中斷:也就是說在等待鎖的過程中,程式可以根據需要取消對鎖的請求。這裡主要使用了ReentrantLock物件的lockInterruptibly方法。
lockInterruptibly
public void lockInterruptibly() throws InterruptedException
1)如果當前執行緒未被中斷,則獲取鎖。 

2)如果該鎖沒有被另一個執行緒保持,則獲取該鎖並立即返回,將鎖的保持計數設定為 1。 

3)如果當前執行緒已經保持此鎖,則將保持計數加 1,並且該方法立即返回。 

4)如果鎖被另一個執行緒保持,則出於執行緒排程目的,禁用當前執行緒,並且在發生以下兩種情況之一以
前,該執行緒將一直處於休眠狀態: 
     1)鎖由當前執行緒獲得;或者 

     2)其他某個執行緒中斷當前執行緒。 

5)如果當前執行緒獲得該鎖,則將鎖保持計數設定為 1。 
   如果當前執行緒: 
       1)在進入此方法時已經設定了該執行緒的中斷狀態;或者 

       2)在等待獲取鎖的同時被中斷。 

   則丟擲 InterruptedException,並且清除當前執行緒的已中斷狀態。 


6)在此實現中,因為此方法是一個顯式中斷點,所以要優先考慮響應中斷,而不是響應鎖的普通獲取或
重入獲取。
lockInterruptibly()和上面的第一種情況是一樣的, 執行緒在請求lock並被阻塞時,如果被interrupt,則“此執行緒會被喚醒並被要求處理InterruptedException”。並且如果執行緒已經被interrupt,再使用lockInterruptibly的時候,此執行緒也會被要求處理interruptedException

立即返回的加鎖方式:tryLock

重入鎖還有一個加鎖的方法:tryLock(),該方法簽名及介紹如下:
tryLock    public boolean tryLock()

//還有一個帶引數執行的tryLock,接受兩個引數:等待時長和計時單位,超過
指定時間後還沒有獲得鎖就會返回false,沒有引數的tryLock會立即返回

僅在呼叫時鎖未被另一個執行緒保持的情況下,才獲取該鎖。 

1)如果該鎖沒有被另一個執行緒保持,並且立即返回 true 值,則將鎖的保持計數設定為 1。
即使已將此鎖設定為使用公平排序策略,但是呼叫 tryLock() 仍將 立即獲取鎖(如果有可用的),
而不管其他執行緒當前是否正在等待該鎖。在某些情況下,此“闖入”行為可能很有用,即使它會打破公
平性也如此。如果希望遵守此鎖的公平設定,則使用 tryLock(0, TimeUnit.SECONDS) 
,它幾乎是等效的(也檢測中斷)。 

2)如果當前執行緒已經保持此鎖,則將保持計數加 1,該方法將返回 true3)如果鎖被另一個執行緒保持,則此方法將立即返回 false 值。

方法總結

使用lock和unlock可以很容易的操作鎖的鎖定和釋放,從簽名來看,他們不會丟擲中斷異常,因此也就不能響應中斷,只有單純的鎖獲得與所釋放,如下測試程式碼
public static class thread extends Thread{

        public static ReentrantLock lock = new ReentrantLock();
        public void run(){
            lock.lock();
            while(true){
                System.out.println(i);
                Thread.currentThread().interrupt();
                //lock.unlock();
            }
            //lock.unlock();
        }
    }

建立並啟動上面的執行緒,會發現它將一直執行下去,而不會中斷,加入第一個將會在執行時
報錯:java.lang.IllegalMonitorStateException,因為lock只有一次,而unlockwhile迴圈了
而加入第二個將會在編譯器就報錯,因為while無限迴圈,後面的程式碼將沒有執行的機會
使用lockInterruptibly方法可以解決類似問題:執行緒A需要先佔用lock1,在佔用lock2,而執行緒B需要先佔用lock2,再佔用lock1,這裡如果簡單的使用lock/unlock的話很容易形成兩個執行緒之間的相互等待,所以可以使用lockInterruptibly響應中斷:如果當前執行緒在等待的時候被中斷就會取消對鎖的請求而丟擲InterruptedException中斷異常,如下實現程式碼:
public class Main{
    public static void main(String[] args) throws InterruptedException{
        thread test1 = new thread(1);
        thread test2 = new thread(2);

        test1.start();
        test2.start();
        //test2.interrupt();
        test1.join();
        test2.join();
        System.out.println("執行完成");
    }


    //使用重入鎖ReentrantLock:synchronized的擴充套件
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();

    public static class thread extends Thread{

        //指定需要獲得的鎖的序號1/2
        public int lock ;
        public thread(int lock){
            this.lock = lock;
        }

        public void run(){
            System.out.println("starting...");
            try{
                if(lock == 1){
                    lock1.lockInterruptibly();
                    try{
                        Thread.sleep(2000);
                    }
                    catch(Exception e){
                        e.printStackTrace();
                        System.out.println("sleep exception...");
                    }
                    lock2.lockInterruptibly();
                }
                else{
                    lock2.lockInterruptibly();
                    try{
                        Thread.sleep(2000);
                    }
                    catch(Exception e){
                        e.printStackTrace();
                        System.out.println("sleep exception...");
                    }
                    lock1.lockInterruptibly();
                }
            }
            catch(Exception e){
                e.printStackTrace();
                System.out.println("interrupting...");
            }
            finally{
                if(lock1.isHeldByCurrentThread())
                    lock1.unlock();
                if(lock2.isHeldByCurrentThread())
                    lock2.unlock();
                System.out.println("thread is running over....");
            }
        }
    }
}

在不加上test2.interrupt()進行中斷時執行發現它們將會處於阻塞狀態,因為
都在等對方控制的鎖資源unlock

而加上了該行程式碼將會全部執行完成,不過test1是正常執行完畢,test2是丟擲異常跳出
test2響應了中斷,取消了對鎖資源的等待,丟擲了InterruptedException異常
而此時,如果將其改成lock將會一直處於等待狀態
這裡介紹下tryLock帶引數的使用方式:
public class Main{
    public static void main(String[] args) throws InterruptedException{
        trythread test1 = new trythread(1);
        trythread test2 = new trythread(2);
        test1.start();
        test2.start();
    }


    //使用重入鎖ReentrantLock:synchronized的擴充套件
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();

    public static class trythread extends Thread{

        public int lock;
        public trythread(int lock){
            this.lock = lock;
        }
        public void run(){
            try{
                if(lock == 1){
                    if(!lock1.tryLock()){
                        System.out.println("lock1 trylock failure in lock==1");
                        return;
                    }
                    try{
                        Thread.sleep(1000);
                    }
                    catch(Exception e){
                        e.printStackTrace();
                        System.out.println("sleep exception in lock==1...");
                    }
                    if(!lock2.tryLock()){
                        System.out.println("lock2 trylock failure in lock==1");
                        return ;
                    }
                }
                else{
                    if(!lock2.tryLock()){
                        System.out.println("lock2 trylock failure in lock==2");
                        return;
                    }
                    try{
                        Thread.sleep(1000);
                    }
                    catch(Exception e){
                        e.printStackTrace();
                        System.out.println("sleep exception in lock==2...");
                    }
                    if(!lock1.tryLock()){
                        System.out.println("lock1 trylock failure in lock==2");
                        return ;
                    }
                }
            }
            catch(Exception e){
                e.printStackTrace();
                System.out.println("run exception...");
            }
            finally{
                if(lock1.isHeldByCurrentThread()){
                    lock1.unlock();
                }
                if(lock2.isHeldByCurrentThread()){
                    lock2.unlock();
                }
                if(lock == 1)
                    System.out.println("running over bye in lock==1");
                else
                    System.out.println("running over bye in lock==2");
            }
        }
    }
}

上面程式碼輸出結果為:
lock2 trylock failure in lock==1
running over bye in lock==1
lock1 trylock failure in lock==2
running over bye in lock==2
表示test1無法獲得lock2的鎖,test2無法獲得lock1的鎖,不過在這裡使用的是
tryLock方法,兩者不會僵持,會立刻返回true/false,不會阻塞執行緒

公平鎖

大多數情況下,鎖的申請都是非公平的,多個執行緒對鎖資源的請求會進入競爭狀態,系統會從該鎖的等待佇列中隨機挑選一個。而對於公平鎖而言,它會保證執行緒的先來先得,保證所有的執行緒都要進行排隊,不管高優先順序還是低優先順序。建立公平鎖可以使用ReentrantLock的一個構造器
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
如下是使用公平鎖的測試程式碼
public class Main{
    public static void main(String[] args) throws InterruptedException{
        fairthread t1 = new fairthread("test1");
        fairthread t2 = new fairthread("test2");
        fairthread t3 = new fairthread("test3");
        fairthread t4 = new fairthread("test4");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }

    public static ReentrantLock fairlock = new ReentrantLock();
    //public static ReentrantLock fairlock = new ReentrantLock(true);

    public static class fairthread extends Thread{

        public fairthread(String name){
            super(name);
        }

        public void run(){
            fairlock.lock();
            System.out.println("my name is "+Thread.currentThread().getName());
            fairlock.unlock();
        }
    }
}


不使用公平鎖的時候四個執行緒會隨機執行,
使用公平鎖的時候不管執行多少次,總是按照執行緒的啟動順序執行
公平鎖看起來很優美,不過其實現還是需要增大系統的開銷:實現公平鎖必然需要系統維護一個有序佇列(先進先出),導致其實現成本比較高,效能也相對比較低,所以預設鎖是非公平的

Condition條件類

Condition條件類是ReentrantLock鎖的好搭檔,他們兩個之間的關係其實和之前文章介紹的Object.wait()和Object.notify()是一樣的。通過ReentrantLock的newCondition方法可以建立對應的Condition物件,其方法簽名如下
public ConditionObject newCondition()

ConditionObject是Condition的一個實現類,其類宣告為:
public class ConditionObject implements Condition, java.io.Serializable {...}
通過呼叫ReentrantLock物件的newCondition方法可以獲得對應的Condition。Condition類中提供了很多的方法,方法簽名列表如下:
public void await() throws InterruptedException;
該方法會使當前執行緒等待,同時釋放當前鎖lock,當其他執行緒呼叫signal/signalAll時,
執行緒才能重新進入就緒狀態,或者當執行緒被interrupted中斷時,
才能跳出等待狀態並丟擲interruptedException異常

public void awaitUninterruptibly();
它和await方法基本相同,但是它並不會在等待過程中響應中斷事件

public long awaitNanos(long nanouTimeout) throws InterruptedException;


public boolean await(long time, TimeUnit unit) throws InterruptedException;
等待指定時間,指定兩個引數,分別是等待時間和時間單位


public boolean awaitUntil(Date deadline) throws InterruptedException;



public void signal();
用於隨機從等待佇列中喚醒一個執行緒使之進入就緒狀態


public void signalAll();
用於喚醒等待佇列中的所有執行緒,使他們都進入就緒狀態

Condition實現原理

await方法的實現程式碼:
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 將當前執行緒包裝下後,
                                      // 新增到Condition自己維護的一個連結串列中。
    int savedState = fullyRelease(node);// 釋放當前執行緒佔有的鎖

    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {// 釋放完畢後,遍歷AQS的佇列,看當前節點是否在佇列中,
        // 不在 說明它還沒有競爭鎖的資格,所以繼續將自己沉睡。
        // 直到它被加入到佇列中,聰明的你可能猜到了,
        // 沒有錯,在singal的時候加入不就可以了?
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒後,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
當執行緒呼叫await進入等待狀態時,它會進入到Condition內部維護的一個表中,這個表用於儲存等待執行緒;然後,他會在獲得鎖之前一直進入while迴圈狀態,知道獲得該鎖跳出迴圈,跳出迴圈時只是處於了就緒狀態,還沒有真正獲得物件鎖。可以看到while後面還有一系列獲得鎖判斷的操作
而signal方法的實現程式碼如下:
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // firstWaiter為condition自己維護的一個連結串列的頭結點,
                              // 取出第一個節點後開始喚醒操作
    if (first != null)
        doSignal(first);
}
在Condition內部其實一直維護了等待佇列的頭部結點和尾部結點,該佇列的作用就是用於存放等待執行緒佇列,
public class ConditionObject implements Condition, java.io.Serializable{
    public static final long serialVersionUID = 117398...L;
    public static final Node firstWaiter;
    public static final Node lastWaiter;
}
await/signal的本質和wait/notify是一樣的,都是用於等待和喚醒,不過區別在於前者是針對ReentrantLock鎖物件,而後者是處於synchronized同步程式碼塊/方法中使用的
lock/unlock/await/signal四個方法的執行順序為:lock->await->signal->unlock;

參考文獻

java高併發程式設計第三章