1. 程式人生 > >spark 最佳實踐學習筆記

spark 最佳實踐學習筆記

大資料的特徵4v 1、大量 2、多樣 3、快速 4、價值

如何儲存和計算大資料 大資料交易

加州大學伯克利分校AMP實驗室

spark 讀取HDFS的資料到記憶體,在記憶體中使用彈性分散式資料集RDD組織資料

支援常見的mapreduce正規化,還支援圖形計算、流計算

spark支援scala、python、R

RDD主要分兩類操作: 1、轉換(transformation) 2、動作  (action)

RDD生成方式: 1、由Driver程式的資料集生成,不適合處理大量資料,適合互動式或資料量較小 2、由外部資料集生成,由分散式資料直接生成分散式資料集

預設,RDD的transformation只有等到action時才會進行,可以對RDD進行持久化或cache操作,也會觸發RDD進行真正的計算

RDD持久化操作persist()可以指定記憶體或磁碟,快取操作cache()只在記憶體中

spark最核心的抽象是彈性分散式資料集RDD

spark嚴重依賴傳遞函式型別的引數, 有兩種建議的方法: 1、匿名函式,適用於小片段的程式碼 2、object物件中的靜態方法 不建議的方法: 1、傳入普通類的方法,必順將方法所屬的例項一起傳進去,方法的第一個隱含引數是this指向例項 普通類必順具備序列化的能力(繼承java.io.serializable),因為需要將程式碼分發到各個計算節點,普通類也可以不繼承序列化,只需用function(val)代替method(def),因為function本身就支援序列化

spark的核心是RDD,RDD是分散式計算的

每個JOB的執行都會經歷序列化、網路傳輸、反序列化、執行

在序列化時,會將JOB執行所依賴的變數、方法(閉包)全部打包一併序列化

RDD操作不能巢狀,即不能在RDD操作傳入的函式引數的函式體中進行RDD呼叫

spark工作機制 排程管理、記憶體管理、容錯機制、監控管理、配置管理

排程管理的主要目的是資源分配 spark叢集上主要資源是cpu core數理和記憶體

spark排程按場景分兩類: 1、spark程度之間的排程,是最主要的排程場景 2、spark程度內部的排程

叢集模式下的spark程式 1、Driver程式: sprarkcontext 2、叢集管理器 3、工作節點:      執行器 task      快取

1、Driver程式 叢集模式下,使用者編寫的spark程式稱為Driver程式.每個Driver程式包含一個代表叢集環境的sparkcontext物件與之連線,程式的執行從Driver程式開始,中間過程會呼叫RDD操作,這些操作通過叢集資源管理器來排程執行,一般在worker節點上執行,所有操作執行結束後回到Driver程式中,在Driver程式中結束. 2、sparkcontext物件 每個驅動程式都 有一個sparkcontext物件,擔負著與叢集溝通的職責. 1、sparkcontext物件聯絡叢集管理器,分配cpu、記憶體等資源 2、叢集管理器在工作節點上啟動一個執行器(專屬本驅動程式) 3、程式程式碼會被分發到相應的工作節點上 4、sparkcontext分發任務(task)至各執行器執行

3、叢集管理器 叢集管理器負責叢集的資源排程 spark支援3種叢集部署方式,每種部署對應一種資源管理器 1、standalone模式(資源管理器是Master節點),只支援先進先出模式 2、Hadoop YARN(資源管理器是YARN叢集),主要用來資源管理.YARN支援動態資源管理,更適合多使用者場景下的叢集管理,而且YARN可以同時排程spark計算和hadoop MR計算,還可以排程其它實現了YARN排程介面的叢集計算,非常適合多個叢集同時部署的場景,是目前最主流的一種資源管理系統 3、Apache Mesos(資源管理器是Mesos叢集),Mesos是一個專門用於分散式系統資源管理的開源系統,可以對叢集中的資源做彈性管理

