1. 程式人生 > >java 併發之LockSupport

java 併發之LockSupport

LockSupport

LockSupport提供park()和unpark()方法實現阻塞執行緒和解除執行緒阻塞,實現的阻塞和解除阻塞是基於”許可(permit)”作為關聯,permit相當於一個訊號量(0,1),預設是0. 執行緒之間不再需要一個Object或者其它變數來儲存狀態,不再需要關心對方的狀態.

wait()和notify()、notifyAll()的比較

public static void main(String[] args) {
       final Object lock = new Object();
       ExecutorService service = Executors.newCachedThreadPool();
       service.submit(new
Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); System.out.println("進入等待"); synchronized (lock) { lock.wait(); } System.out
.println("結束等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }); service.submit(new Runnable() { @Override public void run() { synchronized (lock){ lock.notifyAll(); System.out
.println("通知所有執行緒結束等待"); } } }); while(!service.isShutdown()); }

測試結果:

通知所有執行緒結束等待
進入等待

測試完,會發現執行緒並沒有結束等待。如果我們使用LockSupport,則不會出現這種情況。

final Thread t1 = new Thread() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("進入等待");
                    LockSupport.park(this);
                    System.out.println("結束等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        t1.start();
        final Thread t2 = new Thread() {
            @Override
            public void run() {
                LockSupport.unpark(t1);
                System.out.println("通知結束等待");
            }
        };
        t2.start();

        while(t1.isAlive() || t2.isAlive());
        System.out.println("程式碼結束");

測試結果:

通知結束等待
進入等待
結束等待
程式碼結束

LockSupport阻塞和解除阻塞執行緒直接操作的是Thread,而Object的wait/notify它並不是直接對執行緒操作,它是被動的方法,它需要一個object來進行執行緒的掛起或喚醒.
Thead在呼叫wait之前, 當前執行緒必須先獲得該物件的監視器(synchronized),被喚醒之後需要重新獲取到監視器才能繼續執行.而LockSupport可以隨意進行park或者unpark.

原理解釋

LockSupport是用來建立鎖和其他同步類的基本執行緒阻塞原語。LockSupport 提供park()和unpark()方法實現阻塞執行緒和解除執行緒阻塞,LockSupport和每個使用它的執行緒都與一個許可(permit)關聯。permit相當於1,0的開關,預設是0,呼叫一次unpark就加1變成1,呼叫一次park會消費permit, 也就是將1變成0,同時park立即返回。再次呼叫park會變成block(因為permit為0了,會阻塞在這裡,直到permit變為1), 這時呼叫unpark會把permit置為1。每個執行緒都有一個相關的permit, permit最多隻有一個,重複呼叫unpark也不會積累。

如果呼叫執行緒被中斷,則park方法會返回。同時park也擁有可以設定超時時間的版本。

需要特別注意的一點:park 方法還可以在其他任何時間“毫無理由”地返回,因此通常必須在重新檢查返回條件的迴圈裡呼叫此方法。從這個意義上說,park 是“忙碌等待”的一種優化,它不會浪費這麼多的時間進行自旋,但是必須將它與 unpark 配對使用才更高效。

三種形式的 park 還各自支援一個 blocker 物件引數。此物件線上程受阻塞時被記錄,以允許監視工具和診斷工具確定執行緒受阻塞的原因。(這樣的工具可以使用方法 getBlocker(java.lang.Thread) 訪問 blocker。)建議最好使用這些形式,而不是不帶此引數的原始形式。在鎖實現中提供的作為 blocker 的普通引數是 this。

原始碼解讀

LockSupport中主要的兩個成員變數

// Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;

unsafe:全名sun.misc.Unsafe可以直接操控記憶體,被JDK廣泛用於自己的包中,如java.nio和java.util.concurrent。但是不建議在生產環境中使用這個類。因為這個API十分不安全、不輕便、而且不穩定。

LockSupport的方法底層都是呼叫Unsafe的方法實現。

再來看parkBlockerOffset:
parkBlocker就是第一部分說到的用於記錄執行緒被誰阻塞的,用於執行緒監控和分析工具來定位原因的,可以通過LockSupport的getBlocker獲取到阻塞的物件

 static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
        } catch (Exception ex) { throw new Error(ex); }
 }

