1. 程式人生 > >JVM為大資料圈帶來的問題 & 解決方法

JVM為大資料圈帶來的問題 & 解決方法

Hadoop的成功固然是由於其順應了新世紀以來網際網路技術的發展趨勢,同時其基於JVM的平臺開發也為Hadoop的快速發展起到了促進作用。Hadoop生態圈的專案大都基於Java,Scala,Clojure等JVM語言開發,這些語言良好的語法規範,豐富的第三方類庫以及完善的工具支援,為Hadoop這樣的超大型專案提供了基礎支撐。同時,作為在程式設計師中普及率的語言之一,它也降低了更多程式設計師使用,或是參與開發Hadoop專案的門檻。同時,基於Scala開發的Spark,甚至因為專案的火熱反過來極大的促進了Scala語言的推廣。但是隨著Hadoop平臺的逐步發展,Hadoop生態圈的專案之間的競爭加劇,越來越多的Hadoop專案注意到了這些JVM語言的一些不足之處,希望通過更有效率的處理方式,提升分散式系統的執行效率與健壯性。本文主要以Spark和Flink專案為例,介紹Hadoop社群觀察到的一些因為JVM語言的不足導致的問題,以及相應的解決方案與未來可能的發展方向。

  注:本文假設讀者對Java和Hadoop系統有基本瞭解。

  背景

  目前Hadoop生態圈共有MapReduce,Tez,Spark及Flink等分散式計算引擎,分散式計算引擎專案之間的競爭也相當激烈。MapReduce作為Hadoop平臺的個分散式計算引擎,具有非常良好的可擴充套件性,Yahoo曾成功的搭建了上萬臺節點的MapReduce系統。但是MapReduce只支援Map和Reduce程式設計正規化,使得複雜資料計算邏輯需要分割為多個Hadoop Job,而每個Hadoop Job都需要從HDFS讀取資料,並將Job執行結果寫回HDFS,所以會產生大量額外的IO開銷,目前MapReduce正在逐漸被其他三個分散式計算引擎替代。Tez,Spark和Flink都支援圖結構的分散式計算流,可在同一Job內支援任意複雜邏輯的計算流。Tez的抽象層次較低,使用者不易直接使用,Spark與Flink都提供了抽象的分散式資料集以及可在資料集上使用的操作符,使用者可以像操作Scala資料集合類似的方式在Spark/FLink中的操作分散式資料集,非常的容易上手,同時,Spark與Flink都在分散式計算引擎之上,提供了針對SQL,流處理,機器學習和圖計算等特定資料處理領域的庫。

  隨著各個專案的發展與日益成熟,通過改進分散式計算框架本身大幅提高效能的機會越來越少。同時,在當前資料中心的硬體配置中,採用了越來越多更先進的IO裝置,例如SSD儲存,10G甚至是40Gbps網路,IO頻寬的提升非常明顯,許多計算密集型別的工作負載的瓶頸已經取決於底層硬體系統的吞吐量,而不是傳統上人們認為的IO頻寬,而CPU和記憶體的利用效率,則很大程度上決定了底層硬體系統的吞吐量。所以越來越多的專案將眼光投向了JVM本身,希望通過解決JVM本身帶來的一些問題,提高分散式系統的效能或是健壯性,從而增強自身的競爭力。

  JVM本身作為一個各種型別應用執行的平臺,其對Java物件的管理也是基於通用的處理策略,其垃圾回收器通過估算Java物件的生命週期對Java物件進行有效率的管理。針對不同型別的應用,使用者可能需要針對該型別應用的特點,配置針對性的JVM引數更有效率的管理Java物件,從而提高效能。這種JVM調優的黑魔法需要使用者對應用本身以及JVM的各引數有深入的瞭解,極大的提高了分散式計算平臺的調優門檻(例如這篇文章中對Spark的調優 Tuning Java Garbage Collection for Spark Applications )。然而類似Spark或是Flink的分散式計算框架,框架本身瞭解計算邏輯每個步驟的資料傳輸,相比於JVM垃圾回收器,其瞭解更多的Java物件生命週期,從而為更有效率的管理Java物件提供了可能。

  JVM存在的問題

  1. Java物件開銷

  相對於c/c++等更加接近底層的語言,Java物件的儲存密度相對偏低,例如【1】,“abcd”這樣簡單的字串在UTF-8編碼中需要4個位元組儲存,但Java採用UTF-16編碼儲存字串,需要8個位元組儲存“abcd”,同時Java物件還物件header等其他額外資訊,一個4位元組字串物件,在Java中需要48位元組的空間來儲存。對於大部分的大資料應用,記憶體都是稀缺資源,更有效率的記憶體儲存,則意味著CPU資料訪問吞吐量更高,以及更少的磁碟落地可能。

  2. 物件儲存結構引發的cache miss

  為了緩解CPU處理速度與記憶體訪問速度的差距【2】,現代CPU資料訪問一般都會有多級快取。當從記憶體載入資料到快取時,一般是以cache line為單位載入資料,所以當CPU訪問的資料如果是在記憶體中連續儲存的話,訪問的效率會非常高。如果CPU要訪問的資料不在當前快取所有的cache line中,則需要從記憶體中載入對應的資料,這被稱為一次cache miss。當cache miss非常高的時候,CPU大部分的時間都在等待資料載入,而不是真正的處理資料。Java物件並不是連續的儲存在記憶體上,同時很多的Java資料結構的資料聚集性也不好,在Spark的效能調優中,經常能夠觀測到大量的cache miss。Java社群有個專案叫做Project Valhalla,可能會部分的解決這個問題,有興趣的可以看看這兒 OpenJDK: Valhalla 。

  3. 大資料的垃圾回收

  Java的垃圾回收機制,一直讓Java開發者又愛又恨,一方面它免去了開發者自己回收資源的步驟,提高了開發效率,減少了記憶體洩漏的可能,另一方面,垃圾回收也是Java應用的一顆不定時炸彈,有時秒級甚至是分鐘級的垃圾回收極大的影響了Java應用的效能和可用性。在當前的資料中心中,大容量的記憶體得到了廣泛的應用,甚至出現了單臺機器配置TB記憶體的情況,同時,大資料分析通常會遍歷整個源資料集,對資料進行轉換,清洗,處理等步驟。在這個過程中,會產生海量的Java物件,JVM的垃圾回收執行效率對效能有很大影響。通過JVM引數調優提高垃圾回收效率需要使用者對應用和分散式計算框架以及JVM的各引數有深入的瞭解,而且有時候這也遠遠不夠。

  4. OOM問題

  OutOfMemoryError是分散式計算框架經常會遇到的問題,當JVM中所有物件大小超過分配給JVM的記憶體大小時,就會fOutOfMemoryError錯誤,JVM崩潰,分散式框架的健壯性和效能都會受到影響。通過JVM管理記憶體,同時試圖解決OOM問題的應用,通常都需要檢查Java物件的大小,並在某些儲存Java物件特別多的資料結構中設定閾值進行控制。但是JVM並沒有提供官方的檢查Java物件大小的工具,第三方的工具類庫可能無法準確通用的確定Java物件的大小【6】。侵入式的閾值檢查也會為分散式計算框架的實現增加很多額外的業務邏輯無關的程式碼。

  解決方案

  為了解決以上提到的問題,高效能分散式計算框架通常需要以下技術:

  1. 定製的序列化工具。顯式記憶體管理的前提步驟就是序列化,將Java物件序列化成二進位制資料儲存在記憶體上(on heap或是off-heap)。通用的序列化框架,如Java預設的java.io.Serializable將Java物件以及其成員變數的所有元資訊作為其序列化資料的一部分,序列化後的資料包含了所有反序列化所需的資訊。這在某些場景中十分必要,但是對於Spark或是Flink這樣的分散式計算框架來說,這些元資料資訊可能是冗餘資料。定製的序列化框架,如Hadoop的org.apache.hadoop.io.Writable,需要使用者實現該介面,並自定義類的序列化和反序列化方法。這種方式效率,但需要使用者額外的工作,不夠友好。

  2. 顯式的記憶體管理。一般通用的做法是批量申請和釋放記憶體,每個JVM例項有一個統一的記憶體管理器,所有的記憶體的申請和釋放都通過該記憶體管理器進行。這可以避免常見的記憶體碎片問題,同時由於資料以二進位制的方式儲存,可以大大減輕垃圾回收的壓力。

  3. 快取友好的資料結構和演算法。只將操作相關的資料連續儲存,可以化的利用L1/L2/L3快取,減少Cache miss的概率,提升CPU計算的吞吐量。以排序為例,由於排序的主要操作是對Key進行對比,如果將所有排序資料的Key與Value分開,對Key連續儲存,則訪問Key時的Cache命中率會大大提高。

  定製的序列化工具

  分散式計算框架可以使用定製序列化工具的前提是要處理的資料流通常是同一型別,由於資料集物件的型別固定,對於資料集可以只儲存一份物件Schema資訊,節省大量的儲存空間。同時,對於固定大小的型別,也可通過固定的偏移位置存取。當我們需要訪問某個物件成員變數的時候,通過定製的序列化工具,並不需要反序列化整個Java物件,而是可以直接通過偏移量,只是反序列化特定的物件成員變數。如果物件的成員變數較多時,能夠大大減少Java物件的建立開銷,以及記憶體資料的拷貝大小。Spark與Flink資料集都支援任意Java或是Scala型別,通過自動生成定製序列化工具,Spark與Flink既保證了API介面對使用者的友好度(不用像Hadoop那樣資料型別需要繼承實現org.apache.hadoop.io.Writable介面),同時也達到了和Hadoop類似的序列化效率。

  Spark的序列化框架

  Spark支援通用的計算框架,如Java Serialization和Kryo。其缺點之前也略有論述,總結如下:

  佔用較多記憶體。Kryo相對於Java Serialization更高,它支援一種型別到Integer的對映機制,序列化時用Integer代替型別資訊,但還不及定製的序列化工具效率。

  反序列化時,必須反序列化整個Java物件。

  無法直接操作序列化後的二進位制資料。

  Project Tungsten 提供了一種更好的解決方式,針對於DataFrame API(Spark針對結構化資料的類SQL分析API,參考 Spark DataFrame Blog ),由於其資料集是有固定Schema的Tuple(可大概類比為資料庫中的行),序列化是針對每個Tuple儲存其型別資訊以及其成員的型別資訊是非常浪費記憶體的,對於Spark來說,Tuple型別資訊是全域性可知的,所以其定製的序列化工具只儲存Tuple的資料,如下圖所示

  圖1 Spark off-heap object layout

  對於固定大小的成員,如int,long等,其按照偏移量直接內聯儲存。對於變長的成員,如String,其儲存一個指標,指向真正的資料儲存位置,並在資料儲存開始處儲存其長度。通過這種儲存方式,保證了在反序列化時,當只需訪問某一個成員時,只需根據偏移量反序列化這個成員,並不需要反序列化整個Tuple。

  Project Tungsten的定製序列化工具應用在Sort,HashTable,Shuffle等很多對Spark效能影響的地方。比如在Shuffle階段,定製序列化工具不僅提升了序列化的效能,而且減少了網路傳輸的資料量,根據DataBricks的Blog介紹,相對於Kryo,Shuffle800萬複雜Tuple資料時,其效能至少提高2倍以上。此外,Project Tungsten也計劃通過Code generation技術,自動生成序列化程式碼,將定製序列化工具推廣到Spark Core層,從而使得更多的Spark應用受惠於此優化。

  Flink的序列化框架

  Flink在系統設計之初,就借鑑了很多傳統RDBMS的設計,其中之一就是對資料集的型別資訊進行分析,對於特定Schema的資料集的處理過程,進行類似RDBMS執行計劃優化的優化。同時,資料集的型別資訊也可以用來設計定製的序列化工具。和Spark類似,Flink支援任意的Java或是Scala型別,Flink通過Java Reflection框架分析基於Java的Flink程式UDF(User Define Function)的返回型別的型別資訊,通過Scala Compiler分析基於Scala的Flink程式UDF的返回型別的型別資訊。型別資訊由TypeInformation類表示,這個類有諸多具體實現類,例如(更多詳情參考Flink官方部落格 Apache Flink: Juggling with Bits and Bytes ):

  1. BasicTypeInfo: 任意Java基本型別(裝包或未裝包)和String型別。

  2. BasicArrayTypeInfo: 任意Java基本型別陣列(裝包或未裝包)和String陣列。

  3. WritableTypeInfo: 任意Hadoop’s Writable介面的實現類.

  4. TupleTypeInfo: 任意的Flink tuple型別(支援Tuple1 to Tuple25). Flink tuples是固定長度固定型別的Java Tuple實現。

  5. CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples).

  6. PojoTypeInfo: 任意的POJO (Java or Scala),例如,Java物件的所有成員變數,要麼是public修飾符定義,要麼有getter/setter方法。

  7. GenericTypeInfo: 任意無法匹配之前幾種型別的類。)

  前6種類型資料集幾乎覆蓋了絕大部分的Flink程式,針對前6種類型資料集,Flink皆可以自動生成對應的TypeSerializer定製序列化工具,非常有效率的對資料集進行序列化和反序列化。對於第7中型別,Flink使用Kryo進行序列化和反序列化。此外,對於可被用作Key的型別,Flink還同時自動生成TypeComparator,用來輔助直接對序列化後的二進位制資料直接進行compare,hash等之類的操作。對於Tuple,CaseClass,Pojo等組合型別,Flink自動生成的TypeSerializer,TypeComparator同樣是組合的,並把其成員的序列化/反序列化代理給其成員對應的TypeSerializer,TypeComparator,如下圖所示:

  圖2 Flink組合型別序列化

  此外,如有需要,使用者可通過整合TypeInformation介面,定製實現自己的序列化工具。

  顯式的記憶體管理

  垃圾回收的JVM記憶體管理迴避不了的問題,JDK8的G1演算法改善了JVM垃圾回收的效率和可用範圍,但對於大資料處理的實際環境中,還是遠遠不夠。這也和現在分散式框架的發展趨勢有衝突,越來越多的分散式計算框架希望儘可能多的將待處理的資料集放在記憶體中,而對於JVM垃圾回收來說,記憶體中Java物件越少,存活時間越短,其效率越高。通過JVM進行記憶體管理的話,OutOfMemoryError也是一個很難解決的問題。同時,在JVM記憶體管理中,Java物件有潛在的碎片化儲存問題(Java物件所有資訊可能不是在記憶體中連續儲存),也有可能在所有Java物件大小沒有超過JVM分配記憶體時,出現OutOfMemoryError問題。

  Flink的記憶體管理

  Flink將記憶體分為三個部分,每個部分都有不同的用途:

  1. Network buffers: 一些以32KB Byte陣列為單位的buffer,主要被網路模組用於資料的網路傳輸。

  2. Memory Manager pool: 大量以32KB Byte陣列為單位的記憶體池,所有的執行時演算法(例如Sort/Shuffle/Join)都從這個記憶體池申請記憶體,並將序列化後的資料儲存其中,結束後釋放回記憶體池。

  3. Remaining (Free) Heap: 主要留給UDF中使用者自己建立的Java物件,由JVM管理。

  Network buffers在Flink中主要基於Netty的網路傳輸,無需多講。Remaining Heap用於UDF中使用者自己建立的Java物件,在UDF中,使用者通常是流式的處理資料,並不需要很多記憶體,同時Flink也不鼓勵使用者在UDF中快取很多資料,因為這會引起前面提到的諸多問題。Memory Manager pool(以後以記憶體池代指)通常會配置為的一塊記憶體,接下來會詳細介紹。

  在Flink中,記憶體池由多個MemorySegment組成,每個MemorySegment代表一塊連續的記憶體,底層儲存是byte[],預設32KB大小。MemorySegment提供了根據偏移量訪問資料的各種方法,如get/put int,long,float,double等,MemorySegment之間資料拷貝等方法,和java.nio.ByteBuffer類似。對於Flink的資料結構,通常包括多個向記憶體池申請的MemeorySegment,所有要存入的物件,通過TypeSerializer序列化之後,將二進位制資料儲存在MemorySegment中,在取出時,通過TypeSerializer反序列化。資料結構通過MemorySegment提供的set/get方法訪問具體的二進位制資料。

  Flink這種看起來比較複雜的記憶體管理方式帶來的好處主要有:

  1. 二進位制的資料儲存大大提高了資料儲存密度,節省了儲存空間。

  2. 所有的執行時資料結構和演算法只能通過記憶體池申請記憶體,保證了其使用的記憶體大小是固定的,不會因為執行時資料結構和演算法而發生OOM。而對於大部分的分散式計算框架來說,這部分由於要快取大量資料,是很有可能導致OOM的地方。

  3. 記憶體池雖然佔據了大部分記憶體,但其中的MemorySegment容量較大(預設32KB),所以記憶體池中的Java物件其實很少,而且一直被記憶體池引用,所有在垃圾回收時很快進入持久代,大大減輕了JVM垃圾回收的壓力。

  4. Remaining Heap的記憶體雖然由JVM管理,但是由於其主要用來儲存使用者處理的流式資料,生命週期非常短,速度很快的Minor GC就會全部回收掉,一般不會觸發Full GC。

  Flink當前的記憶體管理在很底層是基於byte[],所以資料很終還是on-heap,很近Flink增加了off-heap的記憶體管理支援,將會在下一個release中正式出現。Flink off-heap的記憶體管理相對於on-heap的優點主要在於(更多細節,請參考 Apache Flink: Off-heap Memory in Apache Flink and the curious JIT compiler ):

  1. 啟動分配了大記憶體(例如100G)的JVM很耗費時間,垃圾回收也很慢。如果採用off-heap,剩下的Network buffer和Remaining heap都會很小,垃圾回收也不用考慮MemorySegment中的Java物件了。

  2. 更有效率的IO操作。在off-heap下,將MemorySegment寫到磁碟或是網路,可以支援zeor-copy技術,而on-heap的話,則至少需要一次記憶體拷貝。

  3. off-heap可用於錯誤恢復,比如JVM崩潰,在on-heap時,資料也隨之丟失,但在off-heap下,off-heap的資料可能還在。此外,off-heap上的資料還可以和其他程式共享。

  Spark的記憶體管理

  Spark的off-heap記憶體管理與Flink off-heap模式比較相似,也是通過Java UnSafe API直接訪問off-heap記憶體,通過定製的序列化工具將序列化後的二進位制資料儲存與off-heap上,Spark的資料結構和演算法直接訪問和操作在off-heap上的二進位制資料。Project Tungsten是一個正在進行中的專案,想了解具體進展可以訪問: [SPARK-7075] Project Tungsten (Spark 1.5 Phase 1) , [SPARK-9697] Project Tungsten (Spark 1.6)。

  快取友好的計算

  磁碟IO和網路IO之前一直被認為是Hadoop系統的瓶頸,但是隨著Spark,Flink等新一代的分散式計算框架的發展,越來越多的趨勢使得CPU/Memory逐漸成為瓶頸,這些趨勢包括:

  1. 更先進的IO硬體逐漸普及。10GB網路和SSD硬碟等已經被越來越多的資料中心使用。

  2. 更高效的儲存格式。Parquet,ORC等列式儲存被越來越多的Hadoop專案支援,其非常高效的壓縮效能大大減少了落地儲存的資料量。

  3. 更高效的執行計劃。例如Spark DataFrame的執行計劃優化器的Fliter-Push-Down優化會將過濾條件儘可能的提前,甚至提前到Parquet的資料訪問層,使得在很多實際的工作負載中,並不需要很多的磁碟IO。

  由於CPU處理速度和記憶體訪問速度的差距,提升CPU的處理效率的關鍵在於化的利用L1/L2/L3/Memory,減少任何不必要的Cache miss。定製的序列化工具給Spark和Flink提供了可能,通過定製的序列化工具,Spark和Flink訪問的二進位制資料本身,因為佔用記憶體較小,儲存密度比較大,而且還可以在設計資料結構和演算法時,儘量連續儲存,減少記憶體碎片化對Cache命中率的影響,甚至更進一步,Spark與Flink可以將需要操作的部分資料(如排序時的Key)連續儲存,而將其他部分的資料儲存在其他地方,從而可能的提升Cache命中的概率。

  Flink中的資料結構

  以Flink中的排序為例,排序通常是分散式計算框架中一個非常重的操作,Flink通過特殊設計的排序演算法,獲得了非常好了效能,其排序演算法的實現如下:

  1. 將待排序的資料經過序列化後儲存在兩個不同的MemorySegment集中。資料全部的序列化值存放於其中一個MemorySegment集中。資料序列化後的Key和指向個MemorySegment集中其值的指標存放於第二個MemorySegment集中。

  2. 對第二個MemorySegment集中的Key進行排序,如需交換Key位置,只需交換對應的Key+Pointer的位置,個MemorySegment集中的資料無需改變。 當比較兩個Key大小時,TypeComparator提供了直接基於二進位制資料的對比方法,無需反序列化任何資料。

  3. 排序完成後,訪問資料時,按照第二個MemorySegment集中Key的順序訪問,並通過Pinter值找到資料在個MemorySegment集中的位置,通過TypeSerializer反序列化成Java物件返回。

  圖3 Flink排序演算法

  這樣實現的好處有:

  1. 通過Key和Full data分離儲存的方式,儘量將被操作的資料很小化,提高Cache命中的概率,從而提高CPU的吞吐量。

  2. 移動資料時,只需移動Key+Pointer,而無須移動資料本身,大大減少了記憶體拷貝的資料量。

  3. TypeComparator直接基於二進位制資料進行操作,節省了反序列化的時間。

  Spark的資料結構

  Spark中基於off-heap的排序與Flink幾乎一模一樣,在這裡就不多做介紹了,感興趣的話,請參考: https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

      總結

  本文主要介紹了Hadoop生態圈的一些專案遇到的一些因為JVM記憶體管理導致的問題,以及社群是如何應對的。基本上,以記憶體為中心的分散式計算框架,大都開始了部分脫離JVM,走上了自己管理記憶體的路線,Project Tungsten甚至更進一步,提出了通過LLVM,將部分邏輯編譯成原生代碼,從而更加深入的挖掘SIMD等CPU潛力。此外,除了Spark,Flink這樣的分散式計算框架,HBase(HBASE-11425),HDFS(HDFS-7844)等專案也在部分效能相關的模組通過自己管理記憶體來規避JVM的一些缺陷,同時提升效能。