1. 程式人生 > >Spark入門基礎教程

Spark入門基礎教程

from: http://www.linuxidc.com/Linux/2016-03/129506.htm

背景

  目前按照大資料處理型別來分大致可以分為:批量資料處理、互動式資料查詢、實時資料流處理,這三種資料處理方式對應的業務場景也都不一樣;
  關注大資料處理的應該都知道Hadoop,而Hadoop的核心為HDFSMapReduce,HDFS分散式檔案系統在Hadop中是用來儲存資料的;MapReduce為Hadoop處理資料的核心,接觸過函數語言程式設計的都知道函式式語言中也存在著Map、Reduce函式其實這兩者的思想是一致的;也正是因為Hadoop資料處理核心為MapReduce奠定了它註定不是適用場景廣泛的大資料框架;
  可以這麼說Hadoop適用於Map、Reduce存在的任何場景,具體場景比如:WordCount、排序、PageRank、使用者行為分析、資料統計等,而這些場景都算是批量資料處理,而Hadoop並不適用於互動式資料查詢、實時資料流處理;
  這時候就出現了各種資料處理模型下的專用框架如:Storm、Impala、GraphLab等;
  1、Storm:

針對實時資料流處理的分散式框架;
  2、Impala:適用於互動式大資料查詢的分散式框架;
  3、GraphLab:基於圖模型的機器學習框架;

MapReduce簡單模型
            1、MapReduce簡單模型
  
  這時候如果一個團隊或一個公司中同時都有設計到大資料批量處理、互動式查詢、實時資料流處理這三個場景;這時候就會有一些問題:
  1、學習成本很高,每個框架都是不同的實現語言、不同的團隊開發的;
  2、各個場景組合起來代價必然會很大;
  3、各個框架中共享的中間資料共享與移動成本高;
  

Spark

  就在這時候UC Berkeley AMP推出了全新的大資料處理框架:Spark提供了全面、統一適用與不同場景的大資料處理需求(批量資料處理、互動式資料查詢、實時資料流處理、機器學習);Spark不僅效能遠勝於Hadoop而卻還相容Hadoop生態系統,Spark可以執行在Hadoop HDFS之上提供爭強 功能,可以說Spark

替代了Hadoop MapReduce,但Spark依然相容Hadoop中的YARN與Apache Mesos元件,現有Hadoop使用者可以很容易就遷移到Spark;
  Spark提出了RDD(Resilient Distributed Datasets)這麼一個全新的概念,RDD彈性分散式資料集是並行、容錯的分散式資料結構;RDD可以持久化到硬碟或記憶體當中,為一個分割槽的資料集,分割槽的多少決定了平行計算的粒度;並且提供了一系列的操作RDD中的資料:
  1、建立操作(Creation Operation):RDD由SparkContext通過記憶體資料或外部檔案系統建立;
  2、轉換操作(Transformation Operation):將RDD通過轉換操作變為另一個RDD,Spark提供了map、flatMap、filter等一系列的轉換操作;
  3、控制操作(Control Operation):將RDD持久化到記憶體或硬碟當中,如cache將filterRDD快取到記憶體;
  4、行動操作:(Action Operation):Spark採用了惰性計算,對於任何行動操作都會產生Spark Job執行產生最終結果;提供了join、groupBy、count等操作,Spark中存在兩種操作產生的結果為Scala集合或者標量與RDD儲存到檔案或資料庫;

Spark結構
             1、Spark結構圖

  Spark RDD:Spark RDD提供了一系列的操作介面,為不變的資料儲存結構並存儲與記憶體中使用DAG進行任務規劃使更好的處理MapReduce類似的批處理;
  Shark/Spark SQL:分散式SQL引擎,相容Hive效能遠比Hive高很多;
  Spark Streaming:將資料流分解為一系列批處理作業使用Spark排程框架更好的支援資料流操作,支援的資料輸入源有:Kafka、Flume等;
  GraphX:相容Pregel、GraphLab介面為基於Spark的圖計算框架;
  MLlib:為Spark的機器學習演算法庫,支援常用的演算法有:分類演算法、推薦演算法、聚類演算法等等;

  效能卓越、支援多種大資料處理模型、支援多種程式語言介面:Java、Scala、Python,許多大公司如IBM等大力支援推廣Spark的發展;

Spark執行模式與Standalone模式部署

前面簡單的介紹了Spark的一些概念還有Spark生態圈的一些情況,這裡主要是介紹Spark執行模式與Spark Standalone模式的部署;

