1. 程式人生 > 實用技巧 >SparkCore | Rdd依賴關係| 資料讀取儲存| 廣播變數和累加器

SparkCore | Rdd依賴關係| 資料讀取儲存| 廣播變數和累加器

閱讀目錄

Spark中三大資料結構:RDD; 廣播變數: 分散式只讀共享變數;累加器:分散式只寫共享變數;執行緒和程序之間

1.RDD中的函式傳遞

自己定義一些RDD的操作,那麼此時需要主要的是,初始化工作是在Driver端進行的,而實際執行程式是在Executor端進行的,這就涉及到了跨程序通訊,是需要序列化的。

傳遞一個方法

複製程式碼

class Search(query: String){ // extends Serializable
  //過濾出包含字串的資料
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  //過濾出包含字串的RDD
  def getMatch1
(rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } //過濾出包含字串的RDD def getMatch2(rdd: RDD[String]): RDD[String] = { val str: String = this.query //將類變數賦值給區域性變數str,即可序列化; rdd.filter(x => x.contains(str)) } }

複製程式碼

複製程式碼

object TestSearch {
  def main(args: Array[String]): Unit = {
    //初始化配置資訊以及 sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd = sc.makeRDD(List("kris", "Baidu", "Google")) //建立一個RDD
    val search = new Search("ris")  //建立一個search物件
    println("===============")
    //運用第一個過濾函式並列印結果;
    val res: RDD[String] = search.getMatch1(rdd)
//java.io.NotSerializableException: com.atguigu.spark.Search //class Search(query: String) extends Serializable res.foreach(println(_)) } }

複製程式碼

//過濾出包含字串的RDD

def getMatch1 (rdd: RDD[String]): RDD[String] = {

rdd.filter(isMatch)

}

在這個方法中所呼叫的方法isMatch()是定義在Search這個類中的,實際上呼叫的是this. isMatch(),this表示Search這個類的物件,程式在執行過程中需要將Search物件序列化以後傳遞到Executor端。

解決方案使類繼承scala.Serializable即可。class Search() extends Serializable{...}

傳遞一個屬性

複製程式碼

//初始化sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd = sc.makeRDD(List("kris", "Baidu", "Google"))
    val search = new Search("ris")
    println("===============")

     val res2: RDD[String] = search.getMatch2(rdd)
     res2.foreach(println(_))

複製程式碼

rdd.filter(x => x.contains(query))

在這個方法中所呼叫的方法query是定義在Search這個類中的欄位,實際上呼叫的是this. query,this表示Search這個類的物件,程式在執行過程中需要將Search物件序列化以後傳遞到Executor端。

解決方案:將類變數query賦值給區域性變數如上所示;

2. RDD依賴關係

Lineage 血統

  RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分割槽。RDD的Lineage會記錄RDD的元資料資訊和轉換行為,當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽

  RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決資料容錯時的高效性。

  Narrow Dependencies是指父RDD的每一個分割槽最多被一個子RDD的分割槽所用

    表現為一個父RDD的分割槽對應於一個子RDD的分割槽或多個父RDD的分割槽對應於一個子RDD的分割槽,也就是說一個父RDD的一個分割槽不可能對應一個子RDD的多個分割槽。

  Wide Dependencies是指子RDD的分割槽依賴於父RDD的多個分割槽或所有分割槽

    也就是說存在一個父RDD的一個分割槽對應一個子RDD的多個分割槽。對與Wide Dependencies,這種計算的輸入和輸出在不同的節點上,lineage方法對與輸入節點完好,而輸出節點宕機時,通過重新計算,這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統的意思),Narrow Dependencies對於資料的重算開銷要遠小於Wide Dependencies的資料重算開銷。

複製程式碼

scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1))
x: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[104] at map at <console>:24
scala> x.toDebugString
res112: String =
(2) MapPartitionsRDD[104] at map at <console>:24 []  ## new MapPartitionsRDD(
 |  MapPartitionsRDD[103] at flatMap at <console>:24 []  ##new MapPartitionsRDD
 |  ./wc.txt MapPartitionsRDD[102] at textFile at <console>:24 []
 |  ./wc.txt HadoopRDD[101] at textFile at <console>:24 []
 
scala> x.dependencies ##可以看到它的依賴OneToOneDependency窄依賴
res113: Seq[org.apache.spark.Dependency[_]] = List([email protected])

複製程式碼

複製程式碼

scala> val y = x.reduceByKey(_+_)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[105] at reduceByKey at <console>:26

