Spark-共享變數
阿新 • • 發佈:2018-12-16
共享變數
通常,當在遠端叢集節點上執行傳遞給Spark操作(例如mapor reduce)的函式時,它將在函式中使用的所有變數的單獨副本上工作。這些變數將複製到每臺計算機,並且遠端計算機上的變數更新不會傳播回驅動程式。支援跨任務的通用,讀寫共享變數效率低下。但是,Spark確實為兩種常見的使用模式提供了兩種有限型別的共享變數:廣播變數和累加器。
廣播變數
廣播變數允許程式設計師在每臺機器上保留一個只讀變數,而不是隨副本一起傳送它的副本。例如,它們可用於以有效的方式為每個節點提供大輸入資料集的副本。Spark還嘗試使用有效的廣播演算法來分發廣播變數,以降低通訊成本。
Spark動作通過一組階段執行,由分散式“shuffle”操作分隔。Spark自動廣播每個階段中任務所需的公共資料。以這種方式廣播的資料以序列化形式快取並在執行每個任務之前反序列化。這意味著顯式建立廣播變數僅在跨多個階段的任務需要相同資料或以反序列化形式快取資料很重要時才有用。
廣播變數是v通過呼叫從變數建立的SparkContext.broadcast(v)。廣播變數是一個包裝器v,可以通過呼叫該value 方法來訪問它的值。下面的程式碼顯示了這個:
官方文件
//來自官方文件 //scala版 scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) //java版 Broadcast<int[]> broadcastvar=sc.broadcast(new int[]{1,2,3}); broadcastvar.value(); //練習scala.廣播變數BroadcastVariable import org.apache.spark.SparkConf import org.apache.spark.SparkContext Object BroadcastVariable{ val conf=new SparkConf().setAppName("BroadcastVariable").setMaster("local") val sc = new SparkContext(conf) val factor=3 val factorBroadcast=sc.broadcast(factor) val numberArray=Array(1,2,3,4,5) val numbers=sc.parallelize(numberArray,1) val multipleNumbers=numbers.map(num=>num*factorBroadcast.value) multipleNumbers.foreach(num=>println(num)) } //累加變數AccumulatorVariable object AccumulatorVariable { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("AccumulatorVariable") .setMaster("local") val sc = new SparkContext(conf) val sum = sc.accumulator(0) val numberArray = Array(1, 2, 3, 4, 5) val numbers = sc.parallelize(numberArray, 1) numbers.foreach { num => sum += num } println(sum) } }
圖解