Spark執行模式

  在Spark中存在著多種執行模式,可使用本地模式執行、可使用偽分散式模式執行、使用分散式模式也存在多種模式如:Spark Mesos模式、Spark YARN模式;

Spark Mesos模式:官方推薦模式,通用叢集管理,有兩種排程模式:粗粒度模式(Coarse-grained Mode)與細粒度模式(Fine-grained Mode);
Spark YARN模式:Hadoop YARN資源管理模式;
Standalone模式: 簡單模式或稱獨立模式,可以單獨部署到一個叢集中,無依賴任何其他資源管理系統。不使用其他排程工具時會存在單點故障,使用Zookeeper等可以解決;
Local模式:本地模式,可以啟動本地一個執行緒來執行job,可以啟動N個執行緒或者使用系統所有核執行job;

Standalone模式部署實踐

  Standalone模式需要將Spark複製到叢集中的每個節點,然後分別啟動每個節點即可;Spark Standalone模式的叢集由Master與Worker節點組成,程式通過與Master節點互動申請資源,Worker節點啟動Executor執行;
  這裡使用了兩節點部署Spark叢集:192.168.2.131、192.168.2.133,下面簡稱為:133與131節點;其中133節點既是Master節點同時又是Worker節點,131節點為Worker節點;

結構圖
節點結構圖

部署步驟:
  一、
首先在133節點上下載Java、Scala與Spark並解壓到/usr/local目錄下,這裡使用的Spark是帶有Hadoop的版本
下載
  下載解壓到local

  二、配置Java、Scala與Spark環境變數,這裡把環境變數配置到/etc/profile檔案中,請忽略Hadoop環境變數;
環境變數
  環境變數配置

  三、測試Java、Scala是否配置成功,在終端輸入:java -version與scala -version

  四、配置Spark環境變數,進入Spark目錄下的conf目錄把slaves.template重新命名為slaves,接著把spark-env.sh.template重新命名為:spark-env.sh;
重新命名
    重新命名
    修改spark-env.sh檔案,新增環境變數;
修改env
    spark-env修改
  五、
在133節點使用scp把下載好的Java、Scala、Spark傳送到131節點,並在131節點上重複以上所有步驟;
  六、在兩個節點都完成以上所有步驟後開始啟動Spark,133節點既是Master又是Worker;
    1、首先在133啟動Spark,進入Spark目錄的sbin目錄執行./start-all.sh:
master啟動
    Master啟動
    使用jps命令發現存在Master與Worker程序,說明Spark已啟動成功;

    2、啟動131節點的Spark,進入Spark目錄的sbin目錄執行:./start-slave.sh spark://192.168.2.133:7077
    start-slave.sh後面的地址為Master節點的通訊地址,指定當前slave節點連線到的Master;
slave啟動
    slave啟動:
    使用jps命令,存在Worker程序則說明當前的Spark Worker節點啟動成功;
  七、 Spark Web頁面
    可以通過http://192.168.2.133:8080/ 地址檢視到當前Spark叢集的資訊,這地址為Master節點的地址;
SparkWeb
SparkWeb:

Spark中最核心的概念為RDD(Resilient Distributed DataSets)中文為:彈性分散式資料集,RDD為對分散式記憶體物件的 抽象它表示一個被分割槽不可變且能並行操作的資料集;RDD為可序列化的、可快取到記憶體對RDD進行操作過後還可以存到記憶體中,下次操作直接把記憶體中RDD作為輸入,避免了Hadoop MapReduce的大IO操作;

RDD生成

  Spark所要處理的任何資料都是儲存在RDD之中,目前兩種方式可以生成一個RDD:
  1、從RDD進行轉換操作
  2、使用外部儲存系統建立,如:HDFS;

RDD操作

  RDD支援兩種操作:
    轉換(transformation operation)
    轉換操作將一個RDD經過操作後返回一個全新的RDD,轉換操是lazy(惰性)的這期間不會產生任何資料的計算;
    轉換函式有:distinct、filter、map、flatMap、union、groupByKey等;
    行動(action operation)
    每一個行動操作都會觸發Spark Job進行計算並返回最終的結果,行動操作有這麼幾類:返回標量,count返回元素的個數;返回Scala集合,task(n)返回0到n-1組成的集合;寫入外部儲存,saveAsHadoopFile(path)儲存到HDFS;
    行動函式有:count、top、task、saveAsHadoopFile等;
  RDD為不可變的資料集,可以使用轉換操作“修改”一個RDD,但這操作過後返回的是一個全新的RDD 原本RDD並沒有改變;

轉換圖
          RDD狀態轉換圖