scala> y.toDebugString
res114: String =
(2) ShuffledRDD[105] at reduceByKey at <console>:26 []
 +-(2) MapPartitionsRDD[104] at map at <console>:24 []
    |  MapPartitionsRDD[103] at flatMap at <console>:24 []
    |  ./wc.txt MapPartitionsRDD[102] at textFile at <console>:24 []
    |  ./wc.txt HadoopRDD[101] at textFile at <console>:24 []

scala> y.dependencies
res115: Seq[org.apache.spark.Dependency[_]] = List([email protected]) ##寬依賴,產生shuffle

複製程式碼

跨節點傳輸資料就產生shuffle

窄依賴和 寬依賴

DAG

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關係的不同將DAG劃分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,因此寬依賴是劃分Stage的依據。

3. 任務劃分(重點)

RDD任務切分中間分為:Application、Job、Stage和Task

1)Application:初始化一個SparkContext即生成一個Application;提交一個jar包就是Application(一個Application可以 有多個job)

2)Job:一個Action運算元就會生成一個Job ;

3)Stage:根據RDD之間的依賴關係的不同將Job劃分成不同的Stage,遇到一個寬依賴則劃分一個Stage。

4)Task:Stage是一個TaskSet將Stage劃分的結果傳送到不同的Executor執行即為一個Task。

  注意:Application->Job->Stage->Task每一層都是1對n的關係。

  有多少個task,由你當前stage的最後一個RDD的分割槽數決定

窄依賴分兩種:OneToOneDependency和RangeDependency(如兩個分割槽union兩個分割槽 => 四個分割槽) NarrowDependency窄依賴

Union會產生窄依賴(檢視原始碼);map也是窄依賴; ReduceByKey是寬依賴,shuffledRDD---shufleDependency

scala> sc.makeRDD(1 to 8).map((_,1)).reduceByKey(_+_).collect
res3: Array[(Int, Int)] = Array((4,1), (6,1), (8,1), (2,1), (1,1), (7,1), (3,1), (5,1))

在寬依賴運算元reduceByKey那切一刀;

scala> sc.makeRDD(1 to 8).map((_,1)).reduceByKey(_+_).map((_,1)).reduceByKey(_+_).collect
res4: Array[((Int, Int), Int)] = Array(((6,1),1), ((3,1),1), ((8,1),1), ((2,1),1), ((5,1),1), ((1,1),1), ((7,1),1), ((4,1),1))

  兩個reduceByKey寬依賴,分成了3個stage;

RDD分割槽數對應Task數;

scala> sc.makeRDD(1 to 8,4).map((_,1)).coalesce(3,false).reduceByKey(_+_).coalesce(2,false).collect
res5: Array[(Int, Int)] = Array((6,1), (3,1), (4,1), (1,1), (7,1), (8,1), (5,1), (2,1))
  4個分割槽==> map運算元 4個分割槽 ==> 經過coalesce轉換為3個分割槽  reduceByKey寬依賴運算元切分了兩個stage   coalesce產生 2個分割槽 

  可以推斷得到產生了1個Application,2個stage,第一個stage產生了3個task;第二個stage產生了2個task;

3個task

2個task

4. 鍵值對RDD資料分割槽

Spark目前支援Hash分割槽和Range分割槽,使用者也可以自定義分割槽Hash分割槽為當前的預設分割槽,Spark中分割槽器直接決定了RDD中分割槽的個數、RDD中每條資料經過Shuffle過程屬於哪個分割槽和Reduce的個數

注意:(1)只有Key-Value型別的RDD才有分割槽器的,非Key-Value型別的RDD分割槽的值是None

   (2)每個RDD的分割槽ID範圍:0~numPartitions-1,決定這個值是屬於那個分割槽的。

獲取RDD分割槽:可以通過使用RDD的partitioner屬性來獲取 RDD 的分割槽方式。它會返回一個 scala.Option 物件, 通過get方法獲取其中的值。

複製程式碼

scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> pairs.partitioner  檢視RDD的分割槽器
res5: Option[org.apache.spark.Partitioner] = None
scala>  import org.apache.spark.HashPartitioner  匯入HashPartitioner類
import org.apache.spark.HashPartitioner

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2)) //使用HashPartitioner對RDD進行重新分割槽
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27

scala> partitioned.partitioner
res6: Option[org.apache.spark.Partitioner] = Some([email protected])

複製程式碼

Hash分割槽

HashPartitioner分割槽的原理:對於給定的key,計算其hashCode,併除以分割槽的個數取餘,如果餘數小於0,則用餘數+分割槽的個數(否則加0),最後返回的值就是這個key所屬的分割槽ID。

