1. 程式人生 > 其它 >Java同步器之LockSupport原始碼解析

Java同步器之LockSupport原始碼解析

一、簡介

LockSupport是用來建立鎖和其他同步類的基本執行緒阻塞原語。

LockSupport中的park()unpark()的作用分別是阻塞執行緒和解除阻塞執行緒,而且park()unpark()不會遇到“Thread.suspendThread.resume所可能引發的死鎖”問題。因為park()unpark()有許可的存在;呼叫park()的執行緒和另一個試圖將其unpark()的執行緒之間的競爭將保持活性。

二、原始碼

2.1 構造方法

LockSupport類只提供了一個被private修飾的構造方法,意味著LockSupport不能在任何地方被例項化,但所有方法都是靜態方法,可以在任意地方被呼叫。

/**
 * 私有構造
 */
private LockSupport() {}

2.2 屬性

/**
 * 這個方法是由於多執行緒隨機數生成器ThreadLocalRandom的package訪問許可權限制不能被這個包下的類使用,
 * 複製了一份實現出來,在StampedLock中被使用
 */
static final int nextSecondarySeed() {
    int r;
    Thread t = Thread.currentThread();
    if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
        r ^= r << 13;   // xorshift
        r ^= r >>> 17;
        r ^= r << 5;
    }
    else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
        r = 1; // avoid zero
    UNSAFE.putInt(t, SECONDARY, r);
    return r;
}

/**
 * /**
 *  * 三種情況停止阻塞:
 *  * 1. 呼叫unpark
 *  * 2. 執行緒被中斷
 *  * 3. 設定時間到了
 *  * @Param isAbsolute 是否絕對時間
 *  * @Param time 時間 為0時代表無線等待
 *  *
 * Unsafe.park(boolean isAbsolute,long time);
 *
 * /**
 *  * 釋放某執行緒,需要保證釋放時執行緒存活
 *  *
 * Unsafe.unpark(Thread thread);
 */
private static final sun.misc.Unsafe UNSAFE;

/**
 * 獲取每個欄位的偏移地址
 */
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
    try {
    	/** 獲取一個Unsafe例項 後續利用該例項獲取偏移量 */
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        /** 建立一個Thread的class物件 後續利用反射獲取欄位 */
        Class<?> tk = Thread.class;
        parkBlockerOffset = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        SEED = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSeed"));
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
        SECONDARY = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
    } catch (Exception ex) { throw new Error(ex); }
}

2.3 blocker

2.3.1 blocker的作用

在分析LockSupport原始碼前,什麼是blockerblocker的作用是什麼?

Thread類的原始碼中,可以找到這樣一個被volatile修飾的變數,LockSupport類中所有blocker的相關變數以及方法都是為這個parkBlocker變數服務的。

volatile Object parkBlocker;

當執行緒被阻塞時,如果該執行緒的parkBlocker變數不為空,則在列印堆疊異常時,控制檯會列印輸出具體阻塞物件的資訊,方便錯誤排查,後文會對此進行演示

2.3.2 blocker相關方法

首先關注blocker

相關方法:

/**
 * 設定blocker
 * 將執行緒t的parkBlocker欄位的值更新為arg
 */
private static void setBlocker(Thread t, Object arg) {
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

/**
 * 獲取blocker
 */
public static Object getBlocker(Thread t) {
    if (t == null)
        throw new NullPointerException();
    return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}

原始碼中可以清晰的看到方法中使用了parkBlockerOffset,即在類初始化時獲取的parkBlocker在記憶體中的偏移量,putObjectgetObjectVolatile方法採用地址加偏移量的方式從記憶體直接設定或獲取parkBlockerUnsafe包下的方法可以直接操作記憶體,因此該類被命名為Unsafe),這樣做的原因是因為執行緒被阻塞時無法被賦值或取值。

2.4 park

LockSupport為阻塞操作提供了兩組三類方法,一組是不設定blocker的方法,另一組是設定blocker方法的,JDK推薦為Thread設定blocker方便除錯。

/** 
 * 基礎阻塞方法,無時限 
 */
public static void park(Object blocker) {
	/** 獲取當前執行緒 */
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    /** 阻塞核心方法 直到被喚醒前不會執行下一語句 內部邏輯後文展開討論 */
    UNSAFE.park(false, 0L);
    /** 執行緒被喚醒後parkBlocker重新置null */
    setBlocker(t, null);
}

/** 
 * 邏輯與park基本相同 阻塞nanos納秒 超時後執行緒被自動喚醒 
 */
public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, nanos);
        setBlocker(t, null);
    }
}

/** 
 * 邏輯與park基本相同 阻塞到日期deadline為止 超過deadline日期後自動被喚醒  
 */
public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(true, deadline);
    setBlocker(t, null);
}

/**
 * 無限等待其它執行緒將該執行緒unpark 或 中斷該執行緒
 */
public static void park() {
    UNSAFE.park(false, 0L);
}

/**
 * 指定等待時間
 */
public static void parkNanos(long nanos) {
    if (nanos > 0)
        UNSAFE.park(false, nanos);
}

/**
 * 阻塞到指定時刻
 */
public static void parkUntil(long deadline) {
    UNSAFE.park(true, deadline);
}

2.5 unpark

unpark方法如下,UNSAFE.unpark為喚醒執行緒的核心方法