執行器:每個spark程式在每個節點上啟動的一個程序,專屬於一個spark程式,與spark程式有相同的生命週期,負責spark在節點上啟動的task,管理記憶體和磁碟.如果一個節點上有多個spark程式在執行,那麼會相應地就會啟動多個執行器. Job:一次RDD action對應一次Job,會提交至資源管理器排程執行。 stage:Job在執行過程中被分為多個階段,task集合。 task:在執行器上執行的最小單元。

spark程式之間的排程 spark程式之間的排程資源分配策略: 1、靜態分配 2、動態分配

靜態分配:spark程式啟動時即一次性分配所有資源,執行過程中固定不變,直至程式退出。是一種簡單可靠的分配策略,強烈建議優先使用這種策略 動態分配:執行過程中不斷調整分配的資源,可以按需增加或減少。動態資源分配的粒度是執行器,即增加或減少執行器.spark程式在機器的每個節點上只有一個執行器,所以增加或減少執行器意味著spark程式服務的節點的增加或減少

共享變數 廣播變數,計數器

廣播變數是隻讀的,建立之後再修改沒有意義,一般用val定義 計數器只能增加,可用於計數或求和,預設數值型別,可自定義型別,只有Driver程式可以讀到這個計算器變數,

spark master容錯分兩種情況:standalone叢集模式、單點模式. standalone叢集模式master容錯是通過zookeeper實現的,多個master,一個是active,其它是standby. slave節點執行著worker、執行器和Driver程式.

監控管理:web介面、metrics、外部系統

web介面: 每一個Driver的sparkcontext都會啟動一個web介面,預設埠是4040 spark還提供了rest api介面,son格式 metrics指標體系: spark還支援基於coda hale metrics library的指標體系,可以主動將執行狀態傳送給其它系統,方便與其它系統整合 其它監控工具: 叢集級別:ganglia 監控各節點的cpu、磁碟、網路負載等 作業系統級別:作業系統自帶的工具(dstat、iostat、iotop) JVM工具:jstack,jmap,jstat,jconsole等

spark程式配置管理: 靈活配置,可以使用環境變數、配置檔案、命令列引數,還可以直接在spark程式中直接指定,優先給不同,可以相互覆蓋

spark程式配置載入過程: spark程式是通過指令碼bin/spark-submit來提交的,互動式程式設計也由它提交 1、設定SPARK_HOME環境變數為bin/spark-submit指令碼父目錄 2、配置檔案目錄,由環境變數SPARK_CONF_DIR指定,預設為${SPARK_HOME}/conf 3、執行配置檔案spark-env.sh,設定基本的環境變數 4、載入配置檔案目錄下預設配置檔案spark-defaults.conf 5、讀取命令列引數,覆蓋前面配置 6、使用sparkconf物件的配置覆蓋前面的配置

環境變數配置: 可以在提交前通過export設定也可以在配置檔案目錄下的spark-env.sh檔案中指定 常用的通用配置項: 1、SPARK_LOCAL_IP: 繫結的ip 2、SPARK_PUBLIC_DNS: Driver程式使用的DNS伺服器 3、SPARK_CLASSPATH:額外追加的CLASSPATH

spark屬性項配置: 1、spark-defaults.conf 2、命令列引數 3、sparkconf物件 優先順序依次由低到高

spark日誌配置: 使用配置檔案目錄下的log4j.properties作為配置檔案

spark核心講解

spark核心資料結構RDD:

RDD是spark最重要的抽象,掌握了RDD,就掌握了spark計算的精髓。

RDD定義: 一個RDD包含5個核心屬性. 1、一個分割槽列表,每個分割槽裡是RDD的部分資料(或資料塊) 2、一個依賴列表,儲存依賴的RDD 3、一個名為compute的計算函式,用於計算RDD各分割槽的值 4、分割槽器(可選),用於鍵值型別的RDD,比如某個RDD是按雜湊來分割槽的 5、計算各分割槽時的選先的位置列表(可選)

