1. 程式人生 > 實用技巧 >spark調優--程式碼調優

spark調優--程式碼調優

一、程式碼調優

1、避免建立重複的RDD:對於同一份資料,只應該建立一個RDD,不能建立多個RDD來代表同一份資料。

  一些Spark初學者在剛開始開發Spark作業時,或者是有經驗的工程師在開發RDD lineage極其冗長的Spark作業時,可能會忘了自己之前對於某一份資料已經建立過一個RDD了,從而導致對於同一份資料,建立了多個RDD。這就意味著,我們的Spark作業會進行多次重複計算來建立多個代表相同資料的RDD,進而增加了作業的效能開銷。

// 需要對名為“hello.txt”的HDFS檔案進行一次map操作,再進行一次reduce操作。也就是說,需要對一份資料執行兩次運算元操作。

// 錯誤的做法:對於同一份資料執行多次運算元操作時,建立多個RDD。
// 這裡執行了兩次textFile方法,針對同一個HDFS檔案,建立了兩個RDD出來,然後分別對每個RDD都執行了一個運算元操作。
// 這種情況下,Spark需要從HDFS上兩次載入hello.txt檔案的內容,並建立兩個單獨的RDD;第二次載入HDFS檔案以及建立RDD的效能開銷,很明顯是白白浪費掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正確的用法:對於一份資料執行多次運算元操作時,只使用一個RDD。
// 這種寫法很明顯比上一種寫法要好多了,因為我們對於同一份資料只建立了一個RDD,然後對這一個RDD執行了多次運算元操作。
// 但是要注意到這裡為止優化還沒有結束,由於rdd1被執行了兩次運算元操作,第二次執行reduce操作的時候,還會再次從源頭處重新計算一次rdd1的資料,因此還是會有重複計算的效能開銷。
// 要徹底解決這個問題,必須結合“原則三:對多次使用的RDD進行持久化”,才能保證一個RDD被多次使用時只被計算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

2、儘可能複用同一個RDD:對不同的資料執行運算元操作時還要儘可能地複用一個RDD。比如說,有一個RDD的資料格式是key-value型別的,另一個是單value型別的,這兩個RDD的value資料是完全一樣的。那麼此時我們可以只使用key-value型別的那個RDD,因為其中已經包含了另一個的資料。對於類似這種多個RDD的資料有重疊或者包含的情況,我們應該儘量複用一個RDD,這樣可以儘可能地減少RDD的數量,從而儘可能減少運算元執行的次數。

// 錯誤的做法。

// 有一個<Long, String>格式的RDD,即rdd1。
// 接著由於業務需要,對rdd1執行了一個map操作,建立了一個rdd2,而rdd2中的資料僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分別對rdd1和rdd2執行了不同的運算元操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正確的做法。

// 上面這個case中,其實rdd1和rdd2的區別無非就是資料格式不同而已,rdd2的資料完全就是rdd1的子集而已,卻建立了兩個rdd,並對兩個rdd都執行了一次運算元操作。
// 此時會因為對rdd1執行map運算元來建立rdd2,而多執行一次運算元操作,進而增加效能開銷。

// 其實在這種情況下完全可以複用同一個RDD。
// 我們可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在進行第二個map操作時,只使用每個資料的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二種方式相較於第一種方式而言,很明顯減少了一次rdd2的計算開銷。
// 但是到這裡為止,優化還沒有結束,對rdd1我們還是執行了兩次運算元操作,rdd1實際上還是會被計算兩次。
// 因此還需要配合“原則三:對多次使用的RDD進行持久化”進行使用,才能保證一個RDD被多次使用時只被計算一次。

3、對多次使用的RDD進行持久化:

  儘可能複用RDD的基礎上,需要進行第二步優化,也就是要保證對一個RDD執行多次運算元操作時,這個RDD本身僅僅被計算一次。

  Spark中對於一個RDD執行多次運算元的預設原理是這樣的每次對一個RDD執行一個運算元操作時,都會重新從源頭處計算一遍,計算出那個RDD來,然後再對這個RDD執行你的運算元操作。這種方式的效能是很差的。

  因此對於這種情況,我們的建議是:對多次使用的RDD進行持久化。此時Spark就會根據持久化策略,將RDD中的資料儲存到記憶體或者磁碟中。以後每次對這個RDD進行運算元操作時,都會直接從記憶體或磁碟中提取持久化的RDD資料,然後執行運算元,而不會從源頭處重新計算一遍這個RDD,再執行運算元操作。

  持久化操作有:cache()、persist()、checkpoint()

