1. 程式人生 > 其它 >Java協程實踐指南(一)

Java協程實踐指南(一)

一. 協程產生的背景

說起協程,大多數人的第一印象可能就是GoLang,這也是Go語言非常吸引人的地方之一,它內建的併發支援。Go語言併發體系的理論是C.A.R Hoare在1978年提出的CSP(Communicating Sequential Process,通訊順序程序)。CSP有著精確的數學模型,並實際應用在了Hoare參與設計的T9000通用計算機上。從NewSqueak、Alef、Limbo到現在的Go語言,對於對CSP有著20多年實戰經驗的Rob Pike來說,他更關注的是將CSP應用在通用程式語言上產生的潛力。作為Go併發程式設計核心的CSP理論的核心概念只有一個:同步通訊。

首先要明確一個概念:併發不是並行。併發更關注的是程式的設計層面,併發的程式完全是可以順序執行的,只有在真正的多核CPU上才可能真正地同時執行。並行更關注的是程式的執行層面,並行一般是簡單的大量重複,例如GPU中對影象處理都會有大量的並行運算。為更好的編寫併發程式,從設計之初Go語言就注重如何在程式語言層級上設計一個簡潔安全高效的抽象模型,讓程式設計師專注於分解問題和組合方案,而且不用被執行緒管理和訊號互斥這些繁瑣的操作分散精力。

在併發程式設計中,對共享資源的正確訪問需要精確的控制,在目前的絕大多數語言中,都是通過加鎖等執行緒同步方案來解決這一困難問題,而Go語言卻另闢蹊徑,它將共享的值通過Channel傳遞(實際上多個獨立執行的執行緒很少主動共享資源)。在任意給定的時刻,最好只有一個Goroutine能夠擁有該資源。資料競爭從設計層面上就被杜絕了。為了提倡這種思考方式,Go語言將其併發程式設計哲學化為一句口號:

Do not communicate by sharing memory; instead, share memory by communicating.
不要通過共享記憶體來通訊,而應通過通訊來共享記憶體。

這是更高層次的併發程式設計哲學(通過管道來傳值是Go語言推薦的做法)。雖然像引用計數這類簡單的併發問題通過原子操作或互斥鎖就能很好地實現,但是通過Channel來控制訪問能夠讓你寫出更簡潔正確的程式。

在《七週七併發模型》中描述的七種併發程式設計模型。

參考: https://www.cnblogs.com/barrywxx/p/10406978.html

  1. 執行緒與鎖:執行緒與鎖模型有很多眾所周知的不足,但仍是其他模型的技術基礎,也是很多併發軟體開發的首選。

  2. 函數語言程式設計:函數語言程式設計日漸重要的原因之一,是其對併發程式設計和並行程式設計提供了良好的支援。函數語言程式設計消除了可變狀態,所以從根本上是執行緒安全的,而且易於並行執行。

  3. Clojure之道——分離標識與狀態:程式語言Clojure是一種指令式程式設計和函數語言程式設計的混搭方案,在兩種程式設計方式上取得了微妙的平衡來發揮兩者的優勢。

  4. actor:actor模型是一種適用性很廣的併發程式設計模型,適用於共享記憶體模型和分散式記憶體模型,也適合解決地理分佈型問題,能提供強大的容錯性。

  5. 通訊順序程序(Communicating Sequential Processes,CSP):表面上看,CSP模型與actor模型很相似,兩者都基於訊息傳遞。不過CSP模型側重於傳遞資訊的通道,而actor模型側重於通道兩端的實體,使用CSP模型的程式碼會帶有明顯不同的風格。

  6. 資料級並行:每個膝上型電腦裡都藏著一臺超級計算機——GPU。GPU利用了資料級並行,不僅可以快速進行影象處理,也可以用於更廣闊的領域。如果要進行有限元分析、流體力學計算或其他的大量數字計算,GPU的效能將是不二選擇。

  7. Lambda架構:大資料時代的到來離不開並行——現在我們只需要增加計算資源,就能具有處理TB級資料的能力。Lambda架構綜合了MapReduce和流式處理的特點,是一種可以處理多種大資料問題的架構。

通常語言的併發模型有以下幾種。

  • 執行緒模型

    作業系統抽象,開發效率高,IO密集、高併發下切換開銷大。

  • 非同步模型

    程式設計框架抽象,執行效率高,破壞結構化程式設計,開發門檻高。

  • 協程模型

    語言執行時抽象,輕量級執行緒,兼顧開發效率和執行效率。

二. Java協程發展歷程

Java本身有著豐富的非同步程式設計框架,比如說CompletableFuture,在一定程度上緩解了Java使用協程的緊迫性。

