1. 程式人生 > >如何在萬億級別規模的資料量上使用Spark

如何在萬億級別規模的資料量上使用Spark

一、前言

Spark作為大資料計算引擎,憑藉其快速、穩定、簡易等特點,快速的佔領了大資料計算的領域。本文主要為作者在搭建使用計算平臺的過程中,對於Spark理解,希望能給讀者一些學習的思路。文章內容為介紹Spark在DataMagic平臺扮演的角色、如何快速掌握Spark以及DataMagic平臺是如何使用好Spark的。

二、Spark在DataMagic平臺中的角色

圖 2-1

整套架構的主要功能為日誌接入、查詢(實時和離線)、計算。離線計算平臺主要負責計算這一部分,系統的儲存用的是COS(公司內部儲存),而非HDFS。

下面將主要介紹Spark on Yarn這一架構,抽取出來即圖2-2所示,可以看到Spark on yarn的執行流程。

圖2-2

三、如何快速掌握Spark

對於理解Spark,我覺得掌握下面4個步驟就可以了。

1.理解Spark術語

對於入門,學習Spark可以通過其架構圖,快速瞭解其關鍵術語,掌握了關鍵術語,對Spark基本上就有認識了,分別是結構術語Shuffle、Patitions、MapReduce、Driver、Application Master、Container、Resource Manager、Node Manager等。API程式設計術語關鍵RDD、DataFrame,結構術語用於瞭解其執行原理,API術語用於使用過程中編寫程式碼,掌握了這些術語以及背後的知識,你就也知道Spark的執行原理和如何程式設計了。

2.掌握關鍵配置

Spark在執行的時候,很多執行資訊是通過配置檔案讀取的,一般在spark-defaults.conf,要把Spark使用好,需要掌握一些關鍵配置,例如跟執行記憶體相關的,spark.yarn.executor.memoryOverhead、spark.executor.memory,跟超時相關的spark.network.timeout等等,Spark很多資訊都可以通過配置進行更改,因此對於配置需要有一定的掌握。但是使用配置時,也要根據不同的場景,這個舉個例子,例如spark.speculation配置,這個配置主要目的是推測執行,當worker1執行慢的情況下,Spark會啟動一個worker2,跟worker1執行相同的任務,誰先執行完就用誰的結果,從而加快計算速度,這個特性在一般計算任務來說是非常好的,但是如果是執行一個出庫到Mysql的任務時,同時有兩個一樣的worker,則會導致Mysql的資料重複。因此我們在使用配置時,一定要理解清楚,直接google spark conf就會列出很多配置了。

3.使用好Spark的並行

我們之所以使用Spark進行計算,原因就是因為它計算快,但是它快的原因很大在於它的並行度,掌握Spark是如何提供並行服務的,從而是我們更好的提高並行度。

對於提高並行度,對於RDD,需要從幾個方面入手,1、配置num-executor。2、配置executor-cores。3、配置spark.default.parallelism。三者之間的關係一般為spark.default.parallelism=num-executors*executor-cores的2~3倍較為合適。對於Spark-sql,則設定spark.sql.shuffle.partitions、num-executor和executor-cores。

4.學會如何修改Spark程式碼

新手而言,特別是需要對Spark進行優化或者修改時,感到很迷茫,其實我們可以首先聚焦於區域性,而Spark確實也是模組化的,不需要覺得Spark複雜並且難以理解,我將從修改Spark程式碼的某一角度來進行分析。

首先,Spark的目錄結構如圖3-1所示,可以通過資料夾,快速知道sql、graphx等程式碼所在位置,而Spark的執行環境主要由jar包支撐,如圖3-2所示,這裡擷取部分jar包,實際上遠比這多,所有的jar包都可以通過Spark的原始碼進行編譯,當需要修改某個功能時,僅需要找到相應jar包的程式碼,修改之後,編譯該jar包,然後進行替換就行了。

圖3-1

圖3-2

而對於編譯原始碼這塊,其實也非常簡單,安裝好maven、scala等相關依賴,下載原始碼進行編譯即可,掌握修改原始碼技巧對於使用好開源專案十分重要。

四、DataMagic平臺中的Spark

Spark在DataMagic中使用,也是在邊使用邊探索的過程,在這過程中,列舉了其比較重要的特點。

1.快速部署