spark支援兩種分割槽方式:hash、range

5、RDD依賴分兩種型別: 窄依賴:依賴父分割槽的部分分割槽 shuffle依賴:依束父分割槽的全部分割槽

對依賴鏈條過長可以設定檢查點,對中間結果儲存一份快照,流式計算依賴鏈條會無限擴大需要設定檢查點

transformation代表計算的中間結果,而action代表計算的最終結果,action不可以在transformantion的內部呼叫 transformation只建立計算關係,而action才是真正的執行者,action才會真正的向叢集提交Job,每一個action代表一個Job

shuffle就是將分佈在不同節點上的資料匯聚到一個節點的過程 shuffle是一個非常消耗資源的操作,涉及大量的IO和大量記憶體,還會生成大量臨時檔案,用於避免錯誤恢復時重新計算 shuffle使用的本地磁碟目錄由spark.local.dir指定

sparkcontext是spark程式的主要入口,用於與叢集建立連線,所有的spark程式都必順建立一個sparkcontext物件 初始化sparkcontext時只需要sparkconf配置物件作為引數或無參(會預設生成一個sparkconf物件)

DAG排程與Task排程: DAG是最高層級的排程,為每個Job繪製出一個有向無環圖,跟蹤各task的輸出,計算完成Job的最短路徑,並將task提交到task排程器來執行,而task排程器只負責接收DAG排程器的請求,負責task的實際排程執行 DAG排程的粒度是stage,具體執行過程是將stage下的task提交至task排程器 task排程器從DAG排程器的stage接收一組task,並負責將它們提交至叢集,執行它們,出錯進行重試,最後返回訊息給DAG排程器

其它功能介面 sparkcontext除了初始化環境、連線叢集外, 1、建立RDD 2、RDD持久化 3、建立共享變數 4、stop(),停止sparkcontext 5、runjob,提交action操作

spark sql是spark的一個模組,專門用於處理結構化資料

使用spark sql有兩種方式: 1、作為分散式sql引擎 2、在spark程式中,通過領域api來操作資料(抽象dataframe)

分散式sql引擎: 有兩種執行方式:jdbc/odbc server,使用spark sql命令列

jdbc/odbc server預設tcp協議,預設繫結host為localhost,表示所有ip,預設埠為10000,也可以設定為http協議,hive.server2.transport.mod=http,預設埠為10001 jdbc/odbc server模式: 1、DB,用於儲存資料庫元資料    a、建立mysql服務,建立使用者並賦權    b、配置conf/hive-site.xml 2、啟動一個jdbc/odbc server作為對外服務的介面    a、./sbin/start-thriftserver.sh 3、測試,使用互動式測試工具beeline    ./bin/beeline    beeline> !connect jdbc:hive2://localhost:10000    然後輸入使用者名稱和密碼    如果出現查詢結果中文亂碼:LANG=zh_CN.UTF-8; ./bin/beeline 啟動時指定客戶端編碼

spark sql命令列:另外一種執行spark sql方式,適合本地除錯 1、./bin/spark-sql 2、也支援與jdbc/odbc server一樣的配置

spark sql支援絕大多數hive特性

catalyst執行優化器 所有的spark sql語句最終由catalyst解析、優化並最終生成可以執行的java位元組碼 catalyst是spark sql最核心的部分,其最主要的資料結構是樹,所有的sql語句都由樹結構來儲存,樹中的每一個節點都由一個class,以及0或多個子節點. catalyst另外一個重要且基礎的概念是規則,基本上所有優化都是基於規則的 catalyst的執行過程分為四個階段: 1、分析階段,分析邏輯樹,解決引用 2、邏輯優化階段 3、物理計劃階段,catalyst會生成多個計劃,並基於成本進行對比 4、程式碼生成階段,將查詢編譯成java位元組碼

