1. 程式人生 > >Spark核心概念理解

Spark核心概念理解

固定 tex 入口 HA 持久 其它 ota amd 適合

本文主要內容來自於《Hadoop權威指南》英文版中的Spark章節,能夠說是個人的翻譯版本號,涵蓋了基本的Spark概念。假設想獲得更好地閱讀體驗,能夠訪問這裏.

安裝Spark

首先從spark官網下載穩定的二進制分發版本號,註意與你安裝的Hadoop版本號相匹配:

技術分享圖片

wget http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz

解壓:

tar xzf spark-x.y.z-bin-distro.tgz

為了方便執行,將bin文件夾增加到PATH中:

export SPARK_HOME=/home/spark/
export PATH=$PATH:$SPARK_HOME/bin

完畢。

簡單樣例

Spark提供了交互式的Spark-shell。這是入門的好起點。

spark-shell是基於Scala REPL的交互式工具。

啟動shell:

spark-shell

技術分享圖片

從輸出中我們能夠看到。shell創建了一個Scala變量,存放的是SparkContext的實例。

我們使用sc載入一個文本文件:

val lines = sc.textFile("input/ncdc/sample.txt"
)

技術分享圖片

lines變量引用的是一個RDD對象(Resilient Distributed Dataset)。RDD是Spark最核心的抽象。它是一個(通過分區。partitioned)分布在集群多臺機器上的僅僅讀對象集合。

在一個典型的Spark應用程序中,一個或者多個RDD被載入作為輸入。經過一系列的轉化(transformation)之後變成目標RDD集合。然後一個動作(action)作用於這些RDD上,比如計算結果或者保存到持久化介質中。

RDD中的resilient是指:當一個RDD分區(partition)丟失之後。Spark會自己主動從其原始的RDD又一次計算。載入RDD或者在RDD上調用transformation時,並沒有觸發真正的處理過程,Spark僅僅是創建執行的計劃。

僅僅有當action作用域RDD時,才會觸發真正的數據處理,比如執行foreach().

接著前面載入的數據,拿到lines之後。我們想要把每一行的字段進行切分:

val records = lines.map( _.split("\t"))

技術分享圖片

map方法將一個函數作用在RDD中的每一個元素上,這個樣例中,split把每一行(RDD[String])轉變成一個Scala的字符串數組(RDD[Array[String]])。

移除臟數據:

val filtered = records.filter( rec => (rec(1) !="9999" && rec(2).match("[01459]")))

技術分享圖片

技術分享圖片

filer針對RDD中的每一個元素執行一個predicate推斷,傳入的是一個返回Boolean類型的函數,Scala中數組的訪問訪問是通過()操作。

這裏主要是過濾到臟數據。

為了找到每一年的最高溫度。我們須要執行一個分組操作,Spark提供了reduceByKey的操作,可是僅僅能應用在key-value類型的RDD(使用Scala的Tuple2來表示)上面,因此須要先做一次轉換:

val tuples = filtered.map(rec => (rec(0).toInt,rec(1).toInt))

技術分享圖片

通過map操作來完畢。將字符串素組轉化為Int二元組,scala中調用方法時假設沒有參數能夠省略不寫括號。

轉換之後我們就能夠進行聚合操作:

val maxTemps = tuples.reduceByKey((a,b)=> Math.max(a,b))

技術分享圖片

reduceByKey接受一個函數。這個函數將一對值合並為一個值,然後不斷應用在key相應的全部值上。假設1950這個key相應的記錄有:

(1950,20) // 1
(1950,19) // 2
(1950,22) // 3

reduceByKey操作會把max函數應用到1和2身上,即執行max(20,19),得到20,然後再把20跟第三條記錄對照。得到終於的22. 我們把結果輸出:

maxTemps.foreach(println(_))

foreach是個action操作,針對RDD中的每一個元素應用println(_)這個函數。這時候才會觸發整個RDD鏈條執行計算,輸出結果到控制臺:

(1950,22)
(1949,111)

我們也能夠把計算結果保存到磁盤中:

maxTemps.saveAsTextFile("output")

查看輸出文件:

cat output/part-*

Spark Applications,JObs,Stages , and Tasks

Spark中也有一些核心的概念。相似於MapReduce,Spark也有作業(job)的概念,可是更為通用一些,作業由隨意的stage 有向無環圖(DAG)組成,stage有點相似於MapReduce中的map或reduce階段(phase)。

Spark執行時將Stage進一步被拆分為task,並在分布於集群上的RDD partitions並行執行。作業總是執行於Application的上下文中,這個上下文通過SparkContext來表示,用於組織相關的RDD和共享變量。一個Application能夠並行或串行執行多個Job。

Application提供了一種在同一個應用中共享數據集的機制,前面執行的作業能夠將數據集緩存。興許的作業能夠直接訪問這些緩存的RDD。這與MapReduce中每一個作業都須要從磁盤中讀取輸入數據是不同的。交互式的Spark會話如Spark-shell就是一個應用實例。