// 如果要對一個RDD進行持久化,只要對這個RDD呼叫cache()和persist()即可。

// 正確的做法。
// cache()方法表示:使用非序列化的方式將RDD中的資料全部嘗試持久化到記憶體中。
// 此時再對rdd1執行兩次運算元操作時,只有在第一次執行map運算元時,才會將這個rdd1從源頭處計算一次。
// 第二次執行reduce運算元時,就會直接從記憶體中提取資料進行計算,不會重複計算一個rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手動選擇持久化級別,並使用指定的方式進行持久化。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,記憶體充足時優先持久化到記憶體中,記憶體不充足時持久化到磁碟檔案中。
// 而且其中的_SER字尾表示,使用序列化的方式來儲存RDD資料,此時RDD中的每個partition都會序列化成一個大的位元組陣列,然後再持久化到記憶體或磁碟中。
// 序列化的方式可以減少持久化的資料對記憶體/磁碟的佔用量,進而避免記憶體被持久化資料佔用過多,從而發生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

4、儘量避免使用shuffle類的運算元:要儘量避免使用shuffle類運算元。因為Spark作業執行過程中,最消耗效能的地方就是shuffle過程。shuffle過程,簡單來說,就是將分佈在叢集中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操作。比如reduceByKey、join等運算元,都會觸發shuffle操作。

  shuffle過程中,各個節點上的相同key都會先寫入本地磁碟檔案中,然後其他節點需要通過網路傳輸拉取各個節點上的磁碟檔案中的相同key。而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導致記憶體不夠存放,進而溢寫到磁碟檔案中。因此在shuffle過程中,可能會發生大量的磁碟檔案讀寫的IO操作,以及資料的網路傳輸操作。磁碟IO和網路資料傳輸也是shuffle效能較差的主要原因。

  因此在我們的開發過程中,能避免則儘可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的運算元,儘量使用map類的非shuffle運算元。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少效能開銷。

典型的解決:廣播變數+map類的transformation類運算元代替join

// 傳統的join操作會導致shuffle操作。
// 因為兩個RDD中,相同的key都需要通過網路拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將一個數據量較小的RDD作為廣播變數。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map運算元中,可以從rdd2DataBroadcast中,獲取rdd2的所有資料。
// 然後進行遍歷,如果發現rdd2中某條資料的key與rdd1的當前資料的key是相同的,那麼就判定可以進行join。
// 此時就可以根據自己需要的方式,將rdd1當前資料與rdd2中可以連線的資料,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建議僅僅在rdd2的資料量比較少(比如幾百M,或者一兩G)的情況下使用。
// 因為每個Executor的記憶體中,都會駐留一份rdd2的全量資料。

5、使用map-side預聚合的shuffle操作:儘量使用map端有預聚合的運算元

  如果因為業務需要,一定要使用shuffle操作,無法用map類的運算元來替代,那麼儘量使用可以map-side預聚合的運算元。

  所謂的map-side預聚合,就是在每個節點本地對相同的key進行一次聚合操作,類似於MapReduce中的本地combiner。map-side預聚合之後,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的資料數量,從而也就減少了磁碟IO以及網路傳輸開銷。

  通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey運算元來替代掉groupByKey運算元。因為reduceByKey和aggregateByKey運算元都會使用使用者自定義的函式對每個節點本地的相同key進行預聚合。而groupByKey運算元是不會進行預聚合的,全量的資料會在叢集的各個節點之間分發和傳輸,效能相對來說比較差。

  常見的帶有map端預聚合的運算元有:reduceByKey,combineByKey,aggregateByKey

  比如如下兩幅圖,就是典型的例子,分別基於reduceByKey和groupByKey進行單詞計數。其中第一張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有資料都會在叢集節點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節點本地的相同key資料,都進行了預聚合,然後才傳輸到其他節點上進行全域性聚合。

6 使用高效能的運算元:

(1)reduceByKey 代替 groupByKey:前者有map端預聚合,減少shuffle網路傳輸。