在計算中,計算任務的數量以及資料的量級每天都會發生變化,因此對於Spark平臺,需要有快速部署的特性,在實體機上,有一鍵部署指令碼,只要執行一個指令碼,則可以馬上上線一個擁有128G記憶體、48cores的實體機,但是實體機通常需要申請報備才能獲得,因此還會有docker來支援計算資源。

2.巧用配置優化計算

Spark大多數屬性都是通過配置來實現的,因此可以通過配置動態修改Spark的執行行為,這裡舉個例子,例如通過配置自動調整exector的數量。

2.1 在nodeManager的yarn-site.xml新增配置

yarn.nodemanager.aux-services
mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class
org.apache.spark.network.yarn.YarnShuffleService

2.2 將spark-2.2.0-yarn-shuffle.jar檔案拷貝到hadoop-yarn/lib目錄下(即yarn的庫目錄)

2.3 在Spark的spark-default.xml新增配置

spark.dynamicAllocation.minExecutors 1 #最小Executor數
spark.dynamicAllocation.maxExecutors 100 #最大Executor數

通過這種配置,可以達到自動調整exector的目的。

3.合理分配資源

作為一個平臺,其計算任務肯定不是固定的,有的資料量多,有的資料量少,因此需要合理分配資源,例如有些千萬、億級別的資料,分配20核計算資源就足夠了。但是有些資料量級達到百億的,就需要分配更多的計算資源了。參考第三章節的第3點。

4.貼合業務需求

計算的目的其實就是為了服務業務,業務的需求也理應是平臺的追求,當業務產生合理需求時,平臺方也應該儘量去滿足。如為了支援業務高併發、高實時性查詢的需求下,Spark在資料出庫方式上,支援了Cmongo的出庫方式。

sc = SparkContext(conf=conf) sqlContext = SQLContext(sc) database = d = dict((l.split('=') for l in dbparameter.split())) parquetFile = sqlContext.read.parquet(file_name) parquetFile.registerTempTable(tempTable) result = sqlContext.sql(sparksql) url = "mongodb://"+database['user']+":"+database['password']+"@"+database['host']+":"+database['port'] result.write.format("com.mongodb.spark.sql").mode('overwrite').options(uri=url,database=database['dbname'],collection=pg_table_name).save()

5.適用場景

Spark作為通用的計算平臺,在普通的應用的場景下,一般而言是不需要額外修改的,但是DataMagic平臺上,我們需要“在前行中改變”。這裡舉個簡單的場景,在日誌分析中,日誌的量級達到千億/日的級別,當底層日誌的某些欄位出現utf-8編碼都解析不了的時候,在Spark任務中進行計算會發生異常,然後失敗,然而如果在資料落地之前對亂碼資料進行過濾,則有可能會影響資料採集的效率,因此最終決定在Spark計算過程中解決中這個問題,因此在Spark計算時,對資料進行轉換的程式碼處加上異常判斷來解決該問題。

6.Job問題定位

Spark在計算任務失敗時候,需要去定位失敗原因,當Job失敗是,可以通過yarn logs -applicationId application 來合併任務log,開啟log,定位到Traceback,一般可以找到失敗原因。一般而言,失敗可以分成幾類。

a. 程式碼問題,寫的Sql有語法問題,或者Spark程式碼有問題。
b. Spark問題,舊Spark版本處理NULL值等。
c. 任務長時間Running狀態,則可能是資料傾斜問題。
d. 任務記憶體越界問題。

7.叢集管理

Spark叢集在日常使用中,也是需要運營維護的,從而運營維護,發現其存在的問題,不斷的對叢集進行優化,這裡從以下幾個方面進行介紹,通過運營手段來保障叢集的健壯性和穩定性,保證任務順利執行。

a. 定時檢視是否有lost node和unhealthy node,可以通過指令碼來定時設定告警,若存在,則需要進行定位處理。

b. 定時掃描hdfs的執行log是否滿了,需要定時刪除過期log。

c. 定時掃描叢集資源是否滿足計算任務使用,能夠提前部署資源。

五、總結

本文主要是通過作者在搭建使用計算平臺的過程中,寫出對於Spark的理解,並且介紹了Spark在當前的DataMagic是如何使用的,當前平臺已經用於架平離線分析,每天計算分析的資料量已經達到千億~萬億級別。

長按識別關注我們,每天都有精彩內容分享哦!~