一個Scala應用程序

spark-shell提供了一種探索和學習Spark非常好的方式。可是實際中常常須要將業務邏輯作為一個自包括的、完整的應用打包在一起,能夠多次執行。以下是一個Scala應用的樣例:

import org.apache.spark.SparkContext._
import org.apche.spark.{SparkContext,SparkConf}

object MaxxTemperature{
  def main(agrs: Array[String]) {
    val conf = new SparkConf().setAppName("Max Temperature")
    val sc = new SparkContext(conf)

    sc.textFile(agrs[0]))
      .map(_.split("\t"))
      .filter(rec => (rec(1) != "9999" && rec(2).match("[01459]")))
      .map(rec => (rec(0).toInt , rec(1).toInt))
      .reduceByKey( (a,b) => Math.max(a,b))
      .saveAsTextFile(args(1))
  }
}

當作為一個獨立的應用時,我們須要自己創建SparkContext,由於沒有shell提供這個對象給我們。SparkConf用於配置應用的各個屬性,這裏我們僅僅設置了應用名稱。

Spark中的轉換(Transformation)大多數在RDD這個類中定義,可是這個樣例中的reduceByKey()其實是在PairRDDFunctions類中定義的,我們之所以不須要顯式轉換。是由於有以下的導入語句:

import org.apache.spark.SparkContext._

這個語句導入了Spark中各種隱式轉換的函數,這個導入在編寫應用的時候非常有用。

完畢這個簡單應用的編寫之後。我們使用spark-submit來執行程序:

spark-submit --class MaxTemperature --master local spark-examples.jar input/ncdc/sample.txt output/max-temp

spark-submit相似於Hadoop中的hadoop jar命令,class參數指定了執行Main入口,master指定執行模式,local模式下全部組件都執行在一個JVM中,spark-examples.jar包括編譯過的應用程序代碼,後面是輸入和輸出參數。

Java樣例

Spark使用基於JVM的語言Scala。能夠非常好地與Java集成。Spark提供的Java API中,RDD使用類JavaRDD封裝,JavaPairRDD用於特殊的key-value RDD。這兩個類都事先了JavaRDDLike接口。RDD的大部分操作方法都在這個接口中定義。

上述的邏輯使用Java來表達例如以下:

public class MaxTemperatureSpark {
  public static void main(String[] args) throws Exception {
  if (ages.length != 2){
    System.err.println("Usage: MaxTemperaturSpark <input path> <output path>");
    System.exit(-1);
  }

  SparkConf conf = new SparkConf();
  JavaSparkContext sc = new JavaSparkContext("local","Max TemperaturSpark",conf);
  JavaRDD<String> lines = sc.textFile(args[0]);
  JavaRDD<String[]> records = lines.map( new Function<String , String[]>(){
    @Override
    public String[] call(String s){
      return s.split("\t");
    }
  });

  JavaRDD<String[]> filtered = records.filter(new Function<String[],Boolean>(){
    @Override
    public Boolean call(String[] rec){
      return rec[1] != "9999" && rec[2].matches("[01459]");
    }
  });

  JavaPairRDD<Integer,Integer> tuples = filtered.mapToPair(
    new PairFunction<String[],Integer,Integer>(){
      @Override
      public Tuple2<Integer,Integer> call(String[] rec) {
        return new Tuple2<Integer,Integer> ( Integer.parseInt(rec[0]),Integer.parseInd(rec[1]));
      }
    } );

  JavaPairRDD<Integer,Integer> maxTemps = tuples.reduceByKey(
    new Function2<Integer,Integer,Integer>(){
      @Override
      public Integer call (Integer i1 , Integer i2){
        return Math.max(i1,i2);
      }
    }
  );

  maxTemps.saveAsTextFile(agrs[1]);

  }
}

能夠看到代碼非常冗長,Java在處理函數式的代碼的確非常不給力。

實現邏輯非常簡單,就是不斷對RDD做轉換,轉換的邏輯大多通過各階的函數來表示,比如Function。Function2。PairFunction等。另外靜態類型的特點也要求我們在定義每一個RDD都要指定其泛型類型。沒有了Scala中的隱式轉換。因此從String數組RDD到PairRDD也須要我們自己動手。

編譯這個類,然後使用spark-submit提交。格式與Scala版本號全然一樣。除了類名變了。

Python樣例

Spark也提供了Python的API,一般叫PySpark。通過使用Python中的lambda表達式。Python寫出來的代碼和Scala非常相似,比較緊湊。

from pyspark import SparkContext
import re ,sys

sc = SparkContext("local","Max Temperature")
sc.textFile(sys.argv[1])
  .map( lambda s: s.split("\t"))
  .filter(lambda rec : (rec[1]!="9999" and re.match("[01459]" ,rec[2])))
  .map(lambda rec: (int(rec[0]),int(rec[1])))
  .reduceByKey(max)
  .saveAsTextFile(sys.argv[2])