(2)mapPartitions 代替 map:

  mapPartitions類的運算元,一次函式呼叫會處理一個partition所有的資料,而不是一次函式呼叫處理一條,效能相對來說會高一些。但是有的時候,使用mapPartitions會出現OOM(記憶體溢位)的問題。因為單次函式呼叫就要處理掉一個partition所有的資料,如果記憶體不夠,垃圾回收時是無法回收掉太多物件的,很可能出現OOM異常。所以使用這類操作時要慎重!

  如果是普通的 map,比如一個 partition 中有 1 萬條資料;ok,那麼你的 function 要執行 和計算 1 萬次。但是,使用 MapPartitions 操作之後,一個 task 僅僅會執行一次 function,function 一次接收所有的 partition 資料。只要執行一次就可以了,效能比較高。    

  MapPartitions 操作的缺點:

  如果是普通的 map 操作,一次 function 的執行就處理一條資料;那麼如果記憶體不夠用 的情況下,比如處理了 1 千條資料了,那麼這個時候記憶體不夠了,那麼就可以將已經處理完 的 1 千條資料從記憶體裡面垃圾回收掉,或者用其他方法,騰出空間來吧。 但是 MapPartitions 操作,對於大量資料來說,比如甚至一個 partition,100 萬資料,一 次傳入一個 function 以後,那麼可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來, 可能就 OOM,記憶體溢位。

  注意: 在專案中,自己先去估算一下 RDD 的資料量,以及每個 partition 的量,還有自己分配 給每個 executor 的記憶體資源。看看一下子記憶體容納所有的 partition 資料,行不行。如果行, 可以試一下,能跑通就好。效能肯定是有提升的。 但是試了一下以後,發現,不行,OOM 了,那就放棄吧

(3)foreachPartition 代替 foreach:

  原理類似於“使用mapPartitions替代map”,也是一次函式呼叫處理一個partition的所有資料,而不是一次函式呼叫處理一條資料。在實踐中發現,foreachPartitions類的運算元,對效能的提升還是很有幫助的。比如在foreach函式中,將RDD中所有資料寫MySQL,那麼如果是普通的foreach運算元,就會一條資料一條資料地寫,每次函式呼叫可能就會建立一個數據庫連線,此時就勢必會頻繁地建立和銷燬資料庫連線,效能是非常低下;但是如果用foreachPartitions運算元一次性處理一個partition的資料,那麼對於每個partition,只要建立一個數據庫連線即可,然後執行批量插入操作,此時效能是比較高的。實踐中發現,對於1萬條左右的資料量寫MySQL,效能可以提升30%以上。

  在實際生產環境中,清一色,都是使用 foreachPartition 操作;但是有個問題,跟 mapPartitions 操作一樣,如果一個 partition 的數量真的特別特別大,比如真的是 100 萬,那基本上就不太 靠譜了。(一下子進來,很有可能會發生 OOM,記憶體溢位的問題)

  此外,foreachPartition也可以解決程式中的序列化問題,若在呼叫某個類的方法的時候,這個類沒有實現序列化介面,或者這個類裡面依賴的類沒法實現序列化介面。就可以將new操作放在foreachpartion裡面,將物件的建立過程推遲到excutor內,而不是driver端。若在driver端建立物件,就需要將物件傳輸到excutor裡,所以需要序列化。

// 每個小時的資料分佈情況統計
      baseData.map(t => ("B-" + t._2, t._3)).reduceByKey((list1, list2) => {
        //根據小時進行聚合
        (list1 zip list2) map (x => x._1 + x._2)
      }).foreachPartition(itr => {

        val client = JedisUtils.getJedisClient()

        itr.foreach(tp => {
          // B-2017111816
          client.hincrBy(tp._1, "total", tp._2(0).toLong)//計算充值總量
          client.hincrBy(tp._1, "succ", tp._2(1).toLong)//計算充值成功量

          client.expire(tp._1, 60 * 60 * 24 * 2)
        })
        client.close()
      })