複製程式碼

原始碼:  
def getPartition(key: Any): Int = key match {
    case null => 0  //key為null直接進0號分割槽;
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

複製程式碼

rdd.partitionBy(new org.apache.spark.HashPartitioner(7) .partitioner檢視分割槽器

Ranger分割槽

HashPartitioner分割槽弊端:可能導致每個分割槽中資料量的不均勻,極端情況下會導致某些分割槽擁有RDD的全部資料。

RangePartitioner作用:將一定範圍內的數對映到某一個分割槽內,儘量保證每個分割槽中資料量的均勻,而且分割槽與分割槽之間是有序的,一個分割槽中的元素肯定都是比另一個分割槽內的元素小或者大,但是分割槽內的元素是不能保證順序的。簡單的說就是將一定範圍內的數對映到某一個分割槽內。實現過程為:

第一步:先從整個RDD中抽取出樣本資料,將樣本資料排序,計算出每個分割槽的最大key值,形成一個Array[KEY]型別的陣列變數rangeBounds;

第二步:判斷key在rangeBounds中所處的範圍,給出該key值在下一個RDD中的分割槽id下標;該分割槽器要求RDD中的KEY型別必須是可以排序的

自定義分割槽

要實現自定義的分割槽器,你需要繼承 org.apache.spark.Partitioner類並實現下面三個方法。

(1)numPartitions: Int:返回創建出來的分割槽數。

(2)getPartition(key: Any): Int:返回給定鍵的分割槽編號(0到numPartitions-1)。

(3)equals():Java 判斷相等性的標準方法。這個方法的實現非常重要,Spark 需要用這個方法來檢查你的分割槽器物件是否和其他分割槽器例項相同,這樣 Spark 才可以判斷兩個 RDD 的分割槽方式是否相同。

複製程式碼

class MyPartitioner(partitions: Int) extends Partitioner{ //傳入引數,可指定分割槽
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = { //只能根據key進行分割槽;而hadoop的分割槽既可根據key也可根據value (Partitioner.java)
    key.toString.toInt % partitions
    //0 //也可以寫0,不管傳進來什麼key,資料只進入0號分割槽;
  }
}

複製程式碼

測試

複製程式碼

object TextPartition {
  def main(args: Array[String]): Unit = {
    //初始化sc
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,1), (2,1), (3,1), (4,1)))
    val rdd2: RDD[(Int, Int)] = rdd.partitionBy(new MyPartitioner(2))
    //val rdd: RDD[String] = sc.textFile("E:\\wc.txt")
    rdd2.saveAsTextFile("E:\\output")
      sc.stop()
  }
}

複製程式碼

結果是2個檔案(2個分割槽,資料進入了2個分割槽)

5. 資料讀取與儲存

Spark的資料讀取及資料儲存可以從兩個維度來作區分:檔案格式以及檔案系統。

檔案格式分為:Text檔案、Json檔案、Csv檔案、Sequence檔案以及Object檔案;

檔案系統分為:本地檔案系統、HDFS、HBASE以及資料庫。

檔案類資料讀取與儲存

Text檔案

1)資料讀取:textFile(String)

scala> val hdfsFile = sc.textFile("hdfs://hadoop101:9000/fruit.txt")

hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24

2)資料儲存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut")

Json檔案

如果JSON檔案中每一行就是一個JSON記錄,那麼可以通過將JSON檔案當做文字檔案來讀取,然後利用相關的JSON庫對每一條資料進行JSON解析。

注意:使用RDD讀取JSON檔案處理很複雜,同時SparkSQL集成了很好的處理JSON檔案的方式,所以應用中多是採用SparkSQL處理JSON檔案。

複製程式碼

scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON

scala> sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.json").collect
res13: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":30}, {"name":"Justin", "age":19})
scala> val y = sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.json")
y: org.apache.spark.rdd.RDD[String] = /opt/module/spark/spark-local/examples/src/main/resources/people.json MapPartitionsRDD[25] at textFile at <console>:25
scala> y.map(JSON.parseFull).collect
res19: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

複製程式碼

Sequence檔案

SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案(Flat File)。Spark 有專門用來讀取 SequenceFile 的介面。在 SparkContext 中,可以呼叫 sequenceFile[keyClass, valueClass](path)。

注意:SequenceFile檔案只針對PairRDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:25
scala> rdd.saveAsSequenceFile("./output/seque") //
scala> sc.sequenceFile[Int,Int]("/opt/module/spark/spark-local/output/seque").collect  //必須加泛型,不然會報錯 ambiguous implicit values:
res22: Array[(Int, Int)] = Array((5,6), (1,2), (3,4))

