1. 程式人生 > >scala 並行集合在spark中的應用

scala 並行集合在spark中的應用

一.scala並行集合

現在有一個集合,對它的每個元素進行處理,比如:

val arr = List[String]("a","b","c")
arr.foreach(println(_))
//輸出結果: 
a
b
c

//並行集合
arr.par.foreach(println(_))
//輸出結果: 
a
c
b

觀察執行緒

println((0 to 1000).map{r => Thread.currentThread.getName}.distinct)

Vector(main)

println((0 to 1000).par.map{r => Thread.currentThread.
getName}.distinct) ParVector(ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-7, ForkJoinPool-1-worker-3, ForkJoinPool-1-worker-1)

二.在spark中的使用

現在你有n個相互之間沒有關聯的任務需要執行,比如說從100張表中取出資料並進行資料清洗。按照spark的機制,job初使的併發度為資料的partition數(在hdfs中為資料的分塊數,在es和cass中讀取的資料為資料的副本數)。

所有即使你給任務分配了100個core,在資料讀取階段也只會使用到其中幾個。

val table_list = List[String]("table1"
,"table2","table3","...") val iter = arr.iterator while(iter.hasNext){ val tableName = iter.next //根據table從資料來源讀取資料 val data = ... //資料清洗操作 data.map(...) }

改成並行集合的模式,程式會以最大的併發度(你設定的core數量)從資料來源拉取資料,大大提高了程式執行效率。

val table_list = List[String]("table1","table2","table3","...")
table_list.par.foreach(r=>{
    val tableName = r
    //根據table從資料來源讀取資料
    val data = ...
//資料清洗操作 data.map(...) })

當然也可以使用scala.concurrent包中的Future等可以達到相同的作用

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val arr = List[String]("a","b","c")

val futures = arr.map(r=>Future{
    val tableName = r
    //根據table從資料來源讀取資料
    val data = ...
    //資料清洗操作
    data.map(...)
})

val future = Future.sequence(futures)
//任務最長執行時間
Await.result(future, 1 hour)