(4)filter 對大量資料過濾之後使用 coalesce 減少分割槽

  通常對一個RDD執行filter運算元過濾掉RDD中較多資料後(比如30%以上的資料),建議使用coalesce運算元,手動減少RDD的partition數量,將RDD中的資料壓縮到更少的partition中去。因為filter之後,RDD的每個partition中都會有很多資料被過濾掉,此時如果照常進行後續的計算,其實每個task處理的partition中的資料量並不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數量,將RDD中的資料壓縮到更少的partition之後,只要使用更少的task即可處理完所有的partition。在某些場景下,對於效能的提升會有一定的幫助。

  預設情況下,經過了這種 filter 之後,RDD 中的每個 partition 的資料量,可能都不太一樣了。 (原本每個 partition 的資料量可能是差不多的)
問題:
1、每個 partition 資料量變少了,但是在後面進行處理的時候,還是要跟 partition 數量一樣 數量的 task,來進行處理;有點浪費 task 計算資源。
2、每個 partition 的資料量不一樣,會導致後面的每個 task 處理每個 partition 的時候,每個 task 要處理的資料量就不同,這個時候很容易發生資料傾斜。。。。
比如說,第二個 partition 的資料量才 100;但是第三個 partition 的資料量是 900;那麼在後 面的 task 處理邏輯一樣的情況下,不同的 task 要處理的資料量可能差別達到了 9 倍,甚至 10 倍以上;同樣也就導致了速度的差別在 9 倍,甚至 10 倍以上。
解決:
1、針對第一個問題,我們希望可以進行 partition 的壓縮吧,因為資料量變少了,那麼 partition 其實也完全可以對應的變少。比如原來是 4 個 partition,現在完全可以變成 2 個 partition。 那麼就只要用後面的 2 個 task 來處理即可。就不會造成 task 計算資源的浪費。
2、針對第二個問題,其實解決方案跟第一個問題是一樣的;也是去壓縮 partition,儘量讓 每個 partition 的資料量差不多。那麼這樣的話,後面的 task 分配到的 partition 的資料量也就 差不多。不會造成有的 task 執行速度特別慢,有的 task 執行速度特別快。避免了資料傾斜 的問題。

coalesce 運算元 主要就是用於在 filter 操作之後,針對每個 partition 的資料量各不相同的情況,來壓縮 partition 的數量。減少 partition 的數量,而且讓每個 partition 的資料量都儘量均勻緊湊。

(5)repartitionAndSortWithinPartitions 替代 repartition 與 sort 類操作

  repartitionAndSortWithinPartitions 是Spark 官網推薦的一個運算元,官方建議,如果需要在 repartition 重分割槽之後,還要進行排序,建議直接使用 repartitionAndSortWithinPartitions 運算元。因為該運算元可以一邊進行重分割槽的 shuffle 操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,效能可能是要高的。

