scala 並行集合在spark中的應用
阿新 • • 發佈:2019-02-10
一.scala並行集合
現在有一個集合,對它的每個元素進行處理,比如:
val arr = List[String]("a","b","c")
arr.foreach(println(_))
//輸出結果:
a
b
c
//並行集合
arr.par.foreach(println(_))
//輸出結果:
a
c
b
觀察執行緒
println((0 to 1000).map{r => Thread.currentThread.getName}.distinct)
Vector(main)
println((0 to 1000).par.map{r => Thread.currentThread. getName}.distinct)
ParVector(ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-7, ForkJoinPool-1-worker-3, ForkJoinPool-1-worker-1)
二.在spark中的使用
現在你有n個相互之間沒有關聯的任務需要執行,比如說從100張表中取出資料並進行資料清洗。按照spark的機制,job初使的併發度為資料的partition數(在hdfs中為資料的分塊數,在es和cass中讀取的資料為資料的副本數)。
所有即使你給任務分配了100個core,在資料讀取階段也只會使用到其中幾個。
val table_list = List[String]("table1" ,"table2","table3","...")
val iter = arr.iterator
while(iter.hasNext){
val tableName = iter.next
//根據table從資料來源讀取資料
val data = ...
//資料清洗操作
data.map(...)
}
改成並行集合的模式,程式會以最大的併發度(你設定的core數量)從資料來源拉取資料,大大提高了程式執行效率。
val table_list = List[String]("table1","table2","table3","...")
table_list.par.foreach(r=>{
val tableName = r
//根據table從資料來源讀取資料
val data = ...
//資料清洗操作
data.map(...)
})
當然也可以使用scala.concurrent
包中的Future等可以達到相同的作用
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
val arr = List[String]("a","b","c")
val futures = arr.map(r=>Future{
val tableName = r
//根據table從資料來源讀取資料
val data = ...
//資料清洗操作
data.map(...)
})
val future = Future.sequence(futures)
//任務最長執行時間
Await.result(future, 1 hour)