物件檔案

物件檔案是將物件序列化後儲存的檔案,採用Java的序列化機制。可以通過objectFile[k,v](path) 函式接收一個路徑,讀取物件檔案,返回對應的 RDD,也可以通過呼叫saveAsObjectFile() 實現對物件檔案的輸出。因為是序列化所以要指定型別。

scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:25
scala> rdd.saveAsObjectFile("./output/object")
scala> sc.objectFile[(Int)]("/opt/module/spark/spark-local/output/object").collect //也要加泛型
res25: Array[Int] = Array(4, 2, 3, 1)

檔案系統類資料讀取與儲存

①HDFS

Spark的整個生態系統與Hadoop是完全相容的,所以對於Hadoop所支援的檔案型別或者資料庫型別,Spark也同樣支援.另外,由於Hadoop的API有新舊兩個版本,所以Spark為了能夠相容Hadoop所有的版本,也提供了兩套建立操作介面.對於外部儲存建立操作而言,hadoopRDD和newHadoopRDD是最為抽象的兩個函式介面,主要包含以下四個引數.

1)輸入格式(InputFormat): 制定資料輸入的型別,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

2)鍵型別: 指定[K,V]鍵值對中K的型別

3)值型別: 指定[K,V]鍵值對中V的型別

4)分割槽值: 指定由外部儲存生成的RDD的partition數量的最小值,如果沒有指定,系統會使用預設值defaultMinSplits。

② MySQL

支援通過Java JDBC訪問關係型資料庫。需要通過JdbcRDD進行,

從Mysql讀取資料

複製程式碼

    //初始化
    val conf = new SparkConf().setAppName("WorldCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //定義連線mysql的引數
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop101:3306/rdd"
    val userName = "root"
    val password = "123456"
    //讀取 //建立JdbcRDD
    val rdd: JdbcRDD[(Int, String)] = new JdbcRDD(sc, () => {
      Class.forName(driver)
      DriverManager.getConnection(url, userName, password)},
      "select * from test where id >= ? and id <= ?;",
      1, 4, 2,
      r => (r.getInt(1), r.getString(2))
    )
    println(rdd.count())
    rdd.foreach(println(_))

    sc.stop()

複製程式碼

從rdd寫入mysql

複製程式碼

   //rdd資料輸出到mysql
    //寫入資料,foreachPartition是每個分割槽建立一個連線
    val rdd: RDD[(Int, String)] = sc.makeRDD(List((5, "Amazon")))
    rdd.foreachPartition(x => {
      Class.forName(driver)
      val conn: Connection = DriverManager.getConnection(url, userName, password)
      x.foreach(x => {
        val id: Int = x._1
        val name: String = x._2
        val statement: PreparedStatement = conn.prepareStatement("insert into test (id, name) values(?, ?)")
        statement.setInt(1, id)
        statement.setString(2, name)
        statement.execute()
      })
    } )
    sc.stop()

複製程式碼

③ Hbase

從HBase讀取資料

由於 org.apache.hadoop.hbase.mapreduce.TableInputFormat類的實現,Spark 可以通過Hadoop輸入格式訪問HBase。這個輸入格式會返回鍵值對資料,其中鍵的型別為org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的型別為org.apache.hadoop.hbase.client.

Result。

複製程式碼

    //初始化sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //從hbase表讀取資料
    val configuration: Configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
    configuration.set(TableInputFormat.INPUT_TABLE, "fruit")

    val rdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    rdd.foreach(x => {
      val cells: Array[Cell] = x._2.rawCells()
      cells.foreach(cell => {
        val rowkey: String = Bytes.toString(CellUtil.cloneRow(cell))
        val family: String = Bytes.toString(CellUtil.cloneFamily(cell))
        val column: String = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value: String = Bytes.toString(CellUtil.cloneValue(cell))
        println(s"$rowkey  $family  $column  $value")
      })
    })
    sc.stop()

複製程式碼

往HBase寫入

複製程式碼

  //初始化sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //rdd資料寫入到hbase表
    val rdd: RDD[(String, String, String, String)] = sc.makeRDD(List(("1004", "info", "name", "pineapple")))
    val rdd2 = rdd.map(x => {
      val put: Put = new Put(Bytes.toBytes(x._1))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._4))
      (new ImmutableBytesWritable(), put)
    })

    //建立配置
    val configuration: Configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
    configuration.set(TableOutputFormat.OUTPUT_TABLE, "fruit")
    //設定OutputFormat型別
    val job: Job = Job.getInstance(configuration)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])

    rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()

