1. 程式人生 > 程式設計 >Spark 程式設計

Spark 程式設計

一、Spark Shell on Client

scala> var rdd =sc.parallelize(1 to 100,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count
res0: Long = 100  
scala> val rdd2=rdd.map(_ + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.take(3)
res1: Array[Int] = Array(2,3,4)
scala> val rdd1=sc.textFile("file://home/hadoop/apps/sparkwc"
) rdd1: org.apache.spark.rdd.RDD[String] = file://home/hadoop/apps/sparkwc MapPartitionsRDD[3] at textFile at <console>:24 cala> val rdd1=sc.textFile("file:///home/hadoop/apps/sparkwc") rdd1: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/apps/sparkwc MapPartitionsRDD[9] at textFile at <console>:24 scala> val rdd2=rdd rdd rdd1 rdd2 rdd3 rddToDatasetHolder scala> val rdd2=rdd1.flatMap(_.split("\t"
)) rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26 scala> val rdd3=rdd2.map((_,1)) rdd3: org.apache.spark.rdd.RDD[(String,Int)] = MapPartitionsRDD[11] at map at <console>:28 scala> val rdd4=rdd3.reduceByKey(_ + _) rdd4: org.apache.spark.rdd.RDD[(String,Int)] = ShuffledRDD[12] at reduceByKey at <console>:30 scala> rdd4.collect res2: Array[(String,Int)] = Array((spark,1),(hadoop,(hello,3),(world,1)) scala> rdd4.collect res2: Array[(String,1)) scala> rdd4.saveAsTextFile("file:///home/hadoop/apps/out1"
) [hadoop@hadoop01 apps]$ cd out1/ [hadoop@hadoop01 out1]$ ls part-00000 _SUCCESS [hadoop@hadoop01 out1]$ cat part-00000 (spark,1) (hadoop,1) (hello,3) (world,1) [hadoop@hadoop01 out1]$ pwd /home/hadoop/apps/out1 複製程式碼

WebUI 地址:http://192.168.43.20:4040/jobs/

二、Spark Shuffle

  • Shuffle Write:將Task中間結果資料寫入到本地磁碟
  • Shuffle Read:從Shuffle Write階段拉取資料到記憶體中平行計算
    SparkShuffle Write

三、Shuffle Write(hash-based)

  • Shuffle Write階段產生的總檔案數=MapTaskNum * ReduceTaskNum
  • TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize
  • 產生大量小檔案,佔用更多的記憶體緩衝區,造成不必要的記憶體開銷,增加 了磁碟IO和網路開銷
    Shuffle Write

四、Shuffle Write(hash-based優化)

  • Shuffle Write階段產生的總檔案數=CoreNum * ReduceTaskNum
  • TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize 減少了小檔案產生的個數,但是佔用記憶體緩衝區的大小沒變
  • 設定方法
    • conf.set("spark.shuffle.manager","hash")
    • 在conf/spark-default.conf 配置檔案中新增spark.shuffle.manager=hash
      Shuffle Write優化

五、Shuffle Write(hash-based優化)Shuffle Write(sort-based)

  • Shuffle Write階段產生的總檔案數= MapTaskNum * 2
  • 優點: 順序讀寫能夠大幅提高磁碟IO效能,不會產生過多小檔案,降低檔案快取佔用記憶體空間大小,提高記憶體使用率。
  • 缺點:多了一次粗粒度的排序。
  • 設定方法
  • 程式碼中設定:conf.set("spark.shuffle.manager","sort")
  • 在conf/spark-default.conf 配置檔案中新增spark.shuffle.manager=sort
    sort-based

六、Shuffle Read

  • hase-based和sort-based使用相同的shuffle read實現
    Shuffle Read

七、Spark History Server配置

  • spark history server檢視執行完成的作業資訊和日誌
  • 配置Hadoop的yarn-site.xml檔案,所有節點配置檔案同步,重啟yarn
<property>
<name>yarn.log.server.url</name>
<value>http://node02:19888/jobhistory/logs</value>
<description> Yarn JobHistoryServer訪問地址 </description>
</property>
複製程式碼
  • 修改spark安裝包conf目錄下的spark-defaults.conf(如果沒有該檔案, 通過spark-defaults.conf.template模板複製一個),spark history server 在192.168.183.100節點啟動,spark_logs這個目錄需要在HDFS上提前建立
spark.yarn.historyServer.address=192.168.183.100:18080  spark.history.ui.port=18080
spark.eventLog.enabled=true  spark.eventLog.dir=hdfs:///spark_logs
spark.history.fs.logDirectory=hdfs:///spark_logs
複製程式碼

1.Spark History Server啟動

  • 啟動Spark History Server
sbin/start-history-server.sh
複製程式碼
  • Spark History Server訪問地址
httpL://192.168.183.100:18080
複製程式碼

Spark history

七、Spark執行環境優化

  • 將spark系統jar包上傳到HDFS上,直接使用HDFS上的檔案
  • 在spark安裝目錄下執行:jar cv0f spark-libs.jar -C jars/ .
  • 將spark安裝目錄下生成的spark-libs.jar上傳到HDFS上的 /system/spark(需要手動建立)目錄下
hadoop fs -put spark-libs.jar /system/spark
複製程式碼

修改spark安裝包conf目錄下spark-defaults.conf配置檔案新增spark- libs.jar在HDFS上的路徑

spark.yarn.archive=hdfs:///system/spark/spark-libs.jar
複製程式碼

八、Spark程式設計模型

  • 建立SparkContext
    • 封裝了spark執行環境資訊
  • 建立RDD
    • 可以用scala集合或hadoop資料檔案建立
  • 在RDD上進行transformation和action
    • spark提供了豐富的transformation和action運算元
  • 返回結果
    • 儲存到hdfs、其他外部儲存、直接列印

1.提交Spark程式到Yarn上

2.Spark RDD運算元分類

  • Transformation轉換操作,惰性執行,不觸發app執行
    • 針對Value資料型別,如map、filter
    • 針對Key-Value資料型別,如groupByKey、reduceByKey
  • Action執行操作,觸發app執行

3.建立RDD

  • parallelize從集合建立RDD
    • 引數1:Seq集合,必須
    • 引數2:分割槽數
    • 建立RDD:val rdd = sc. parallelize(List(1,2,4,5,6,7),3)
    • 檢視RDD分割槽數:rdd.partitions.size
  • textFile從外部資料來源(本地檔案或者HDFS資料集)建立RDD
    • 引數1:外部資料來源路徑,必須
    • 引數2:最小分割槽數
    • 從本地檔案建立RDD:val rdd = sc.textFile("file:///home/hadoop/apps/in")
    • 從HDFS資料集建立RDD:val rdd = sc.textFile("hdfs:///data/wc/in",1)

4.Value資料型別Transformation

  • map
    • 輸入是一個RDD,將一個RDD中的每個資料項,通過map中的函式對映輸出一個新的RDD,輸入分割槽與輸出分割槽一一對應
  • flatMap
    • 與map運算元功能類似,可以將巢狀型別資料拆開展平
  • distinct
    • 對RDD元素進行去重
  • coalesce
    • 對RDD進行重分割槽
    • 第一個引數為重分割槽的數目
    • 第二個為是否進行shuffle,預設為false,如果重分割槽之後分割槽數目大於 原RDD的分割槽數,則必須設定為true
  • repartition
    • 對RDD進行重分割槽, 等價於coalesce第二個引數設定為true
  • union
    • 將兩個RDD進行合併,不去重
  • mapPartitions
    • 針對RDD的每個分割槽進行操作,接收一個能夠處理迭代器的函式作為引數
    • 如果RDD處理的過程中,需要頻繁的建立額外物件,使用mapPartitions要比使用map的效能高很多,如:建立資料庫連線
  • mapPartitionsWithIndex
    • 與mapPartitions功能類似,接收一個第一個引數是分割槽索引,第二個引數是分割槽迭代器的函式
  • zip
    • 拉鍊操作,將兩個RDD組合成Key-Value形式的RDD,保證兩個RDD的partition數量和元素個數要相同,否則會丟擲異常
  • mapValues
    • 針對[K,V]中的V值進行map操作
  • groupByKdy
    • 將RDD[K,V]中每個K對應的V值,合併到一個集合Iterable[V]中
  • reduceByKey
    • 將RDD[K,V]中每個K對應的V值根據傳入的對映函式計算
  • join -返回兩個RDD根據K可以關聯上的結果,join只能用於兩個RDD之間的關聯,如果要多個RDD關聯,需要關聯多次

5.RDD Action

  • collect
    • 將一個RDD轉換成陣列,常用於除錯
  • saveAsTextFile
    • 用於將RDD以文字檔案的格式儲存到檔案系統中
  • take
    • 根據傳入引數返回RDD的指定個數元素
  • count
    • 返回RDD中元素數量

6.Spark優化-Cache應用

Cache應用

7.Accumulator計數器

  • accumulator累加器,計數器
    • accumulator累加器,計數器
    • 通常用於監控,除錯,記錄關鍵資料處理的數目等
    • 分散式計數器,在Driver端彙總
val total_counter = sc.accumulator(0L,"total_counter")  
val resultRdd = rowRdd.flatMap(_.split("\t")).map(x=>{  total_counter += 1
(x,1)
}).reduceByKey(_ + _)
複製程式碼

通過Spark Web UI檢視

Accumulator計數器