Lineage

  Spark RDD只支援粗粒度的操作,對一個RDD的操作都會被作用於該RDD的所有資料;為了保證RDD的高可用性RDD通過使用Lineage(血統)記錄了RDD演變流程(從其他RDD到當前RDD所做的操作) 當RDD分割槽資料丟失時可以通過Lineage的資訊重新計算與恢復分割槽資料,或進行RDD的重建;
  RDD的依賴關係(dependencies)
  由於對RDD的操作都是粗粒度的一個轉換操作過後都會產生一個新的RDD,RDD之間會形成一個前後依賴關係;Spark中存在兩種依賴:窄依賴(Narrow Dependencies)、寬依賴(Wide Dependencies);
  窄依賴(Narrow Dependencies):一個父RDD的分割槽只能被一個子RDD的一個分割槽使用;
  寬依賴(Wide Dependencies):多個子RDD的分割槽依賴於一個父RDD的同一個分割槽;
  窄依賴的節點(RDD)關係如果流水一般,所以當節點失敗後只需重新計算父節點的分割槽即可,寬依賴需要重新計算父節點的多個分割槽代價是非常昂貴的;

窄依賴
          窄依賴Narrow

寬依賴
          寬依賴Wide

編譯打包

  Spark支援Maven與SBT兩種編譯工具,這裡使用了Maven進行編譯打包;
  在執行make-distribution指令碼時它會檢查本地是否已經存在Maven還有當前Spark所依賴的Scala版本,如果不存在它會自動幫你下載到build目錄中並解壓使用;Maven源最好配置成OSChina的中央庫,這下載依賴包比較快;
  耐心等待,我編譯過多次所以沒有下載依賴包,大概半個小時左右編譯完成;注意:如果使用的是Java 1.8需要給JVM配置堆與非堆記憶體,如:export MAVEN_OPTS="-Xmx1.5g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m";

  進入Spark根目錄下,執行:

./make-distribution.sh --tgz

 --tgz 引數是指編譯後生成tgz包  
 - PHadoop 支援Hadoop
 -Pyarn :支援yarn
 -Phive :支援hive
 --with-tachyon:支援tachyon記憶體檔案系統
 -name:與--tgz一起用時,name代替Hadoop版本號

 ./make-distribution.sh --tgz --name 2.6.0 -Pyarn -Phadoop-2.6 -Phive 

   開始編譯檢查本地環境,如不存在合適的Scala與Maven就在後臺下載;

開始編譯,檢查本地環境中

編譯中:
編譯中

編譯完成並打包生成tgz:

編譯完成
  

編譯完成後把生成的檔案拷貝到當前Spark的dist目錄中並且打包生成spark-1.5.3-SNAPSHOT-bin-2.2.0.tgz檔案;

Spark目錄

Spark——共享變數

Spark執行不少操作時都依賴於閉包函式的呼叫,此時如果閉包函式使用到了外部變數驅動程式在使用行動操作時傳遞到叢集中各worker節點任務時就會進行一系列操作:
  1、驅動程式使將閉包中使用變數封裝成物件,驅動程式序列化物件,傳給worker節點任務;
  2、worker節點任務接收到物件,執行閉包函式;
由於使用外部變數勢必會通過網路、序列化、反序列化,如外部變數過大或過多使用外部變數將會影響Spark程式的效能;
  Spark提供了兩種型別的共享變數(Shared Variables):廣播變數(Broadcast Variables)、累加器(Accumulators );
  
廣播變數(Broadcast Variables)
  Spark提供的廣播變數可以解決閉包函式引用外部大變數引起的效能問題;廣播變數將只讀變數快取在每個worker節點中,Spark使用了高效廣播演算法分發變數從而提高通訊效能;如直接在閉包函式中使用外部 變數該變數會快取在每個任務(jobTask)中如果多個任務同時使用了一個大變數勢必會影響到程式效能;
  廣播變數:每個worker節點中快取一個副本,通過高效廣播演算法提高傳輸效率,廣播變數是隻讀的;
  Spark Scala Api與Java Api預設使用了Jdk自帶序列化庫,通過使用第三方或使用自定義的序列化庫還可以進一步提高廣播變數的效能;

廣播變數使用示例:

val sc = SparkContext("");
val eigenValue = sc.bradcast(loadEigenValue())
val eigen = computer.map{x => 
    val temp = eigenValue.value
    ...
    ...
}

廣播變數

      左節點不使用廣播變數,右使用廣播變數
累加器(Accumulators)

  累加器可以使得worker節點中指定的值聚合到驅動程式中,如統計Spark程式執行過程中的事件總數等;