spark實時流式計算spark streaming spark streaming以spark為核心,具備可擴充套件性、高吞量、自動容錯等特點,資料來源支援hdfs/s3/nfs,kafaka,flume,twitter,zeromq,kinesis或tcp socket,處理時可以使用map,reduce,join,window等高階函式實現複雜邏輯,結果可以寫入檔案系統、資料據、實時展示系統等 在內部,spark streaming接收實時資料,按週期將資料分成多批次(batch),按批次提交給spark核心來排程計算 spark streaming使用的資料抽象是DStream,表示連續的資料流,但內部其實是通過RDD序列來儲存的 spark streaming支援java、python、scala語言,暫不支援R

StreamingContext是spark streaming程式設計最基本的環境物件,提供最基本的功能入口,包括從各途徑建立最基本物件DStream(類似spark的RDD) StreamingContext建立: 1、val conf = new SparkConf().setAppName("SparkStreamingWordCount")    注:spark-shell下,需先呼叫sc.stop(),來停止預設的sparkcontext,    val ssc = new StreamingContext(conf,Seconds(5))    注:執行週期為5秒,表示流式計算間隔5秒執行一次,時間設定應該大於每次執行時間 2、從sc中建立    val ssc = new StreamingContext(sc,Seconds(5)) 一個完整個流式計算需要以下幾個步驟: 1、建立一個輸入DStream,用於接入資料 2、使用作用於DStream上Transformatin和output操作來定義流式計算(spark程式使用Transformation和action) 3、啟動計算,使用streamingContext.start(); 4、等待計算結束,使用streamingContext.awaitTermination(); 5、也可以手工結束計算,streamingContext.stop();

DStream內部是一個RDD序列,每個RDD對應一個計算週期