從這個靜態語句塊可以看的出來,先是通過反射機制獲取Thread類的parkBlocker欄位物件。然後通過sun.misc.Unsafe物件的objectFieldOffset方法獲取到parkBlocker在記憶體裡的偏移量,parkBlockerOffset的值就是這麼來的.

JVM的實現可以自由選擇如何實現Java物件的“佈局”,也就是在記憶體裡Java物件的各個部分放在哪裡,包括物件的例項欄位和一些元資料之類。 sun.misc.Unsafe裡關於物件欄位訪問的方法把物件佈局抽象出來,它提供了objectFieldOffset()方法用於獲取某個欄位相對 Java物件的“起始地址”的偏移量,也提供了getInt、getLong、getObject之類的方法可以使用前面獲取的偏移量來訪問某個Java 物件的某個欄位。

為什麼要用偏移量來獲取物件?幹嗎不要直接寫個get,set方法。多簡單?
仔細想想就能明白,這個parkBlocker就是線上程處於阻塞的情況下才會被賦值。執行緒都已經阻塞了,如果不通過這種記憶體的方法,而是直接呼叫執行緒內的方法,執行緒是不會迴應呼叫的。

LockSupport的其他變數

private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
     try {
         UNSAFE = sun.misc.Unsafe.getUnsafe();
         Class<?> tk = Thread.class;
         parkBlockerOffset = UNSAFE.objectFieldOffset
             (tk.getDeclaredField("parkBlocker"));
         SEED = UNSAFE.objectFieldOffset
             (tk.getDeclaredField("threadLocalRandomSeed"));
         PROBE = UNSAFE.objectFieldOffset
             (tk.getDeclaredField("threadLocalRandomProbe"));
         SECONDARY = UNSAFE.objectFieldOffset
             (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
     } catch (Exception ex) { throw new Error(ex); }
 }

都是Thread類中的記憶體偏移地址,主要用於ThreadLocalRandom類進行隨機數生成,它要比Random效能好很多,可閱讀了解詳情。

LockSupport中的方法

park方法

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

先獲取當前執行緒,設定當前執行緒的parkBlocker欄位(parkBlocker), 呼叫Unsafe類的park方法,最後再次呼叫setBlocker,為什麼呢?

因為當前執行緒首先設定好parkBlocker欄位後再呼叫Unsafe的park方法,之後,當前執行緒已經被阻塞,等待unpark方法被呼叫, unpark方法被呼叫,該執行緒獲得許可後,可以繼續進行下面的程式碼,第二個setBlocker引數parkBlocker欄位設定為null,這樣就完成了整個park方法的邏輯. 如果沒有第二個setBlocker,那麼之後沒有呼叫park(Object blocker),而直接呼叫getBlocker方法,得到的還是前一個park(Object blocker)設定的blocker,顯然是不符合邏輯的。 所以,park(Object) 方法裡必須要呼叫setBlocker方法兩次。

/** 禁用當前執行緒並等待多長時間. */
public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, nanos);
        setBlocker(t, null);
    }
}
/**  為了執行緒排程,在指定的時限前禁用當前執行緒,除非許可可用。*/
public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(true, deadline);
    setBlocker(t, null);
}

呼叫了park方法後,會禁用當前執行緒,除非許可可用,在以下三種情況發生之前,執行緒處於阻塞狀態:
1.其他某個執行緒將當前執行緒作為目標呼叫 unpark
2.其他某個執行緒中斷當前執行緒
3.該呼叫不合邏輯地(即毫無理由地)返回

unpark方法

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}