在2010年,JKU大學發表了一篇論文《高效的協程》,向OpenJdk社群提了一個協程框架的Patch,在2013年Quasar和Coroutine,這兩種協程框架不需要修改Runtime,在協程切換時本來是要儲存呼叫棧的,但是它們不儲存這個呼叫棧,而是在切換時回溯呼叫鏈,生成一個狀態機,將狀態機儲存起來。

Quasar和Coroutine並不是OpenJdk社群原生的協程解決方案,直到2018年1月,官方提出了Project Loom,到了2019年,Loom的首個EA版本問世,此時Java的協程類叫做Fiber,但社群覺得這引入了一個新的概念,於是在2019年10月將Fiber重新實現為了Thread的子類VirtualThread,相容Thread的所有操作。

這時Project Loom的基本雛形已經完成了,在它的概念中,協程就是一個特殊的執行緒,是執行緒的一個子類,從Project Loom已經可以看到Open Jdk社群未來協程發展的方向, 但Loom還有很多的工作需要完成,並沒有完全開發完。

三. Project Loom的目標與挑戰

  • 目標

    易於理解的Java協程系統解決方案,協程即執行緒。

Virtual threads are just threads that are scheduled by the Java virtual machine rather than the operating system.

  • 挑戰

    相容龐大而複雜的標準類庫、JVM特性,同時支援協程和執行緒。

四. Loom實現架構

在API層面Loom引入最重要的概念就是Virtual Thread,對於使用者來說可以當做Thread來理解。

下面是協程生命週期的描述,與執行緒相同需要一個start函式開始執行,接下來VirtualThread就會被排程執行,與執行緒不同的是,協程的上層需要一個排程器來排程它,而不是被作業系統直接排程,被排程執行後就是執行業務程式碼,此時我們業務程式碼可能會遇到一個數據庫訪問或者IO操作,這時當前協程就會被Park起來,與執行緒相同,此時我們的協程需要在切換前儲存上下文,這步操作是由Runtime的Freeze來執行,等到IO操作完成,協程被喚醒繼續執行,這時就要恢復上下文,這一步叫做Thaw。

1. Freeze操作

上圖左側是對Freeze的介紹,首先一個協程要被執行需要一個排程器,在Java生態本身就有一個非常不錯的排程器ForkJoinPool,Loom也預設使用ForkJoinPool來作為排程器。

圖中ForkJoinWorkerThread呼叫棧前半部分直到enterSpecial都是類庫的呼叫棧,使用者不需要考慮,A可以理解為使用者自己的實現,從函式A呼叫到函式B,函式B呼叫函式C,函式C此時有一個數據訪問,就會將當前協程掛起,yield操作會去儲存當前協程的執行上下文,呼叫freeze,freeze會做一個stack walk,從當前呼叫棧的最後一層(yield)回溯到使用者呼叫(函式A),將這些內容拷貝到一個stack。這也是協程棧大小不固定的原因,我們可以動態擴縮協程需要的空間,而執行緒棧大小預設1M,不管用沒用到。而協程按需使用的特點,可以建立的數量非常多。extract_pop是Loom非常好的一個優化,它將ABC呼叫棧中的Java物件單獨拷貝到一個refStack,在GC root時,如果把協程棧也當做root,幾百萬個協程會導致掃描停頓很久,Loom將所有物件都提到一個refStack裡面,只需要處理這個stack即可,避免過多的協程棧增加GC時間。

2. Thaw操作

Thaw用於恢復執行,如果將stack裡面ABC、yield全部拷貝回執行棧裡面可能是很耗時的,因為執行棧可能非常深了,Loom社群成員在調研後發現,函式C可能不止一個數據訪問操作,在恢復執行棧之後,可能因為C的IO操作又會再次切換上下文,所以Loom用了一種lazy copy的方式,每次只拷貝一部分,執行完成之後遇到return barrier則繼續去stack中拷貝。這樣除了第一次切換開銷比較大,其他所有的切換開銷都會很小。

另一方面refStack裡面儲存的OOP要restore回來,因為很多的GC可能在執行時將OOP地址改了,如果不restore之後訪問可能會出現問題。

五. Loom使用

  • Virtual Thread建立

    • 通過Thread.builder建立VirtualThread

    • 通過Thread.builder建立VirtualThread工廠

    • 預設ForkJoinPool排程器(負載均衡、自動擴充套件),支援定製排程器

  • 定製排程器
static ExecutorService SCHEDULER_1 = Executors.newFixedThreadPool(1);
Thread thread = Thread.ofVirtual().scheduler(SCHEDULER_1).start(() -> System.out.println("Hello"));
thread.join();
  • 建立協程池
ThreadFactory factory;
if (usrFiber == false) {
    factory = Thread.builder().factory();
} else {
    factory = Thread.builder().ofVirtual().factory();
}
ExecutorService e = Executors.newFixThreadPool(threadCount, factory);
for (int i=0; i < requestCount; i++) {
    e.execute(r);
}