非常好,代碼非常緊湊,參數通過sys.argv傳入,下標從1開始而不是0. 轉換邏輯通過lambda來表達,正則匹配用了re模塊。reduce的時候用了python內置的max函數。

Python代碼執行時,Spark會fork出子線程來執行這些用戶代碼。在啟動程序(launcher program)和executor中都會fork子進程。

兩個進程之間使用socket通信,全部父進程能夠把RDD Partition的數據傳遞給Python代碼。

執行Python版本號時。我們制定的是Python文件而不是jar包:

spark-submit --master local src/python/MaxTemperature.py input/ncdn/sample.txt output

另外。使用pyspark命令也能夠啟動Python版的交互式REPL。

RDD

RDD是Spark應用中最核心的部分。接下來看一些RDD相關的內容。

創建RDD

有3種能夠創建RDD:
1. 從內存中的對象集合創建(parallelism collection)
2. 從外部存儲文件創建(如HDFS)
3. 從其它已存在的RDD轉換而來

第1種方式適合內存不敏感的少量數據進行操作。

比如以下代碼對1-10的整數進行處理:

val params = sc.parallelize( 1 to 10)
val results = params.map(doSomethingExpensive)

doSomethingExpensive函數在params上的每一個元素並行計算,並行度由spark.default.parallelism參數指定,默認情況下,假設在local模式執行。該值為機器的CPU核數。假設執行在集群環境中,該值為集群中屬於該應用的executor擁有的CPU總核數。


假設我們不想使用默認的並行度。能夠在parallelize方法傳入第二個參數指定,以下的代碼指定並行度為10:

val params = sc.parallelize(1 to 10 , 10)

第2種方法通過創建對外部數據的引用來創建RDD:

val text:RDD[String] = sc.textFile("hdfs-path")

路徑參數必須是Hadoop文件系統的的文件路徑。比如本地文件路徑。HDFS文件路徑,或者HDFS暴露的web接口webhdfs。

在內部。Spark使用MapReduce的TextInputFormat API讀取文件,因此文件的分區和MapReduce是一樣的,一個Block相應一個Partition。我們也能夠明白指定想要的Partition數:

val text:RDD[String] = sc.textFile("hadoop-fs-path" , 10 )

另外。我們能夠把整個文件當做一條記錄來處理,此時創建的RDD是一個PairRDD。key為文件路徑,value為文件的內容:

val file:RDD[(String,String)] = sc.wholeTextFiles("hadoop-fs-path")

除了讀取文本文件外,Spark也能夠讀取其它格式的文件,比如SequenceFile:

sc.sequenceFile[IntWritable,Text](input-path)

對於Writable數據類型,Spark會自己主動將其轉成相應的Java類型,因此以下這個語句與上邊的等同:

sc.sequenceFile[Int,String](input-path)

對於隨意類型的Hadoop InputFormat,有2中方式能夠創建RDD:

  • hadoopFile(): 用於基於文件的。用一個路徑來指定的情況
  • hadoopRDD():用於其它情況,比如HBase的TableInputFormat。

    新版本號的MapReduce(即V2)使用相應的newAPIHadoopFile和newAPIHadoopRDD。

    比如我們想讀取Avro格式的文件:

val job = new Job()
AvroJob.setInputKeySchema(job,WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(input-path , 
  classOf[AvroKeyInputFormat[WeatherRecord]],
  classOf[AvroKey[WeatherRecord]],
  classOf[NullWritable],
  job.getCofiguration)

第二行中我們把Schema增加做Job的配置中,方法四個參數分別為:

  • 文件路徑
  • InputFormat類型
  • Key類型
  • Value類型
  • Job配置

第3中創建RDD的方式是從已有的RDD轉換而來,下一部分的Transformation具體介紹這樣的方式。

轉化:Transformation和Action

Spark定義了2種作用於RDD的的操作:Transformation和Action。Transformation從一個已有的RDD。計算生成還有一個RDD。

Action作用在一個RDD上,計算後對結果進行操作。返回給用戶或者保存到外部存儲中。

調用Action會馬上生效。可是Transformation是懶執行的(lazy),全部轉換操作僅僅有當Action觸發之後才會執行。

val text = sc.textFile(input-path)
val lower: RDD[String] = text.map(_.toLowerCase)
lower.foreach(println(_))

上述代碼中,map是一個操作,foreach是一個Action,僅僅有當調用foreach時,Spark才會執行一個作業,從文件裏讀取數據創建RDD,針對每一個元素調用toLowerCase方法,輸出控制臺。

區分一個操作是Transformation還是Action,一般能夠通過返回類型來推斷。假設返回的是RDD,則為Transformation。否則為Action。關於RDD操作的描寫敘述,大部分能夠在org.apache.spark.rdd包下的RDD類找到,特定類型的鍵值對RDD。能夠在PairRDDFunctions中找到。

Spark的庫中帶了非常豐富的操作。包括mapping,grouping,aggregating。repartitioning。sampling,joining等轉換操作,Action操作包括從RDD中提取固定個數的元素,抽樣。保存到外部等。

更具體的描寫敘述能夠在Spark文檔中找到。

作用在鍵值對RDD上的聚合操作主要有3個:

  • reduceByKey
  • foldByKey
  • aggerateByKey。

這三個操作都是作用在每一個key上,對key上的值列表進行聚合操作。得到一個值。相應的reduce。fold,aggregate操作相似,可是是作用在整個RDD上,終於生成單一值。以下以一個樣例來說明這三個Transformation:

val pairs:RDD[(String,Int)]=
  sc.parallelize(Array(("a",3),("a",1),("b",7),("a",5)))

val sums: RDD[(String,Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a",9),("b",7)))

針對每一個key,reduceByKey操作將加法函數_+_循環作用在全部值。比如,對於a這個key。其值有:3,1,5, 執行加法(3+1)+5=9,每一次計算將上一次計算結果與下一個值進行運算。

由於這些操作通常都是在集群中並行執行,所以聚合函數必須是commutative和associative的,也就是計算結果跟的順序無關,我們的樣例中(3+1)+5與(5+3)+1結果是一樣的。

foldByKey的轉換操作例如以下:

val sums :RDD[(String,Int)] = pairs.foldByKey(0)(_+_)
assert( sums.collect().toSet === Set(("a",9),"b",7)))

不同於reduceByKey,foldByKey須要傳遞一個初始的”零值”,不同類型的零值可能不同,此時聚合變成 (((0+3)+1)+5)=9,初始值第一個參與運算,其它值順序無關,b相應的聚合為0+7=7。

reduceByKey和foldByKey都無法改動聚合結果的類型,即整數相加之後得到的依舊數整數,不能改動類型。

要想改動聚合結果的類型,須要使用aggregateByKey:

val sums :RDD[(String , HashSet[Int])]=
  pairs.aggregateByKey(new HashSet[Int])( _+=_ ,_++=_)

assert( sums.collect().toSet === 
        Set( ("a",Set(1,3,5)),
             ("b",Set(7))
           )
      )

我們提供了一個初始的值。即空的Set[Int],另外我們提供了兩個函數(_+=_,_++=_).第一個函數控制值是怎樣合並到Set中的,_+=_a=a+b的簡寫,對於Set而言。就是把值增加到原有的Set中,返回一個新的Set。原有Set保持不變。
第2個函數控制的是兩個Set是怎樣合並在一起的,這個在reduce中合並來自己多個Partition的聚合結果時使用。這裏的函數_++=_表示將第二個Set中的元素都加到第一個Set中。

轉換之後的RDD能夠在內存中持久化,以便興許的操作能夠更高速地訪問。

持久化:Persistence

前面我們提到將RDD轉化成鍵值對的RDD,這個開銷相對照較大。我們將轉化的結果緩存下來:

tuples.cache()

調用cache方法並不會立馬將結果緩存,而是設置一個標誌位告訴Spark:當執行這個作業的時候,將結果緩存下來。

為了讓這個緩存真正存在內存中。我們觸發一個Action:

tuples.reduceByKey((a,b)=>Marh.max(a,b)).foreach(println(_))

返回結果中的BlockManagerInfo顯示RDD的編號(RDD number)為4,有2個Paritition。同一個應用中興許作業假設須要用到這個RDD。將直接從緩存載入。我們執行一個找最小值的的轉換:

tuples.reduceByKey((a,b) => Math.min(a,b)).foreach(println(_))

從輸出信息能夠看到RDD是直接從緩存載入的(Found Block locally)。當數據量比較大事,能夠節約的時間非常可觀。相比於MapReduce。不同的作業之間假設想連接起來,僅僅能通過寫入文件,興許作業再從磁盤中讀取。Spark能夠將數據集緩沖在集群的內存中,興許作業能夠高速讀取數據。

Spark RDD的這樣的特性使得其在交互性的應用中非常有用,對於須要多次叠代的算法也非常適合在Spark上執行,叠代過程中的產生的數據能夠緩存在內存中,供下一輪叠代使用。

叠代算法能夠通過MapReduce實現,可是每一輪叠代的結果須要寫入磁盤,下一輪再從磁盤讀取。效率低下。

須要註意的是,緩存的RDD僅僅能被同一個應用的作業讀取,假設須要跨應用使用這些RDD。須要使用相應的saveAs*以及hadoopFile,hadoopRDD保存數據到外部存儲,然後再寫入。

當一個應用執行結束後。其全部緩存的RDD將無法訪問。除非保存到外部存儲中。

持久化級別

調用cache方法,能夠將RDD的各個partition保存到其executor的內存中。假設內存容納不下,作業不會失敗。而是在必要的時候又一次計算。對於擁有非常多Transformation的大型作業,又一次計算是非常昂貴的。

因此Spark提供了不同級別的持久化機制。在調用persist方法時傳入StorageLevel參數。控制持久化類型和級別。

默認情況下,持久化級別為MEMORY_ONLY,這個級別的緩存數據,在內存中以對象的形式存在。能夠通過將對象序列化成字節數組。達到更緊湊的數據格式,節約內存空間,這個級別是MEMORY_ONLY_SER。相比MEMORY_ONLY,這樣的序列化的方式更耗CPU,可是當未序列化的數據存不下內存而序列化之後能夠存放於內存時,這樣的犧牲是值得的,用CPU換內存,這是常見的一種權衡。MEMORY_ONLY_SER的持久化級別同一時候減少了GC壓力,由於RDD是以一個字節數組存在於內存中的。而不是非常多對象。

怎樣知道內存是否能容納下RDD。能夠通過查看BlockManager的日誌文件來獲取這一信息。

另外,每一個驅動程序的SparkContext都在4040端口上執行了一個HTTP Server,這個服務器提供了非常多有用的信息,比如關於SparkContext執行環境。當中執行的作業。還有緩存的RDD Paritition信息。

技術分享圖片

默認情況下。RDD Partition有用Java的序列化方式來序列化數據,但通常情況下Kyro是一個更好的選擇。不管實在速度上還是空間效率上。另外能夠通過壓縮來進一步節約內存空間,可是這是以CPU計算為代價的。要使用壓縮格式。將spark.rdd.compress屬性設為true。並依據須要設置壓縮算法spark.io.compression.codec.

假設又一次計算RDD的代價非常昂貴,那麽值得考慮另外的2中持久化級別:MEMORY_AND_DISK和MEMORY_AND_DISK_SER。前者會在內存不足時將數據寫入到磁盤,後者在序列化後的數據仍然無法存放在內存時。將數據寫到磁盤。

另外還有一些更高級的持久化特性,比如將RDD Partition的多個副本存在集群的不同節點上。或者使用對外(off-heap)內存,更具體內容參考Spark文檔。

序列化

Spark中的序列化通常考慮2個方面:

  • 數據序列化
  • 函數序列化

數據序列化

默認情況下。Spark使用Java的序列化機制序列化數據,並通過網絡發送給其它Executor。持久化(或者緩存)RDD Partition的時候也會涉及到數據序列化。實現了Serializable或Externalizable接口的對象使用Java標準的方式序列化之後,非常easy被其它JVM應用理解。

可是在性能和空間效率上不是非常理想。

對於大部分的Spark程序來說。Kyro 序列化是更好的選擇方式。Kyro是一種各高效的通用Java序列化庫。要使用Kyro序列化。須要在SparkConf上配置例如以下序列化器:

conf.set("spark.serializer","org.apache.spark.serializer.KyroSerializer")

Kyro不要求實現不論什麽特定的接口(像java.io.serializable那樣)。侵入性非常小。所以POJO能夠直接被序列化,除了啟用Kyro序列化以外,無需不論什麽其它操作。可是,假設在使用一個類之前先在Kyro註冊,能夠使得性能更加高效。假設沒有註冊,Kyro在序列化時會寫入一個對象所屬類的引用,每一個被序列化的對象都會寫入一個類的全稱。假設事先註冊,則僅僅寫入一個整數ID。Spark已經默認註冊了Scala的類和更多框架的類,比如Avro Generic,Thrift的類。

在Kyro中註冊類非常直接,創建一個KyroRegistrator的子類。實現registerClasses方法:

class CustomKyroRegistrator extends KyroRegistrator {
  override def registerClassed(kyro : Kyro) {
    kyro.register(classOf[WeatherRecord])
  }
}

然後在驅動程序中。將Registrator的類名全稱賦給spark.kyro.registrator屬性:

conf.set("spark.kyro.registrator","CustomKyroRegistrator")

函數序列化

在Scala中,函數的序列化採用的是Java標準的序列化機制。Spark也使用這樣的標準的方式發送函數給遠程的Executor節點。其實。即使執行在local模式(即全部Spark組價都在一個JVM中執行),Spark也會序列化函數。

假設在代碼中使用了不可序列化的函數,Spark將會報錯,比如從不可序列化類的一個方法中轉換而來的方法是不可序列化的。

共享變量

Spark程序常常須要訪問不在RDD中的數據,比如以下的代碼在map方法中使用了一個查詢表:

val lookup = Map( 1 -> "a" , 2 ->"e" , 3->"i" , 4->"0",5->"u")
val result = sc.parallelize(Array(2,1,3)).map(lookup(_))
assert( result.collect.toSet === Set("a" , "e","i"))

這樣的方式沒有問題,lookup變量被序列化之後作為一個閉包(closure)傳遞給map方法。

可是使用廣播變量(broadcast variables),能夠更高效地達到相同的效果

廣播變量:Broadcast variables

廣播變量會被序列化,然後發送給每一個executor,executor將廣播變量緩存,以便興許須要的時候使用,這不同於被序列化為閉包一部分的常規變量。常規變量會作為閉包的一部分通過網絡發送給每一個task。

廣播變量有點相似MapReduce中的分布式緩存,盡管Spark的實現中將數據存儲在內存。僅僅有在內存耗盡時才會寫到磁盤(而MapReduce的分布式緩存位於磁盤中)。

使用SparkContext的broadcast方法來廣播變量,該方法返回一個Broadcast[T]類型的包裝器:

val lookup:Broadcast[Map[Int,String]] =
  sc.broadcast(Map(1->"a",2->"e",3->"i",4->"o", 5 ->"u"))
val result = sc.parallelize(Array(2,1,3)).map(lookup.value(_))

廣播變量的值通過value來訪問。註意的是,廣播變量是單向傳播的,從驅動到任務,沒有辦法更新廣播變量或者將更新傳播回驅動程序。這樣的情況下。能夠通過Accumulator來實現。

Accumulators

累積器是一個共享變量,任務僅僅能做增加操作。就像MapReduce中的counter。

當一個作業執行完畢後。累積器的終於值能夠從驅動程序中獲取。以下這個樣例使用累積器來統計整形的RDD中有多少個元素,同一時候使用reduce Action來計算總和:

val count:Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1,2,3))
  .map( i => {count += 1; i})
  .reduce((x,y) => x + y)