7、廣播大變數

  有時在開發過程中,會遇到需要在運算元函式中使用外部變數的場景(尤其是大變數,比如100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提升效能。

  在運算元函式中使用到外部變數時,預設情況下,Spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有一個變數副本。如果變數本身比較大的話(比如100M,甚至1G),那麼大量的變數副本在網路中傳輸的效能開銷,以及在各個節點的Executor中佔用過多記憶體導致的頻繁GC,都會極大地影響效能。

  因此對於上述情況,如果使用的外部變數比較大,建議使用Spark的廣播功能,對該變數進行廣播。廣播後的變數,會保證每個Executor的記憶體中,只駐留一份變數副本,而Executor中的task執行時共享該Executor中的那份變數副本。這樣的話,可以大大減少變數副本的數量,從而減少網路傳輸的效能開銷,並減少對Executor記憶體的佔用開銷,降低GC的頻率。

// 以下程式碼在運算元函式中,使用了外部的變數。
// 此時沒有做任何特殊操作,每個task都會有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下程式碼將list1封裝成了Broadcast型別的廣播變數。
// 在運算元函式中,使用廣播變數時,首先會判斷當前task所在Executor記憶體中,是否有變數副本。
// 如果有則直接使用;如果沒有則從Driver或者其他Executor節點上遠端拉取一份放到本地Executor記憶體中。
// 每個Executor記憶體中,就只會駐留一份廣播變數副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

8、使用Kryo優化序列化效能

在Spark中,主要有三個地方涉及到了序列化:

  • 在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸(見“原則七:廣播大變數”中的講解)。
  • 將自定義的型別作為RDD的泛型型別時(比如JavaRDD,Student是自定義型別),所有自定義型別物件,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。
  • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組陣列。

  對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的效能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,效能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義型別,因此對於開發者來說,這種方式比較麻煩。

  以下是使用Kryo的程式碼示例,我們只要設定序列化類,再註冊要序列化的自定義型別即可(比如運算元函式中使用到的外部變數型別、作為RDD泛型型別的自定義型別等):

// 建立SparkConf物件。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設定序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 註冊要序列化的自定義型別。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

9、優化資料結構

Java中,有三種類型比較耗費記憶體:

  • 物件,每個Java物件都有物件頭、引用等額外的資訊,因此比較佔用記憶體空間。
  • 字串,每個字串內部都有一個字元陣列以及長度等額外資訊。
  • 集合型別,比如HashMap、LinkedList等,因為集合型別內部通常會使用一些內部類來封裝集合元素,比如Map.Entry。

因此Spark官方建議,在Spark編碼實現中,特別是對於運算元函式中的程式碼,儘量不要使用上述三種資料結構,儘量使用字串替代物件,使用原始型別(比如Int、Long)替代字串,使用陣列替代集合型別,這樣儘可能地減少記憶體佔用,從而降低GC頻率,提升效能。

但是在筆者的編碼實踐中發現,要做到該原則其實並不容易。因為我們同時要考慮到程式碼的可維護性,如果一個程式碼中,完全沒有任何物件抽象,全部是字串拼接的方式,那麼對於後續的程式碼維護和修改,無疑是一場巨大的災難。同理,如果所有操作都基於陣列實現,而不使用HashMap、LinkedList等集合型別,那麼對於我們的編碼難度以及程式碼可維護性,也是一個極大的挑戰。因此筆者建議,在可能以及合適的情況下,使用佔用記憶體較少的資料結構,但是前提是要保證程式碼的可維護性。

10、使用高效能的庫 fastutil:

  fastutil 是擴充套件了 Java 標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊型別的 map、set、list 和 queue;

  fastutil 能夠提供更小的記憶體佔用,更快的存取速度;我們使用 fastutil 提供的集合類,來替代自己平時使用的 JDK 的原生的 Map、List、Set,好處在於,fastutil集合類,可以減小記憶體的佔用,並且在進行集合的遍歷、根據索引(或者 key)獲取元素的值和設定元素的值的時候,提供更快的存取速度;
  fastutil 也提供了 64 位的 array、set 和 list,以及高效能快速的,以及實用的 IO類,來處理二進位制和文字型別的檔案;
  fastutil 的每一種集合型別,都實現了對應的 Java 中的標準介面(比如 fastutil 的map,實現了 Java 的 Map 介面),因此可以直接放入已有系統的任何程式碼中。
  fastutil 還提供了一些 JDK 標準類庫中沒有的額外功能(比如雙向迭代器)。
  fastutil 除了物件和原始型別為元素的集合,fastutil 也提供引用型別的支援,但是對引用型別是使用等於號(=)進行比較的,而不是 equals()方法。
  fastutil 儘量提供了在任何場景下都是速度最快的集合類庫。

Spark 中應用 fastutil 的場景:

1 、如果運算元函式使用了外部變數;那麼第一,你可以使用 Broadcast 廣播變數優化;第二,可以使用 Kryo 序列化類庫,提升序列化效能和效率;第三,如果外部變數是某種比較大的集合,那麼可以考慮使用 fastutil 改寫外部變數,首先從源頭上就減少記憶體的佔用,通過廣播變數進一步減少記憶體佔用,再通過 Kryo 序列化類庫進一步減少記憶體佔用。
2 、在你的運算元函式裡,也就是 task 要執行的計算邏輯裡面,如果有邏輯中出現要建立比較大的 Map、List 等集合,可能會佔用較大的記憶體空間,而且可能涉及到消耗效能的遍歷、存取等集合操作;那麼此時,可以考慮將這些集合型別使用 fastutil 類庫重寫,使用了 fastutil 集合類以後,就可以在一定程度上,減少task 創建出來的集合型別的記憶體佔用。避免 executor 記憶體頻繁佔滿,頻繁喚起GC,導致效能下降。

fastutil 的使用:
第一步:在 pom.xml 中引用 fastutil 的包

<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>

List<Integer> => IntList