1. 程式人生 > >spark 基礎開發 Tips總結

spark 基礎開發 Tips總結

本篇部落格主要是 sparksql 從初始開發注意的一些基本點以及力所能及的可優化部分的介紹:  

所使用spark版本:2.0.0       scala版本:2.11.8

1. SparkSession的初始化:

 

val sparkSession = SparkSession.builder().master("local[*]").appName("AppName").config("spark.sql.warehouse.dir", "file:///D:/XXXX/XXXX/spark-warehouse").config("spark.sql.shuffle.partitions", 50).getOrCreate()

  

 注意點:

             a.  spark.sql.warehouse.dir 需要顯示設定,否則會丟擲 Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:...   錯誤

             b. spark.sql.shuffle.partitions  指定 Shuffle 時 Partition 個數,也即 Reducer 個數。根據業務資料量測試調整最佳結果

                Partition 個數不宜設定過大:

               Reducer(代指 Spark Shuffle 過程中執行 Shuffle Read 的 Task) 個數過多,每個 Reducer 處理的資料量過小。大量小 Task 造成不必要的 Task 排程開銷與可能的資源排程開銷(如果開啟了 Dynamic Allocation)

            Reducer 個數過大,如果 Reducer 直接寫 HDFS 會生成大量小檔案,從而造成大量 addBlock RPC,Name node 可能成為瓶頸,並影響其它使用 HDFS 的應用

            過多 Reducer 寫小檔案,會造成後面讀取這些小檔案時產生大量 getBlock RPC,對 Name node 產生衝擊

               Partition 個數不宜設定過小:

            每個 Reducer 處理的資料量太大,Spill 到磁碟開銷增大

            Reducer GC 時間增長

            Reducer 如果寫 HDFS,每個 Reducer 寫入資料量較大,無法充分發揮並行處理優勢

2. 將非結構化資料轉換為結構化資料DataFrame(本人用的自定義模式): 

    val rdd= sparkSession.sparkContext.textFile(path, 250)  // 預設split為2

    val schemaString = "time hour lic"   //結構化資料的列名,可理解為關係型資料庫的列名

    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))   // 欄位名  欄位型別  是否可為空

    val schema = StructType(fields)      //上兩步組裝最終 createDataFrame 時需要的 schema

    val rowRDD = citySECRDD.map(_.split(",")).filter(attributes => attributes.length >= 6 && attributes(1).equals("2")&& attributes(0).split(" ").length > 1 && attributes(0).split(" ")(1).split(":").length > 1).map(attributes => {Row(attributes(0).trim,attributes(0).split(" "                   (1).split(":")(0).trim,attributes(2).trim,attributes(3).trim,attributes(4).trim,attributes(5).trim)})         //自定義一些過濾條件  以及組裝最終的 row型別的RDD

    val df= sparkSession.createDataFrame(rowRDD, schema)       //將rdd裝換成DataFrame

3. 兩種快取使用方式:

    1)df.persist(StorageLevel.MEMORY_ONLY)     //後續如果需要反覆使用DF[DataFrame的簡稱],則就把此DF快取起來                            
df.unpersist() //釋放快取 常用的兩種序列化方式:MEMORY_ONLY->不加工在記憶體中儲存 MEMORY_ONLY_SER->在記憶體中序列化儲存(佔用記憶體空間較小) 2)df.createOrReplaceTempView("table") sparkSession.sql("cache table table") // 以 sql 形式快取DF
sparkSession.sql("uncache table table") //釋放快取

4.spark整合Hbase快速批量插入

  將計算結果寫入Hbase:

      注意:1) 如果是帶有shuffle過程的,shuffle計算之前使用select()提出只需要的欄位然後再進行計算,因為shuffle特別耗費時間,寫磁碟的過程,所以要能少寫就少寫。

df.foreachPartition(partition => {

      val hconf = HBaseConfiguration.create();

      hconf.set(zkClientPort, zkClientPortValue) //zk 埠

      hconf.set(zkQuorum, zkQuorumValue) //zk 地址
      hconf.set(hbaseMaster, hbaseMasterValue) //hbase master
       val myTable = new HTable(hconf, TableName.valueOf(tableName))
       myTable.setAutoFlush(false, false) //關鍵點1
      myTable.setWriteBufferSize(5 * 1024 * 1024) //關鍵點2
      partition.foreach(x => {

      val column1 = x.getAs[String]("column1") //列1
      val column2 = x.getAs[String]("column2") //列2
      val column3 = x.getAs[Double]("column3") //列3
      val date = dateStr.replace("-", "") // 格式化後的日期

    val rowkey = MD5Hash.getMD5AsHex(Bytes.toBytes(column1+ date)) + Bytes.toBytes(hour)
    val put = new Put(Bytes.toBytes(rowkey))
    put.add("c1".getBytes(), "column1".getBytes(), licPlateNum.getBytes()) //第一列族 第一列 
    put.add("c1".getBytes(), "column2".getBytes(), hour.getBytes()) //第一列族 第二列
    put.add("c1".getBytes(), "column3".getBytes(), interval.toString.getBytes()) //第一列族 第三列
    put.add("c1".getBytes(), "date".getBytes(), date.getBytes()) //第一列族 第四列
    myTable.put(put)
     })
     myTable.flushCommits() //關鍵點3
    /*
    *關鍵點1_:將自動提交關閉,如果不關閉,每寫一條資料都會進行提交,是匯入資料較慢的做主要因素。
     關鍵點2:設定快取大小,當快取大於設定值時,hbase會自動提交。此處可自己嘗試大小,一般對大資料量,設定為5M即可,本文設定為3M。
     關鍵點3:每一個分片結束後都進行flushCommits(),如果不執行,當hbase最後快取小於上面設定值時,不會進行提交,導致資料丟失。
     注:此外如果想提高Spark寫資料如Hbase速度,可以增加Spark可用核數量。
    */

5. spark任務提交shell指令碼:

spark-submit --jars /XXX/XXX/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar \
         --master yarn\
         --num-executors 200 \
         --conf "spark.driver.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \
         --conf "spark.executor.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \ 
         --conf spark.driver.cores=2 \
         --conf spark.driver.memory=10g \
         --conf spark.driver.maxResultSize=2g \
         --conf spark.executor.cores=6 \
         --conf spark.executor.memory=10g \
         --conf spark.shuffle.blockTransferService=nio \
         --conf spark.memory.fraction=0.8 \
         --conf spark.shuffle.memoryFraction=0.4 \               
         --conf spark.default.parallelism=1000 \
         --conf spark.sql.shuffle.partitions=400 \                     預設200,如果專案中程式碼設定了此選項,則程式碼設定級別優先,會覆蓋此處設定
         --conf spark.shuffle.consolidateFiles=true \
         --conf spark.shuffle.io.maxRetries=10 \
         --conf spark.scheduler.listenerbus.eventqueue.size=1000000 \
         --class XXXXX\                                                                專案啟動主類引用
         --name zzzz \
         /data/XXX/XXX-jar-with-dependencies.jar \                       專案jar包
        "引數1" "引數2" 

  

注: 紅色部分是Hbase需要的配置,同時需要在spark叢集的spark-defaults.conf 裡面配置

        spark.driver.extraClassPath  和  spark.executor.extraClassPath   直指 hbase-protocol-0.96.1.1-cdh5.0.2.jar 路徑

先寫到這裡吧,後續會繼續完善通過sparkUi 優化細節以及提交spark任務的時候 如何分配 executor.cores 和 executor.memory。