assert(count.value === 3)
assert(result === 6)

累計變量count通過SparkContext的accumulator方法創建。

當中的map操作是個identify function。原值返回,可是產生了count加1的副作用。

當作業執行完畢之後,累積器的值通過value訪問得到。

上面的樣例中,我們使用Int類型的累積器,可是不論什麽數值類型的數據類型都能夠用於累積器。

Spark另外提供了accumulable方法,用於累積器的結果類型和被累積的類型不一致的情況,accumulableCollection用於累積可變集合的值。更多內容參考Spark文檔。

Spark作業執行過程

Spark作業的執行從宏觀上看。僅僅要由driver和executors組成。driver執行application(SparkContext)、調度作業(schedule tasks)。executor負責具體任務的執行。通常情況下。driver執行在client機器上(client一般不受整個集群管理)。可是在YARN的cluster模式上,driver執行在Application Master上。

下圖是Spark執行作業的總體流程:

技術分享圖片

當有Action在RDD上執行時,作業被自己主動提交,提交將調用SparkContext的run_job被調用,進而把作業提交給Scheduler。Scheduler執行在driver上,由兩部分組成:

  • DAG Scheduler:負責將作業分解為stage組成的DAG
  • Task Scheduler:負責提交每一個stage的Task到集群。

構造DAG

