1. 程式人生 > >spark程式碼原始碼解析

spark程式碼原始碼解析

first Codec

public class Friend {
	public static void main(String[] args){
		System.out.println("BigData:"+"--"+"947967114");
	}
}

SPARK原始碼解析 首先看一段使用scala編寫的spark程式碼: package spark0719

import org.apache.spark.SparkConf import org.apache.spark.SparkContext

object Ttspark {

val conf =new SparkConf().setAppName(“sp”).setMaster(“local”)

def main(args: Array[String]): Unit = {

val sc=new SparkContext(conf)

val number=sc.parallelize(1 to 9)

def mapDoubleFunc(num:Int):(Int,Int)={ (num,num*2) }

val mapResult=number.map(mapDoubleFunc)

mapResult.collect//foreach(println)/savaAsTextFile(“file:///”)

		  //sc.testFile("file:///")

} } 第一行(package spark0719)最開始是程式的包名package。

第二、三行(import org.apache.spark.SparkConf) 是使用import引入程式包。

第四行(object Ttspark )開始是有一個object開始的,這個是伴生物件,spark或者說scala引入伴生物件的原始是需要靜態的程式碼塊,因為scala裡不存在static的概念,但是scala的底層又是用java實現的,java記憶體在著大量的static部分程式,最具代表的就是main函式,在scala中也是使用main函式執行的,這就造成了一個矛盾,java的main是static,scala又沒有static代表main不能執行,為了解決這個矛盾,scala引入了伴生物件的概念,所有的static的東西都放在object(伴生物件)中實現。

第五行( val conf =new SparkConf().setAppName(“sp”).setMaster(“local”) )定義一個常量conf,重點是SparkConf,SparkConf是用於設定Spark的配置屬性

由第一幅圖可以看到getSystemProperties獲取的是配置檔案,由第二圖可以看到,sparkConf裡設定master節點、appName、jar包等。所以程式中sparkConf後面直接呼叫了setMaster和setAppname函式進行兩個引數的設定,第二個的local表示本地執行。

第五行(def main(args: Array[String]): Unit =)表示主執行函式開始,main括號內表示args是一個Array型別,Array中是String型別,返回值是Unit(無返回型別相當於java的void),

第六行(val sc=new SparkContext(conf)),定義一個sc接受例項化的SparkContext。 sparkContext在Spark應用程式的執行過程中起著主導作用,它負責與程式和spark叢集進行互動,包括申請叢集資源、建立RDD、accumulators及廣播變數等。sparkContext與叢集資源管理器、work節點互動圖如下:

官網對圖下面幾點說明: (1)不同的Spark應用程式對應不同的Executor,這些Executor在整個應用程式執行期間都存在並且Executor中可以採用多執行緒的方式執行Task。這樣做的好處是,各個Spark應用程式的執行是相互隔離的。除Spark應用程式向外部儲存系統寫資料進行資料互動這種方式外,各Spark應用程式間無法進行資料共享。 (2)Spark對於其使用的叢集資源管理器沒有感知能力,只要它能對Executor進行申請並通訊即可。這意味著不管使用哪種資源管理器,其執行流程都是不變的。這樣Spark可以與不同的資源管理器進行互動。 (3)Spark應用程式在整個執行過程中要與Executors進行來回通訊。 (4)Driver端負責Spark應用程式任務的排程,因此最好Driver應該靠近Worker節點。 1.原始碼鑑賞-綜述 在spark程式執行起來後,程式就會建立sparkContext,解析使用者的程式碼,當遇到action運算元時開始執行,但是在執行之前還有很多前提工作要在sparkContext中做的,請記住你要了解了sparkContext,你就瞭解了spark。

sparkContext構建的頂級三大核心:DAGScheduler,TaskScheduler,SchedulerBackend.

1.DAGScheduler是面向Job的Stage的高層排程器。 2.TaskScheduler是一個介面,是低層排程器,根據具體的ClusterManager的不同會有不同的實現。Standalone模式下具體實現的是TaskSchedulerlmpl。 3.SchedulerBackend是一個介面,根據具體的ClusterManger的不同會有不同的實現,Standalone模式下具體的實現是SparkDeloySchedulerBackend。 從整個程式執行的角度來講,sparkContext包含四大核心物件:DAGScheduler,TaskScheduler,SchedulerBackend,MapOutputTrackerMaster。 SparkDeploySchedulerBackend有三大核心功能: 1.負責接收Master接受註冊當前程式RegisterWithMaster。 2.接受叢集中為當前應用程式而分配的計算資源Executor的註冊並管理Executor。 3.負責傳送Task到具體的Executor執行。 4.SparkDeploySchedulerBackend是被TaskSchedulerlmpl管理的。 sparkContext變數初始化 建立sparkContext的時候會做很多初始化事情,初始化很多變數。