val sc = new SparkContext(...)
val file = sc.textFile("xxx.txt")
val eventCount = sc.accumulator(0,"EventAccumulator")  //累加器初始值為0

val formatEvent = file.flatMap(line => {
     if(line.contains("error")){
         eventCount +=1
     }
    })
formatEvent.saveAsTextFile("eventData.txt")
println("error event count : " + eventCount);

  在使用累加器(Accumulators)時需要注意,只有在行動操作中才會觸發累加器,也就是說上述程式碼中由於flatMap()為轉換操作因為Spark惰性特徵所以只用當saveAsTextFile() 執行時累加器才會被觸發;累加器只有在驅動程式中才可訪問,worker節點中的任務不可訪問累加器中的值;
  Spark原生支援了數字型別的的累加器如:Int、Double、Long、Float等;此外Spark還支援自定義累加器使用者可以通過繼承AccumulableParam特徵來實現自定義的累加器此外Spark還提供了accumulableCollection()累加集合用於;建立累加器時可以使用名字也可以不是用名字,當使用了名字時在Spark UI中可看到當中程式中定義的累加器, 廣播變數儲存級別為MEMORY_AND_DISK;

Spark作業排程階段分析

Spark作為分散式的大資料處理框架必然或涉及到大量的作業排程,如果能夠理解Spark中的排程對我們編寫或優化Spark程式都是有很大幫助的;
  在Spark中存在轉換操作(Transformation Operation)行動操作(Action Operation)兩種;而轉換操作只是會從一個RDD中生成另一個RDD且是lazy的,Spark中只有行動操作(Action Operation)才會觸發作業的提交,從而引發作業排程;在一個計算任務中可能會多次呼叫 轉換操作這些操作生成的RDD可能存在著依賴關係,而由於轉換都是lazy所以當行動操作(Action Operation )觸發時才會有真正的RDD生成,這一系列的RDD中就存在著依賴關係形成一個DAG(Directed Acyclc Graph),在Spark中DAGScheuler是基於DAG的頂層排程模組;

相關名詞

  Application:使用Spark編寫的應用程式,通常需要提交一個或多個作業;
  Job:在觸發RDD Action操作時產生的計算作業
  Task:一個分割槽資料集中最小處理單元也就是真正執行作業的地方
  TaskSet:由多個Task所組成沒有Shuffle依賴關係的任務集
  Stage:一個任務集對應的排程階段 ,每個Job會被拆分成諾幹個Stage

    作業排程關係圖
          1.1 作業排程關係圖

RDD Action作業提交流程

  這裡根據Spark原始碼跟蹤觸發Action操作時觸發的Job提交流程,Count()是RDD中的一個Action操作所以呼叫Count時會觸發Job提交;
  在RDD原始碼count()呼叫SparkContext的runJob,在runJob方法中根據partitions(分割槽)大小建立Arrays存放返回結果;

RDD.scala

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.scala

def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit = {

  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}

  在SparkContext中將呼叫DAGScheduler的runJob方法提交作業,DAGScheduler主要任務是計算作業與任務依賴關係,處理呼叫邏輯;DAGScheduler提供了submitJob與runJob方法用於 提交作業,runJob方法會一直等待作業完成,submitJob則返回JobWaiter物件可以用於判斷作業執行結果;
  在runJob方法中將呼叫submitJob,在submitJob中把提交操作放入到事件迴圈佇列(DAGSchedulerEventProcessLoop)中;

def submitJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): JobWaiter[U] = {
      ......  
      eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
      ......
  }  

  在事件迴圈佇列中將呼叫eventprocessLoop的onReceive方法;

Stage拆分

  提交作業時DAGScheduler會從RDD依賴鏈尾部開始,遍歷整個依賴鏈劃分排程階段;劃分階段以ShuffleDependency為依據,當沒有ShuffleDependency時整個Job 只會有一個Stage;在事件迴圈佇列中將會呼叫DAGScheduler的handleJobSubmitted方法,此方法會拆分Stage、提交Stage;

 private[scheduler] def handleJobSubmitted(jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties) {
var finalStage: ResultStage = null
......
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)

submitWaitingStages()
}

排程階段提交

  在提交Stage時會先呼叫getMissingParentStages獲取父階段Stage,迭代該階段所依賴的父排程階段如果存在則先提交該父階段的Stage 當不存在父Stage或父Stage執行完成時會對當前Stage進行提交;

 private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  }
  ......
}

Scala 的詳細介紹請點這裡
Scala 的下載地址請點這裡