在介紹作業怎樣被分解為DAG之前,我們須要了解一下stage能夠執行的任務類型,stage能夠執行2中類型的任務:shuffle map task和result task。

  • shuffle map task
    這樣的類型的任務相似於MapReduce中的map端shuffle。每一個shuffle map任務在RDD partition上執行計算,而且依據partitioning 函數將結果寫入新的partition,這些結果在興許被下一個stage取走。shuffle map task執行在最後一個stage以外的全部stage。
  • result task
    result task執行在作業的最後一個stage上,最後一個stage將結果返回給用戶程序(user’s program).每一個Result任務在他的RDD Partition上執行,並將結果發回給driver。driver組合來自每一個partition的結果。得到終於的計算結果。

最簡單的作業能夠僅僅有Result Task,也就是僅僅有一個由Result Task組成的stage。對於復雜的應用。可能須要組合多個shuffle stage。比如,以下的樣例中我們想要計算詞語的頻率分布直方圖:

val hist:Map[Int,Long] = sc.textFile(inputPath)
    .map(word => (word.toLowerCase(),1))
    .reduceByKey((a,b)=>a+b)
    .map(_.swap)
    .countByKey()

前兩個Transformation統計詞頻,即計算每一個詞語出現的次數。第三個Transformation對調key和value。得到的是(count。word)。最後的Action countByKey()得到頻率直方圖。即出現N詞的詞語有M個。

Spark的DAG Scheduler將這個作業分解為2個Stage。由於reduceByKey()這個Operation須要通過shuffle stage來完畢。最後的DAG例如以下圖所看到的:

技術分享圖片

在一個stage內,RDD通過也被組織成DAG。上圖中展示了RDD的類型以及產生該RDD的操作。RDD[String]有sc.textFile()創建。

圖中省略了一些Spark內部產生的RDD,比如textFile()創建的RDD其實是MappedRDD[String],其父類為HadoopRDD[LongWritable,Text]。