容錯處理 RDD是隻讀、可重複計算的分散式資料集,它用鏈條記錄了RDD從建立開始的中間每一步的計算過程,錯誤恢復就是重新計算的過程 spark操作的資料一般都儲存在有容錯功能的檔案系統上,從這些系統上的資料生成的RDD也具有容錯能力,但是這不適用於spark streaming.因為大部分場景下,spark streaming的資料來自網路,為了達到相同的容錯能力,通過網路接收到的資料還被複制到其它節點上,這就導致錯誤發生時有兩類資料需要恢復: 1、剛收到已經快取,但還沒有被複制到其他節點的資料。因為沒有副本,恢復的唯一方法是從資料來源重新獲取一份 2、收了到且已經複製到其他節點的資料。可以從其他節點恢復 此外,還有兩類可能發生的錯誤: 1、worker節點失效。一旦計算節點失效,所有記憶體中的資料都會丟失且無法恢復 2、Driver節點失效。如果執行Driver程序的節點失效,那麼SparkContext也會隨之失效,整個Streaming程式會退出,所有附屬的執行節點都會退出,記憶體中的資料全部丟失。 關於容錯保障的效果定義,一般都是用資料計算的次數來定義: 1、至多一次。每條記錄最多被計算一次或根本沒有計算就丟失了 2、至少一次。保證每條記錄都不丟失,最少計算一次,但可能會重複多次計算 3、精準一次。保證每條記錄都不丟失,並且只計算一次,不多不少,顯示這是最佳的容錯保障 一般流式計算分為3步: 1、輸入資料流 資料接收 2、資料計算 3、結果輸出 要想實現精準一次的容錯效果,需要確保每一步都能實現精準一次的計算: 步驟一、資料接收,容錯保障很大程度上依賴於資料來源 步驟二、transformation計算,因為有RDD容錯性的保證,所以可以實現精準一交的容錯保障 步驟三、結果輸出,預設只提供“至少一次”的容錯保障,不到達到“精準一次”的級別,是因為還依賴輸出操作的型別和下一級接收系統是否支援事務特性。 資料接收容錯: 不同資料來源提供不同程式的容錯保障 1、對於HDFS,S3等自帶容錯功能的檔案系統,我們可以保障精準一次的容錯能力 2、Spark從1.3引入新的Kafka Direct API,也可以保障精準一次的容錯能力 3、對於其他使用接收器來接收資料的場景,視接收機制是否可靠以及是否開啟WAL功能而不同 結果輸出容錯: 結果輸出操作本身提供至少一次級別的容錯效能,就是說可能輸出多次至外部系統,但可能通過一些輔助手段來實現精準一次的容錯效果 當輸出為檔案時是可以接受的,因為重複的資料會覆蓋前面的資料,結果一致,效果相當於精準一次 其它場景下的輸出要想實現精準一次的容錯,需要一些額外的操作,有兩種方法: 1、冥等更新。確保多操作的效果與一次操作的效果相同,如saveAS***Files即便呼叫多次,結果還是同一個檔案 2、事務更新。更新時帶上事務資訊,確保更新只進行一次,更新時判斷是否已經更新 檢查點: 流式計算7*24小時的執行特點,除了考慮具備容錯能力,還要考慮容錯的代價問題。為了避免錯誤恢復的代價與執行時間成正比增長,Spark提供了檢查點功能,使用者定期記錄中間狀態,避免從頭開始計算的漫長恢復。 呼叫有狀態的Transformation操作必順啟用檢查點,如updateStateByKey或reduceByKeyAndWindow.因為有狀態的操作是從程式開始時一直進行的,如果不進行檢查點,那麼計算連線會隨著時間一直增長,重新計算代價太高。 另外,如果期望程式在因Driver節點失效後的重啟之後可以繼續執行,也建議開啟檢查點功能,可以記錄配置、操作以及未完成的批次,重啟後可以繼續執行. 實際上大部分程式不需要啟用檢查點 開啟檢查點的方法,呼叫streamingContext.checkpoint(checkpointDirectory)即可,引數是一個支援容錯的檔案系統目錄,如HDF3、S3. 檢查點是有代價的,需要儲存資料至儲存系統,增加批次的計算時間,並且降低吞吐量,我們可以通過增加週期的時間間隔來降低影響,一般建議時間間隔至少為10秒 效能調優: Spark流式計算程式想要執行流暢,需要一些基本的調優: 1、每個批次的處理時間儘可能短    a、增加資料接收的併發數量,尤其是當瓶頸發生在資料接收時。    預設每個InputDStream都只會建立一個接收器,執行在某個節點上,可以建立多個InputDStream,它他們接收不同的資料分割槽。以實現並行接收。    b、資料處理的併發度,如果併發度不夠,可能導致叢集的資源不被充分利用,檢視各機器CPU的所有核心是不是都在工作,可以調整 選項spark.default.parallelism來增加併發度    c、資料序列化,資料接收後,當需要與磁碟交換資料時,資料可能會進行序列化與反序列化,好處是節省空間和記憶體,但會增加計算負載。因些我們應習可能地使用Kryo來完成這項工作,cpu和記憶體開銷相對少一些    最後要注意task啟動的額外開銷,如果task啟動過於頻繁(50s/次),那麼額外開銷可能非常高,甚至無法達到那樣的實時計算要求 2、收到資料後,儘可能快地處理     設定合理批次間隔時間。一般來說,短時間間隔導致更多的額外開銷,以及無法完成的風險,所心前期可以採取相對保守的方法,如間隔設定為5~10秒,然後觀察後進行按需縮短間隔時間 圖計算可以簡單理解為以圖這種資料結構為基礎,基於其實現的相關演算法及應用. Spark的圖計算庫叫作GraphX 一個圖由頂點集V和頂點間的關係集合E組成,可以用二元組定義為:G=(V,E) 圖的計算量一般都比較大,而且通常會有多次迭代。 Spark GraphX依託Spark的強大計算能力,提供了圖計算需要的便捷API,同時兼具平行計算的效能,是做大規模圖計算的一把利器 GraphX的核心資料結構是Graph,是一種攜帶了每個點和邊的屬性的有向多重圖. 多重圖就是一對源、目的節點之間允許在多條邊,以便表示不同的關係 Graph資料結構比較簡單,由VertexRDD[VD]和EdgeRDD[ED]組成,VD和ED分別表示頂點和邊的抽象資料結構,實際等價於RDD[(VertexID,VD)]和RDD[Edge[ED]]兩種RDD RDD是scala語言的核心資料結構 GraphX使用Vertex-Cut(點分割)方式,即將圖的頂點集合劃分到不同的計算節點上,這樣可以減少分散式計算時的通訊和儲存消耗 GraphX內部維護了3個RDD來儲存一個圖:一個頂點表(Vertex Table),儲存頂點資訊;一個邊表(Edge Table),儲存邊資訊;一個是路由表(Routing Table),用來查詢頂點儲存在哪個計算節點上; GraphX常用API分為:資料查詢、資料轉換、結構轉換、關聯聚合、快取操作等 圖計算中最常用的API有3類:資料查詢類、關聯類和聚合類. 關聯類API是將一個圖和一個RDD通過節點ID(VertexID)關聯,來使圖獲得RDD中的資訊 Spark MLlib機器學習庫 MLlib是Spark為解決機器學習問題開發的庫,這些問題包括:分類、迴歸、聚類、協同過濾等 本質上,MLlib就是RDD上一系列可供呼叫的函式的集合 如果一個系統能夠通過執行某個過程而改進效能,這就是學習 機器學習能夠自動地從資料中學習“程式”,而這個“程式”不是人來編寫的 機器學習方法=模型+策略+演算法 模型(model):計算機如何表達要解決的問題,就是機器學習的模型,這裡使用一個函式f(x)來表達,具體函式實現是為求均值 策略(strategy):模型通常是有引數的,所以可能的模型有很多個,如何評估模型的優劣是機器學習的策略,這裡通過計算誤差平方和評估模型的優劣,這個誤差平方和通常叫做平方損失函式 演算法(algorithm):損失函式評估模型的優劣通常通過一個搜尋演算法來找到最優的模型,這裡通過函式求導來搜尋損失函式值最小的演算法,藉此來選擇最優模型 機器學習的一般過程大概可以分為幾個步驟: 1、準備訓練資料集合 2、選擇學習模型 3、行動學習策略 4、實現求解最優模型的演算法 5、使用最優模型對新資料進行預測 每一步驟都有很多細緻的工作要做: 首先,在準備訓練資料時,訓練資料要注意滿足以下兩點: 1、儘可能和應用場景同分布 2、儘可能充分,而且充足 模型選擇一般還要考慮以下幾點: 1、訓練時間:隨著觀測資料的變化,需要重新訓練模型,訓練新模型的時間不能太長 2、預測時間:模型上線工作的時候,對於新的輸入預測得分所需要的時間 3、模型的儲存:模型執行的時候需要多少記憶體空間 機器學習參考資料: 李航的《統計學習方法》,通俗易懂,系統地介紹了機器學習的常見演算法 Andrew Ng的視訊課程Machine Learning是目前的高校經典課程 Christpher M.Bishop的著作Pattern Recognition And Machine Learning是模式識別與機器學習的經典教材 Toby Segaran的《集體智慧程式設計》和Peter Harrington的《機器學習實戰》適合入門 MLlib庫簡介 基礎資料型別: 1、向量 向量(vector)通過import mlib.linalg.Vectors使用,MLlib支援稠密向量和稀疏向量。稀疏向量只儲存值非零的項. 如 Vectors.sparse(1095023,Array(1,7,31),Array(1.0,1.0,1.0)),1095023表示最大的下標,第一個陣列表示非零值的下標,第二個陣列表示非零項的具體值 2、labeled point labeled point是一個帶標註(label)的向量,也可以是稠密或者稀疏的,用於監督學習中表示一個特徵向量和一個標註。這裡規定標註是double型別 3、各種模型類 訓練演算法輸出就是各種模型類 MLlib庫主要有這幾塊:分類、迴歸、聚類、推薦等