複製程式碼

6. 累加器

程式碼程式都在Driver,序列化傳到Executor節點上去執行;

複製程式碼

    val rdd = sc.makeRDD(1 to 4) //建立RDD
    var a = 0
    rdd.foreach(x => {
      a += 1
    })
    println(a)  //列印的a是driver端的,而不是executor端的;
執行,輸出的卻是0; 程式碼在Driver端,具體執行是在Executor,executor中會有副本a = 0,每個節點的executor都各自有各自的副本,在自己節點上修改

複製程式碼

累加器用來對資訊進行聚合,通常在向 Spark傳遞函式時,比如使用 map() 函式或者用 filter() 傳條件時,可以使用驅動器程式中定義的變數,但是叢集中執行的每個任務都會得到這些變數的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變數。如果我們想實現所有分片處理時更新共享變數的功能,那麼累加器可以實現我們想要的效果。

複製程式碼

   //初始化sc
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd = sc.makeRDD(1 to 4) //建立RDD
    val acc: LongAccumulator = sc.longAccumulator //它有一個初始值
      /* 原始碼:   class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
            private var _sum = 0L
            private var _count = 0L*/
    println("初始值: "+ acc.value) //0
    rdd.foreach(x => {
      acc.add(1)
    })
    println(acc.value) //4

複製程式碼

自定義累加器

自定義累加器型別的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義型別累加器的實現方式。實現自定義型別累加器需要繼承AccumulatorV2並覆寫要求的方法。

copy每個節點都要copy Driver端的;每個節點再對它進行重置reset;add在自己各自節點操作;merge是其他Executor節點中的和Driver端的進行合併;

複製程式碼

//自定義一個類:
class MyAcc1 extends AccumulatorV2[Int, Int]{
  private var init = 0

  //判斷是否為空
  override def isZero: Boolean = init == 0
  //複製
  override def copy(): AccumulatorV2[Int, Int] = {
    val acc: MyAcc1 = new MyAcc1
    acc.init = this.init
    acc
  }

  override def reset(): Unit = { //重置
    init = 0
  }

  override def add(v: Int): Unit = { //累加
    init += v
  }

  override def merge(other: AccumulatorV2[Int, Int]): Unit = { //合併
    init += other.value
  }

  override def value: Int = init //返回值
}

複製程式碼

呼叫自定義累加器

複製程式碼

object TestAcc {
  def main(args: Array[String]): Unit = {
    //初始化sc
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[Range.Inclusive] = sc.makeRDD(1 to 4) //建立RDD
    val acc: MyAcc1 = new MyAcc1 //建立自定義累加器物件
    
    //註冊累加器, 在Driver中sc
    sc.register(acc, "MyAcc1")

    rdd.foreach(x => { //在行動運算元中對累加器的值進行修改
      acc.add(1)
      println(x) //2 1 4 3
    })
    println("累加器:" + acc.value) //列印累加器的值 累加器:4
    sc.stop() //關閉SparkContext

  }
}

複製程式碼

廣播變數(調優策略-不用它也可以實現功能,作為調優使用)

* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.

廣播變數用來高效分發較大的物件。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的只讀查詢表,甚至是機器學習演算法中的一個很大的特徵向量,廣播變數用起來都很順手。 在多個並行操作中使用同一個變數,但是 Spark會為每個任務分別傳送。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

scala> broadcastVar.value

res33: Array[Int] = Array(1, 2, 3)

使用廣播變數的過程如下:

(1) 通過對一個型別T的物件呼叫SparkContext.broadcast創建出一個Broadcast[T]物件。任何可序列化的型別都可以這麼實現。

(2) 通過value屬性訪問該物件的值(在Java中為value()方法)。

(3) 變數只會被髮到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)。

複製程式碼

object TestBroadcast {
  def main(args: Array[String]): Unit = {
    //初始化sc
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val rdd: RDD[String] = sc.makeRDD(List("kris", "alex", "smile"))
    val temp = "ris"   //
    sc.broadcast(temp) //廣播變數是Driver給每個Executor發一份,而不是每個Task(如果不是廣播變數就會給每個task傳送),每個task共享;減小資料傳輸量 ;

  /**
      * 累加器和廣播變數的區別:
      * 都是共享變數
      * 累加器只能寫
      * 廣播變數只能讀
      */
    val result = rdd.filter(x => {
      x.contains(temp)
    })
    result.foreach(println(_)) //kris


  }
}

複製程式碼

分類:Spark