註意到reduceByKey同一時候出如今兩個Stage中,這是由於它是使用shuffle實現的。在map端(stage1)reduce函數作為combiner執行,在reduce端(stage2)作為reducer執行。

這一點相似於MapReduce。有些情況下降Reducer的實現直接作為map端的Combinor,對map任務的輸出結果先做一次預聚合,能夠避免在網絡上傳輸大量數據。

Spark的shuffle實現把輸出寫入到本地的分區文件(partitioned file),即使是內存級別的RDD,這些文件被下一個stage的RDD取走。

假設RDD已經被上一個job(同一個Application)持久化,則DAG Scheduler不會再創建stage又一次計算這個RDD(或者由這個RDD衍生的其它RDD)。

DAG Scheduler還負責將stage分級為task,然後將task提交給Task Scheduler。在這個樣例中,輸入文件的每一個partition,執行一個task(shuffle map)。reduceByKey()的並行度(parallelism)能夠通過其第二個參數設置,假設沒有設置,則從其上一級RDD推斷。這個樣例中就是輸入數據的partition數量。

針對每一個Task。DAG Scheduler都給出了一個位置偏好(placement preference),Task Scheduler能夠依據這些偏好,更有效地理由本地數據的優勢(data locality)。

比如。假設task處理的是來自HDFS的RDD,則更傾向於執行在擁有相應數據的節點上(node local)。而假設一個task處理的數據來自於緩存在內存中的RDD partition。則傾向於執行在內存中擁有這些數據的executor(process local)。

一旦DAG Scheduler構建完stage的DAG後。將每一個stage的任務提交給task scheduler。子stage僅僅有在其上一級完畢之後才提交。

任務調度

當task scheduler收到一組任務後,它依據應用持有的executor列表(在YARN中。Spark會實現申請固定數量的容器,然後自己決定怎樣使用這些Container,這一點不同於批處理的MapReduce,MapReduce按需申請Container)。結合任務的位置偏好,決定每一個任務執行在哪個executor,即task-executor映射。

然後把task分配給空暇的executor,直到任務集都執行完畢。

默認情況下。每一個task分配一核CPU,CPU數量能夠通過spark.task.cpus設置。