/**
 * 釋放許可,使執行緒繼續執行。
 * 如果執行緒沒被阻塞,則下次park不會阻塞該執行緒?
 */
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

說明LockSupport是通過呼叫Unsafe函式中的介面實現阻塞和解除阻塞的。

2.6 底層實現

Unsafe的原始碼中可見,parkunpark都是native方法(意味著由C++底層實現)

void Parker::park(bool isAbsolute, jlong time) {
  // 先原子的將_counter的值設為0,並返回_counter的原值,如果原值>0說明有通行證,直接返回
  // 首先嚐試能否獲取許可
  // 利用原子操作設_counter(許可)為0,同時返回_counter原值
  // 原值為1時獲取許可成功 直接return;
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;
  //如果執行緒被中斷 直接返回
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  timespec absTime;
  // 如果出現time小於0 或
  // 呼叫了parkUntil方法(只用呼叫parkUntil時isAbsolute為true)且time為0的情況
  // 意味著不需要嘗試獲取許可 直接返回
  if (time < 0 || (isAbsolute && time == 0) ) {
    return;
  }
  // 只有在java種呼叫了parkNanos或parkUntil方法才會進入此分支
  if (time > 0) {
    // 定時喚醒
    unpackTime(&absTime, isAbsolute, time);
  }

  ThreadBlockInVM tbivm(jt);

  // 如果執行緒被中斷,直接返回
  // 如果沒有被中斷,且獲取互斥鎖失敗,直接返回
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;
  // 如果_counter > 0, 不需要等尤拉尤拉待,這裡再次檢查_counter的值
  if (_counter > 0)  {
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // 插入寫屏障
    OrderAccess::fence();
    return;
  }


  OSThreadWaitState osts(thread->osthread(), false);
  // 暫停Java執行緒
  jt->set_suspend_equivalent();

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    // 執行緒進入阻塞狀態 並等待_cond[_cur_index]訊號
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    // 執行緒進入限時阻塞狀態
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  _counter = 0 ;
  // 互斥鎖釋放
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  OrderAccess::fence();

}


void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ; 
  //儲存原始許可值 用於後續判斷
  s = _counter; 
  //許可置1
  _counter = 1;
  if (s < 1) {
     if (WorkAroundNPTLTimedWaitHang) {
        //喚醒等待執行緒
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
        //釋放鎖操作
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
     } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
     }
  } else {
   	//釋放鎖
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

三、示例

對比下面的“示例1”和“示例2”可以更清晰的瞭解LockSupport的用法。

示例1

public class WaitTest1 {
 
    public static void main(String[] args) {
        ThreadA ta = new ThreadA("ta");
        synchronized(ta) { // 通過synchronized(ta)獲取“物件ta的同步鎖”
            try {
                System.out.println(Thread.currentThread().getName()+" start ta");
                ta.start();

                System.out.println(Thread.currentThread().getName()+" block");
                // 主執行緒等待
                ta.wait();

                System.out.println(Thread.currentThread().getName()+" continue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadA extends Thread{
        public ThreadA(String name) {
            super(name);
        }
        public void run() {
            synchronized (this) { // 通過synchronized(this)獲取“當前物件的同步鎖”
                System.out.println(Thread.currentThread().getName()+" wakup others");
                notify();    // 喚醒“當前物件上的等待執行緒”
            }
        }
    }
}

示例2

import java.util.concurrent.locks.LockSupport;

public class LockSupportTest1 {

    private static Thread mainThread;

    public static void main(String[] args) {
        ThreadA ta = new ThreadA("ta");
        // 獲取主執行緒
        mainThread = Thread.currentThread();

        System.out.println(Thread.currentThread().getName()+" start ta");
        ta.start();

        System.out.println(Thread.currentThread().getName()+" block");
        // 主執行緒阻塞
        LockSupport.park(mainThread);

        System.out.println(Thread.currentThread().getName()+" continue");
    }

    static class ThreadA extends Thread{
        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            System.out.println(Thread.currentThread().getName()+" wakup others");
            // 喚醒“主執行緒”
            LockSupport.unpark(mainThread);
        }
    }
}

執行結果

main start ta
main block
ta wakup others
main continue

四、總結

LocalSupport也是實現執行緒間通訊的一種有效的方式。unparkpark之間不需要有嚴格的順序。可以先執行unpark,之後再執行park。這樣再編碼的過程中就比使用wait/notify方法要簡單很多。此外,LocalSUpport全部是採用UnSafe類來實現的。這個類通過使用park/unpark以及相關cas操作,就實現了javaJUC的各種複雜的資料結構和容器。而且效率非常高。

五、拓展

5.1 park/unpark與wait/notify的區別

  1. 先喚醒再阻塞操作,由LockSupport實現,即先呼叫unpark再呼叫park,則該執行緒不會被阻塞;由Object實現,即先呼叫notify再呼叫wait,則該執行緒會被阻塞。
  2. LockSupport允許在任意地方阻塞喚醒執行緒,Objectwait/notify必須在synchronized同步程式碼塊內呼叫。因為park/unpark依賴許可量,wait/notify依賴鎖。
  3. LockSupport允許喚醒指定執行緒,notify只能喚醒隨機執行緒,notifyAll喚醒全部阻塞執行緒。