使用 Spark 中的共享變量

使用 Spark 中的共享變量

本文通過 Java 與 Scala 的示例,介紹為什麽需要廣播變量,以及如何在 Spark 程式中使用它。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;


public class BroadcastVariable {
public static void main(String[] args) {

SparkConf conf = new SparkConf()
.setAppName("BroadcastVariable")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

List<Integer> numbers = Arrays.asList(1,2,3,4,5);

JavaRDD<Integer> rdd = sc.parallelize(numbers);

final int factor = 3;

JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer v1) throws Exception {
return v1 * factor;
}
});


newNumbers.foreach(new VoidFunction<Integer>() {
public void call(Integer number) throws Exception {
System.out.println(number);
}
});
}
}

如上代碼在Driver端定義了一個變量 factor,在函數中調用這個factor。實際的執行過程中會發生什麽事呢?
假設一個節點上有100個task,那麽Spark會為每個task復制一份factor變量放在內存中。
但其實我們只是在函數中讀取了這個變量的值進行了計算,完全沒有必要復制100份,只需要在當前的Executor中保留一份,所有的task都來讀取這一份數據就足夠了。
設想一下,如果要共享一個很大的變量,在每個task中都復制一份無疑會消耗巨大的網絡帶寬和節點內存,這是非常不合理的。

基於這種情況,我們就可以使用廣播變量。
package com.rabbit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;

public class BroadcastVariable {
public static void main(String[] args) {

SparkConf conf = new SparkConf()
.setAppName("BroadcastVariable")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

List<Integer> numbers = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> rdd = sc.parallelize(numbers);

final int factor = 3;
//將factor轉為廣播變量
final Broadcast<Integer> broadcastFactor = sc.broadcast(factor);
JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer v1) throws Exception {
//使用廣播變量時,調用 value()方法獲得其內部封裝的值
int factor = broadcastFactor.value();
return v1 * factor;
}
});

newNumbers.foreach(new VoidFunction<Integer>() {
public void call(Integer number) throws Exception {
System.out.println(number);
}
});
}
}

Scala 版本:
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Scala version of the broadcast-variable example: the factor is shipped
 * once per executor via `sc.broadcast` and read with `.value` inside tasks.
 */
object BroadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BroadcastVariable")
      .setMaster("local")

    val sc = new SparkContext(conf)

    try {
      val arr = Array(1, 2, 3, 4, 5)
      val numbers = sc.parallelize(arr)

      // Dropped the stray semicolon (un-idiomatic Scala) and broadcast the
      // factor so each executor holds a single read-only copy.
      val factor = 3
      val broadcastFactor = sc.broadcast(factor)

      val newNumbers = numbers.map(number => number * broadcastFactor.value)

      newNumbers.foreach(number => println(number))
    } finally {
      // Fix: the original never stopped the SparkContext.
      sc.stop()
    }
  }
}
 

使用 Spark 中的共享變量