對於一個給定的executor,作業調度器優先分配有process-local偏好的task,然後依次是node-local,rack-local。最後才是沒有位置偏好(nonlocal的任務或者猜測執行(speculative)的任務。

已分配的任務通過scheduler backend啟動,這個backend發送啟動任務的消息給相應的executor backend,告知executor開始執行任務。

Spark使用Akka發送遠程消息,而不是使用Hadoop的RPC機制,Akka用於構建高並發和分布式的JVM應用。提供了工具箱和執行時。

Executor在任務結束或者失敗的時候發送狀態消息給driver,假設任務執行失敗。task scheduler將又一次提交任務到還有一個executor。

假設啟用了猜測執行(默認沒有啟用),對於執行慢的任務,會啟用speculative任務。

任務執行

Executor接到執行任務的消息後,首先確認任務須要的jar包和文件都是最新的。

假設之前的任務執行過,executor會在本地緩存這些jar包和文件,僅僅有當發生變更時才會又一次下載。接著反序列化任務代碼,任務代碼以序列化字節的形式通過任務啟動消息發送到executor。最後任務在executor同一個JVM中被執行。因此無需再任務啟動的時候又一次啟動JVM。

任務執行的結果序列化之後發送給executor backend,然後作為狀態更新消息(status update message)發回給driver。

假設是shuffle map task。返回中包括的信息用於下一個stage提取數據,相似於MapReduce中map任務完畢之後,通過心跳發送消息給Application Master,然後Reducer通過心跳得知map已經執行完畢,進而去copy數據。假設是Result task。將相應partition的執行結果發送給driver。driver組合出終於結果。

Executor和Cluster Manager

上一部分我們了解了Spark是怎樣依靠executor來執行任務的。接下來進一步了解executor到底是怎樣啟動的。Spark中,executor的聲明周期通過cluster manager來管理。Spark提供了多種不同的manager:

  • Local
    Local模式中,僅僅有一個跟driver執行在同一個JVM的executor。對於測試或者執行小型任務,這樣的模式非常有用。該模式下master的url為local(使用一個線程),local[n](n個線程)或者local(*)(線程數和CPU核數相等)。

技術分享圖片

  • Standalone
    Standalone是cluster manager一種簡單的分布式實現。執行一個Spark master和一個或多個的worker。當Spark應用啟動之後,master代表應用要求worker執行task。master的url為spark://host:port
  • Mesos
    Apache Mesos是一種通用的集群資源管理框架。

    在fine-grained模式下。每一個Spark Task作為一個Mesos Task執行。這樣的方式能夠更有效地利用集群資源,可是以進程啟動負擔為代價。

    在coarse-grained模式下,executor在進程任務執行任務。所以集群資源在Spark應用程序執行過程中一直被executor持有。

    master的url地址為mesos://host:port

  • YARN
    YARN是Hadoop使用的資源管理框架,每一個Spark應用相應一個YARN 應用實例。每一個executor在其自己的Container中執行。

    master的url為yarn-client或者yarn-cluster。

Mesos和YARN的資源管理方式優於Standalone。它們考慮了集群中其它應用對資源的需求(比如MapReduce作業),而Standalone採用靜態的資源分配方式,沒有辦法動態地調整以滿足集群的其它資源需求。

YARN是唯一一個與Hadoop的Kerberos安全機制集成的cluster manager。

Spark on YARN

Spark能夠通過兩種模式執行在YARN上:

  • YARN client mode:
    Driver執行在client機器上。對於包括有交互式組件的應用,必須使用這樣的模式,比如spark-shell和pyspark。

    在開發和調試Spark應用的時候,這樣的模式能夠立馬查看到debug信息。

  • YARN cluster mode:
    Driver執行在集群的Application Master上,適合於生產環境中的應用。

    整個應用執行在集群上,方便管理應用的日誌文件。

    另外YARN在Application Master故障的時候會重試應用。

YARN client mode

在YARN client模式上,在driver構造出SparkContext實例(下圖step1)的時候,就開始於YARN進行交互。Context向YARN的資源管理器(RM)提交一個應用(step2),RM在集群的NodeManager中啟動一個Container,並在Container中執行Spark ExecutorLauncher(step3)。ExecutorLauncher的任務是向RM申請資源(step4),並在申請到資源後將Executor Backend作為容器在相應的NodeManager中啟動(step5)

技術分享圖片

當executor啟動後,回頭連接到SparkContext並註冊自己,註冊的這個步驟能夠給SparkContext提供整個應用的executor信息,一遍task scheduler在決定將任務執行在哪個節點(task placement decision)的時候,能夠考慮任務的位置偏好。

executor的數量在啟動spark-shell,spark-submit或者pyspark時指定,假設沒有指定,默認啟動2個executor。每一個executor使用的CPU核數(默認1)和內存大小(默認1024M)也能夠在這個時候設置,以下這個樣例啟動spark-shell,執行4個executor:

spark-shell --master yarn-client --num-executors 4 --executor-cores 1 -- executor-memory 2g

技術分享圖片

不同於Standalone或者Mosos。YARN的NM地址並沒有在這裏配置,而是從Hadoop的配置中提取,配置的文件夾通過HADOOP_CONF_DIR設置環境變量。

YARN cluster mode

在cluster模式下,用戶的驅動程序(driver。下圖的Spark Program)執行在YARN的Application Master進程中,使用該模式時,指定master的url相似:

spark-submit --master yarn-cluster ...

其它參數。比如executor數量,jar包或者python文件,與client mode一樣。

例如以下圖所看到的,spark-submit客戶端將啟動一個YARN Application(step1通過請求NM),可是它不執行不論什麽用戶代碼。Application Master在開始為executor申請資源(step4)之前將啟動驅動程序(step3b)。其它的過程與client mode一樣。

註意此時我們假設再訪問剛才的4040端口,發現頁面自己主動跳轉到YARN的應用程序管理界面了,url相似:

master:18088/proxy/application_1469461440579_0001

技術分享圖片

YARN的兩種模式下。啟動executor之前對於數據放在哪裏(data locality)並不知道,所以啟動的executor可能與所要處理的數據不在一個節點上(從而task的位置偏好無法得到滿足)。

對於交互式的session來說,這個是比較easy接受的。由於在啟動一個交互會話時,非常可能並不知道要處理哪些數據。

可是在生產環境中就不是這樣了,所以Spark提供一種方式。當應用執行在cluster模式時,能夠通過位置提示(placement hint)來提高data locality,這是通過構造SparkContext時傳入位置偏好達到的。

構造SparkContext之前,我們能夠使用InputFormatInfo這個工具類來獲取位置偏好,比如對於文本文件,使用TextInputFormat。例如以下能夠獲取位置偏好:

val preferredLocations = InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(new Configuration(),classof[TextInputFormat],inputPath)))

val sc = new SparkContext(conf , preferredLocations)

這些位置偏好信息在Application Master為executor申請資源的時候能夠使用,眼下該特性的API還不是非常穩定。

最後我們提交一個Spark自帶的樣例到YARN集群執行:


export HADOOP_CONF_DIR=/home/hadoop-2.6.0/etc/hadoop/

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster  --executor-memory 2G --num-executors 6  /home/spark-1.6.0/lib/spark-examples-1.6.0-hadoop2.6.0.jar  1000

執行結果:

技術分享圖片

執行成功後在YARN管理界面能夠看到這個Spark作業:

技術分享圖片

參考資料

大部分內容來自《Hadoop權威指南》第4版

Spark核心概念理解