事件監控匯流排:

第一個重要的初始化來了:這個地方是建立sparkEnv,就是建立actor,根據判斷建立dirver-actor

sparkContext的三大核心:這個只是一個定義getter和setter的方法,scala和java是有區別的,可以看看語法。但請時刻記住這三個核心。

從try開始了真正意義上的初始化操作了:396行。

_conf = config.clone():呼叫clone函式進行conf的複製 _conf.validateSettings():檢查一些關鍵配置和是否存在,一些預設配置如果不存在,新增預設設定引數。 _conf.set(“spark.executor.id”, SparkContext.DRIVER_IDENTIFIER):請注意這個,其實在spark眼裡沒有driver的概念,都是Executor,只是id標籤標記為了driver而已。

下面是三大核心的建立:

建立createTaskScheduler:根據master的執行情況建立:

這個地方用到了正則匹配來判斷master的模式,我們以standalone的模式來講解:

根據模式匹配:TaskSchedulerImpl 建立,注意叢集模式預設重試4次,本地模式只嘗試1次(val backend = new LocalBackend(sc.getConf, scheduler, 1))。

可以自己觀察一下其他模式的建立情況,但是會發現TaskSchedulerlmpl基本上是一樣。 val scheduler = new TaskSchedulerImpl(sc):主要的是初始化一些變數。 scheduler.initialize(backend):建立資源配置池和資源排程演算法,同時通過SchdulableBuilder.addTaskSetmanager:SchdulableBuilder會確定TaskSetManager的排程順序,然後按照TaskSetManager來確定每個Task具體執行在哪個ExecutorBackend中。

建立_dagScheduler = new DAGScheduler(this)

啟動taskScheduler

第六行(val number=sc.parallelize(1 to 9))表示在本地載入9個數,還有另一種載入方式,使用外部資料 sc.textFile(“file:///”).下圖為parallelize的原始碼

1、我們看到[T:ClassTag]可能非常奇怪是什麼意思? ClassTag 原始碼中的解釋,在泛型中,type T是被擦除的。這個是非常有用的,在我們構建陣列的時候,但是陣列的型別我們也不知道,編譯的時候不知道,但是執行的時候要知道,ClassTag只包含實際執行時的類的型別。 ClassTag會幫我們儲存T的資訊,根據型別推到可以指定T是I什麼型別,這時候ClassTag就可以把此型別資訊傳遞給編譯器。ClassTag執行時指定在編譯的時候無法指定的型別資訊。 不需要寫這樣寫,implicit m : Manifest[T]

2、緊接著小括號內是兩個引數,一個是Seq是一個集合,代表任何型別任何資料的集合,第二個引數是輸入:(seq: Seq[T],numSlices: Int = defaultParallelism)seq是一個序列,numSlices有一個預設值defaultParallelism,它的大小是有task決定,task在預設情況下又是core決定,RDD[T]代表返回型別是任何型別的RDD(即在此刻構建RDD),後面的withScope是一個函式,它是用來做DAG視覺化的(DAG visualization on SparkUI),以前的sparkUI中只有stage的執行情況,也就是說我們不可以看到上個RDD到下個RDD的具體資訊。於是為了在sparkUI中能展示更多的資訊。所以把所有建立的RDD的方法都包裹起來,同時用RDDOperationScope 記錄 RDD 的操作歷史和關聯,就能達成目標。 3、後面的assertNotStopped()是用來檢查context時候存在的,由下圖可以看出首先獲取activeContext,檢查是否為空,不空獲取資訊,如果獲取的是一個挺屍的SparkContext丟擲異常。

4、最後一句是關鍵:new ParallelCollectionRDD[T](this, seq, numSlices, MapInt, Seq[String]),如下是ParallelCollectionRDD原始碼

ParallelCollectionRDD這個類重寫了RDD內部原始的三個方法。重點部分是getPartition這個函式。用於獲取資料的分片資訊。以上解釋我們知道該函式是用於建立RDD,獲取資料,獲取sparkcontext資訊,獲取分片等重要資訊,可以說該函式是程式的開始。

第七、八、九行是自己寫的一個函式 def mapDoubleFunc(num:Int):(Int,Int)={ (num,num*2) } 該函式一個引數是Int型別,返回值是兩個都是Int型別,大括號內是函式體,表示返回引數本身和引數成2

第十行(val mapResult=number.map(mapDoubleFunc))表示把載入好的1到9交給map處理,map是一個RDD的運算元,裡面傳了自定義的函式作為引數,實現所有的數輸出的結果是本身和本身乘2

第十一行(mapResult.foreach(println))表示把map的結果使用foreach進行迴圈,把所有值都逐一打印出來。

對於map和foreach這類的RDD後續會重點介紹。