1. 程式人生 > 實用技巧 >java學習-併發-java.lang.Thread API

java學習-併發-java.lang.Thread API

java.lang.Thread API定義
    • new Thread 建立 Thread 物件
      • 對於當前操作並不涉及os執行緒的建立,只涉及java物件的建立
    • Thread#start 啟動執行緒: 這裡主要實現 jvm 和 java 程式碼通訊(呼叫)實現系統執行緒的建立和執行
    • Thread#join 等待執行緒執行完成: 本質是通過一直探測jvm中c++的javaThread物件是否存在,如果存在表示當前執行緒執行還未結束,反之則代表結束,這裡的核心點在於object#wait操作
  public static void main(String[] args) throws
InterruptedException { threadApi(); } public static void threadApi() throws InterruptedException { /** * 在載入Thread.class 到jvm中時會執行當前Thread 類中的static程式碼塊 以及 static變數 * 此時就會執行 registerNatives 的native 方法對應的就是 thread.c 中的 Java_java_lang_Thread_registerNatives 方法; * 該方法的執行會將Thread中所有對映的native 方法和變數進行載入到jvm中
*/ // 建立一個執行緒物件,對於這裡的操作只涉及到java層面的物件建立,並不涉及os執行緒操作;對於其中的屬性操作 // target -> 當前執行緒物件指定的runnable方法 // group -> 執行緒組: 對於一個新的執行緒物件而言,預設會使用當前呼叫執行緒的所屬執行緒組; // 對於stackSize 屬性在@since 1.4 後就可以人工指定,而不需要只使用 xss的限制 Thread thread = new Thread(() -> { log.info("java.lang.Thread api 學習"); });
// 啟動執行緒 : thread#start /** * start 首先是被synchronized 修飾表示當前方法操作是執行緒安全的同步操作: 因此就可以理解java language 規範中 happens-before 對於 start 和 run 的定義 * 在start 中 首先會將當前執行緒物件新增到thread.threadGroup; 然後會呼叫 start0(native) * 對於start0是屬於JNI方法,其對應的是jdk原始碼中的c++程式碼對應的就是"JVM_StartThread" 其對應的實現就是jvm.cpp中的 JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread)) 方法; * "java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL" 對於 當前判斷是通過判斷當前jobject 是否已被例項化,如果已經被例項化說明當前物件已被建立表示對應的執行緒已被建立; 因此對於一個Thread物件而言如果呼叫了多次start方法則會丟擲 異常"java_lang_IllegalThreadStateException"-> 對應的就是 "java.lang.IllegalThreadStateException" 在openjdk中 使用了 "THROW(vmSymbols::java_lang_IllegalThreadStateException());" 來丟擲異常,其對映是存在於 vmSymbols.h中的模板定義" template(java_lang_IllegalThreadStateException, "java/lang/IllegalThreadStateException") \" * * "native_thread = new JavaThread(&thread_entry, sz);" 此時會建立 JavaThread 物件, 在這裡傳遞了兩個引數一個是 "thread_entry方法的引用(類似於java lambda)",第二個引數為當前執行緒棧大小 * 對 "thread_entry"方法的分析: * static void thread_entry(JavaThread* thread, TRAPS) { * HandleMark hm(THREAD); * Handle obj(THREAD, thread->threadObj()); // 該操作就是為了java Thread 物件 * JavaValue result(T_VOID); * JavaCalls::call_virtual(&result, // 對於 call_virtual 實際就是執行的回撥操作,該操作會呼叫java Thread#run 方法 * obj, * SystemDictionary::Thread_klass(), * vmSymbols::run_method_name(), // 此處就是將對應的 run方法加入其中 * vmSymbols::void_method_signature(), * THREAD); * } * 此時進入 JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) 構造方法 * 1:執行當前JavaThread 物件的初始化操作:initialize() * 2:判斷當前執行緒型別時 java_thread 還是 compiler_thread, 此時會呼叫 os::create_thread(Thread* thread, ThreadType thr_type, size_t req_stack_size) * 此時進入 os::create_thread(Thread* thread, ThreadType thr_type, size_t req_stack_size) (os_linux.cpp): * 在當前程式碼的關鍵點就在於 pthread api的使用,利用POSIX_THREAD高階api來建立os執行緒; * 對於 pthread_create第三個引數傳遞的 thread_native_entry 方法引用分析 * 重點在於對於 "thread->call_run();"通過呼叫 "JavaThread::run()" -> "JavaThread::thread_main_inner()" -> "this->entry_point()(this, this);" (這裡可以看上面"thread_entry"的解釋) -> 該操作是正式執行Thread#run回撥; * 噹噹前執行緒執行結束後會執行"JavaThread::post_run()" 進行資源回收 * 對於當前方法而言,有可能由於記憶體不足導致執行緒建立失敗,通過 * // Allocate the OSThread object * OSThread* osthread = new OSThread(NULL, NULL); * if (osthread == NULL) { * return false; * } * 來判斷是否有空間建立執行緒物件;如果空間分配失敗則表示執行緒建立失敗,但其不會丟擲OOM異常 * 對於 以下程式碼分析: * if (native_thread->osthread() != NULL) { * // Note: the current thread is not being used within "prepare". * native_thread->prepare(jthread); * } * osthread() 實際是呼叫的 "thread.hpp中的 OSThread* osthread() const { return _osthread; }" * 對於 os::create_thread 中如果OSThread 物件建立成功則會呼叫 "void set_osthread(OSThread* thread) { _osthread = thread; }",來設定 "_osthread" 屬性,因此當物件OSThread 建立失敗時,當前欄位為null,因此可以通過當前欄位判斷執行緒是否真正建立成功 */ thread.start(); // thread.start();// 丟擲 IllegalThreadStateException /** * 阻塞等待當前執行緒執行結束 * 這裡主要是通過呼叫 native java.lang.Thread#isAlive() 實現,對應的c++ 中的操作就是去獲取JavaThread 物件是否存在,對於 thread#start 中 在c++中有一個操作就是在post_run方法中會進行資源回收,因此當c++中javaThread執行結束後,物件就會被資源回收,代表當前執行緒已經執行結束; * * 這裡還有一點關於 Object#wait native 分析: 對於wait操作實際是直接執行膨脹鎖並不會有偏向鎖的存在 * 對於 Object#wait 對應的 jni 定義 存在於 jvm.h 的 * JNIEXPORT void JNICALL * JVM_MonitorWait(JNIEnv *env, jobject obj, jlong ms); * 其具體實現為 jvm.cpp 中的 "JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))" 主要為 呼叫 "ObjectSynchronizer::wait(obj, ms, CHECK);" * 此時進入 synchronizer.cpp中的 * int ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) * 對於當前方法首先判斷JVM是否支援偏向鎖,如果支援偏向鎖,"BiasedLocking::revoke(obj, THREAD);" 會對當前物件頭執行偏向鎖撤銷操作 * 後續會直接呼叫 "ObjectMonitor* ObjectSynchronizer::inflate(Thread* self, oop object,const InflateCause cause)" 進入膨脹過程 * 對於膨脹的過程實際就是對 Inflated(已持有到鎖) -> Stack-locked -> INFLATING -> Neutral -> BIASED 按照鎖的粒度從高到低進行判斷,最終轉換為膨脹鎖階段 返回 ObjectMonitor; * 對於當前inflate實際就是 object轉換為 ObjectMonitor的過程 * 對於 ObjectMonitor分析 : 其中存在一個 waitSet 屬性值, 其操作就是類似於 AQS 的雙向佇列 * ObjectWaiter* volatile _WaitSet; // LL of threads wait()ing on the monitor * 對於 ObjectWaiter 也可以等價於 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node * 再次進入 objectMonitor.cpp中的 * void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) * 會首先建立一個ObjectWaiter 節點準備加入到等待佇列中,會使用當前Thread的parkevent 執行reset進行重置掛起操作,使用OrderAccess.fence建立記憶體屏障 * 對於addWaiter 操作也是受到 spinlock的保護,因此需要 在入隊之前先執行 Thread::SpinAcquire 操作 對於當前入隊操作 和 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node, int, boolean, boolean, boolean, long) 操作類似, 不同點在於 對於 c++是直接使用了 spinLock; * 對於 spinLock acquire 首先會執行cas操作作為fast,如果失敗,則執行下一個階段, 在此階段如果為單核則直接阻塞當前執行緒; 反之則 會首先執行 執行緒 yield ,並執行cas操作如果操作持續失敗,且yield次數達到閾值,則會執行執行緒休眠1ms操作,並直到cas操作成功,則加鎖成功; * 在執行AddWaiter 操作中 和 aqs node acquire不同點在於初始化連結串列時並不會建立一個無意義的頭節點; 釋放鎖 * 呼叫 Thread#ParkEvent屬性 的park 方法 實際 就是 os::PlatformEvent::park 內部使用了 POSIXThread中的相關的lock以及condition 相關api * */ thread.join(); /** * 分析 jdk.internal.misc.Unsafe#park(boolean, long) * 對應的就是 unsafe.cpp中的 "UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))" 其最終操作也是執行到了os::PlatformEvent::park 內部使用了 POSIXThread中的相關的lock以及condition 相關api */ } public static void executors() { ExecutorService executorService = Executors.newFixedThreadPool(2); // 由於 core = 2, 當前只提交了一個任務,因此會理解建立任務,並建立新的thread物件,並呼叫start 方法來執行當前執行緒的runnable executorService.submit(() -> { log.info("執行緒啟動"); }); executorService.shutdown(); }

對於這裡大量的操作實際都是使用了 POSIX_Thread 高階api相關操作