【Spark深入學習 -12】Spark程序設計與企業級應用案例02
----本節內容-------
1.遺留問題答疑
1.1 典型問題解答
1.2 知識點回顧
2.Spark編程基礎
2.1 Spark開發四部曲
2.2 RDD典型實例
2.3 非RDD典型實例
3.問題解答
4.參考資料
---------------------
每一次答疑階段,我都會站在老師的角度去思考一下,如果是我,我應該怎麽回答,每每如此,不禁嚇出一身冷汗。有些問題看答案確實挺容易的,但當自己作為一個答疑者去思考,可能不一樣,因為快速確認一個答案的同時,你得否認很多的東西,腦海裏閃過很多的不確定,都要快速否決。董先生無疑是值得敬佩的,這方面體現的很內行,可見內功深厚,他是一個優秀的架構師,也是一個很好的傳道者。
1.遺留問題答疑
1.1 典型問題解答
1)spark通常和hadoop與yarn一同使用,在學spark過程中補充哪些知識點,可以整理列一下
HDFS 我覺得必須都得清楚,HDFS的東西其實很多,向老師說的文件基本的存儲策略,讀取操作,基本命令行、API等等;
YARN基本架構,ResoureManger,NodeManger,ApplicationMaster,container等等
其實我想說的是,最好是都過一遍吧,這些都是內功,都是相互依賴的,誰也離不了誰,還有比如Linux知識,Java基本知識,網絡基本知識等等,都是實際工作問題中要用到的知識點,我保證,都會用的到。
2)一個partition只會在一個節點上把,老師沒有將container的概念麽?老師能把partion,container,executor,task再細講一下麽?
臨時有事,這裏沒有聽。這個其實挺基礎的,但是回答出來比較考驗基本功。我把問題衍生一下,包含如下問題:
a).partion的基本概念是什麽,partition和block是什麽關系?
partition是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被劃分為多個分片,每一個分片叫做一個partition。block是HDFS存儲的最小單元
其實這裏還有一個問題,就是Spark為什麽要進行分區,出於什麽原因要搞一個分區的概念?,我覺得對partiion的理解非常有幫助,答案請參考博文《Spark你媽喊你回家吃飯-08 說說Spark分區原理及優化方法》
b).container的概念是什麽,yarn是如何基於container進行資源分配的?
c).Spark應用程序的執行過程是什麽?
還有兩個問題都是任務提交模式問題,了解spark程序運行的各種模式基本都能解答,這幾個問題在前面其實都已經有很明確的講解過。
1.2 知識點回顧
1)核心概念RDD
彈性分布式數據集,把握以下幾點
· RDD是什麽數據集,他是一個描述數據在哪裏,對數據做什麽操作,以及操作之間的依賴關系的一個數據集
· 為什麽是彈性,主要是說他的存儲,既可以在內存,也可以在磁盤
· 分布式:分布在集群上
· 自動重構:失效夠可以自動重構
2) 程序架構
application = 1個driver +多個executor
Excecutor = 多個task+cache
搞清楚driver做了什麽,Executor做了什麽,Task又做了什麽,如何配合
3)Yarn分布式模式
a.client發送資源申請請求
b.RM發送通知NodeManger要調用資源,
c.NodeManger啟動AppAplicationMaster
d.AppAplicationMaster通知nodeManager啟動各個Executor
e.nodeManager啟動Executor
f.nodeManager向Driver回報實時執行情況,也會告知AppAplicationMaster
2.Spark編程基礎
2.1 Spark開發四部曲
每一個spark程序都有一個main,我們稱之為driver,driver將程序分解成多個task, task分發到多個executor,從而完成並行化。Spark程序開發的四部曲總結起來如下:
· 創建SparkContext對象
封裝了spark執行環境信息
· 創建RDD
可從scala結合或hadoop數據集上創建
· 在RDD上執行轉換和action
spark提供了多種轉換和action函數
· 返回結果
保存到HDFS中,或者直接打印出來
2.2 RDD典型實例
啟動spark shell:
bin/spark-shell --master spark://master01:7077 --driver-class-path/usr/local/tdr_hadoop/spark/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.40-bin.jar
1) 設置conf
val conf=new org.apache.spark.SparkConf().setAppName("xx");
conf.set("spark.app.name","test");
conf.set("spark.driver.allowMultipleContexts","true");
val sc= new org.apache.spark.SparkContext(conf);
#讀取hdfs上的文件,如果你在hdfs-site.xml中配置hdfs集群的話
val a=sc.textFile("/tmp/test/core-site.xml");
a.collect().foreach(println);
#讀取hdfs上的文件
val a = sc.textFile("hdfs:///tmp/test/core-site.xml");
a.collect().foreach(println);
#讀取hdfs上的文件 ,這裏的端口是9000
vala=sc.textFile("hdfs://master02:9000/tmp/test/core-site.xml");
a.collect().foreach(println);
#讀取本地文件,這裏要註意,driver是在哪裏運行,如果driver不在本地運行,執行會報錯,找不到文件哦
val a=sc.textFile("file:///data/aa.txt");
a.collect().foreach(println);
報錯1:netty是spark通信框架,通信超時了。所以產生問題。把::1也就是ip6先註釋掉。但是還是沒有解決問題,後來把master HA給回退過去了,又恢復了,可能是HA的配置沒有配好導致
報錯2:不能出現多個sc
設置參數:conf.set("spark.driver.allowMultipleContexts","true");
2)創建RDD
· 從HDFS中讀取數據
inputRdd=sc.textFile("hdfs://master01:8020/xx/xx");
· 從本地讀取數據
inputRdd=sc.textFile("file://data/input")
· 從Hbase讀取數據
· 從自定義文件格式讀取數據
2.3 RDD典型實例
1.回憶經典概念
再次提一下RDD的transformation與Action,現在假設你去面試,面試官問你,簡單說說你對Transformation和Action的理解,我個人覺得應該回答以下幾個知識點,可能你有很多要說的,但是要整理好思路,一個個知識點回答。
1).先說概念
先說RDD概念,RDD彈性分布式數據集。它記錄了2類東西一個是數據,一個是操作。數據,RDD記錄了數據是保存在內存還是磁盤,而且能控制數據分區。操作,RDD記錄了數據之上的各種操作,以及操作之間的依賴關系。最後說一下特點,RDD具有容錯,並行,是spark的核心概念。
接著引入Transformation,Transformation是根據特定規則將一個RDD變換為另一個RDD,記錄了RDD的演變過程,最後結果是RDD。
Action:將數據聚集起來執行實際的操作,觸發所有job,並且保存計算結果,最後結果是數據。
2).後說聯系
Transformation記錄RDD之間的轉換關系,為Action的觸發記錄RDD邏輯關系。
3).最後說區別
·Transformation不觸發job,返回RDD。
·Action觸發job,執行計算,返回數據。
2.典型例子
1)Transformation例子
例子1:
------------------------
val nums =sc.parallelize(List(1,2,3),3);
nums.collect();
//返回前k個
nums.take(2);
nums.count();
//累加求和
nums.reduce(_+_);
//寫數據到hdfs
nums.saveAsTextFile("/tmp/dxc/week2/output1");
import org.apache.spark.rdd._;
nums.saveAsSequenceFile("/tmp/dxc/week2/output2");
nums.saveAsSequenceFile("/tmp/dxc/week2/output2");
//讀取sequenceFile格式文件
lines = sc.sequenceFile("/tmp/dxc/week2/output2/") ;
---------
說明:
collection:轉為數組,單機概念,保存到driver裏面,非常危險,如果非常大
10G數據,2g內存,很可能打爆driver的內存,慎重
從這裏可以看出,啟動了一個Executor,Executor上面起了2個task,因為是指定了2個分區,分區的個數決定了task的個數。
val nums =sc.parallelize(List(1,2,3),3);
nums.collect();
這裏指定3個分區,啟動了3個task。
保存到hdfs也是3個part,如果指定分區為2,那就保存為2個數據塊。
------------------------
例子2:
--------
valpets=sc.parallelize([("cat",1),("dog",2),("dog",2)]);
pets.reduceByKey(_+_);
pets.groupByKey();
pets.sortByKey();
pets.collect();
---------
說明:reduceByKey自動在本地進行combine
------------------------
例子3:
val line =sc.textFile("/tmp/test/core-site.xml");
val count=line.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_);
count.collect();
line.foreach(println);
例子4:
val left = sc.parallelize(List(("spark",1),("hadoop",1),("storm",1)))
val right=sc.parallelize(List(("scala",1),("hadoop",1),("spark",1)))
val result = left.join(right);
val result = left.cogroup(right);
result.collect();
result.foreach(println);
---------
說明:
cogroup:保證key唯一,key只占一行,可用做一個笛卡爾積,cogroup結果如下
(scala,(CompactBuffer(),CompactBuffer(1)))
(spark,(CompactBuffer(1),CompactBuffer(1)))
(hadoop,(CompactBuffer(1),CompactBuffer(1)))
(storm,(CompactBuffer(1),CompactBuffer()))
------------------------
還有其他的RDD操作可以參考之前寫的博客。
2.3 非RDD典型實例
1.Accumulator
Accumulator是spark提供的累加器,顧名思義,該變量只能夠增加。 只有driver能獲取到Accumulator的值(使用value方法),Task只能對其做增加操作(使用 +=)。你也可以在為Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,可以幫助你了解程序運行的情況。用於監控和調試,記錄符合某類特征的數據數據等。
-----------------
//在driver中定義
val accum = sc.accumulator(0, "Example Accumulator")
//在task中進行累加
sc.parallelize(1 to10,5).foreach(x=> accum += 1)
//在driver中輸出 accum.value
//結果將返回10 res: 10
---
說明:
在web ui中可以看到Accumulators在task進行累加的具體情況,driver將accumulator收集過來匯總
-----------------
3.廣播變量
Spark有兩種共享變量——累加器、廣播變量。廣播變量可以讓程序高效地向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。高效分發大對象,比如字典map,集合set,每個executor一份,而不是每個task一份.
Spark中分布式執行的代碼需要傳遞到各個Executor的Task上運行。對於一些只讀、固定的數據(比如從DB中讀出的數據),每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播(提前廣播)給各個Executor。該Executor上的各個Task再從所在節點的BlockManager獲取變量,而不是從Driver獲取變量,從而提升了效率。
一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數據,之後的Task都從本節點的BlockManager中獲取相關數據。
場景1:
val data = Set(1,2,3,4,5,6,7)
val rdd=sc.parallelize(1 to 6,2)
val result =rdd.map(_=>data.size)
result.collect();
場景2:
val data = Set(1,2,3,4,5,6,7)
val bddata=sc.broadcast(data)
val rdd=sc.parallelize(1 to 6,2)
val result =rdd.map(_=>data.size)
result.collect();
區別是:data數值的獲取方式,場景1 executor 每次都要從driver那裏獲取,要和交互7次,而場景2使用廣播變量,將data分發到executor,那麽driver和executor只需要交互一次。
4.cache的使用
val data=sc.textFile("/tmp/tbMonitordataResultHbase");
val a = data.count
println(a);
執行了29秒
val data=sc.textFile("/tmp/tbMonitordataResultHbase");
data.cache()
val a = data.count
println(a);
cache了100多M
val a = data.count
println(a);
data.persist(org.apache.spark.storage.StorageLevel. MEMORY_ONLY)
3.問題解答
1).spark計算時,數據會寫入磁盤麽,什麽條件下寫入磁盤
shuffle的時候都會寫到磁盤,帶shuffle的時候會寫入到磁盤,有哪算子呢?
2).報錯日誌裏的錯誤是,ERRORCoarsegRainedExcecutorBackend:RECEIVED SIGNAL TERM ,這個可能是什麽原因
虛擬內存超了,被yarn的nodemanager殺了,配置這個參數, 如果是1G內存,可以達到1.1G的使用內存
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>10</value>
</property>
3).中文亂碼問題,出現中文亂碼和spark沒有關系
4).resourceManager的工作負擔重不重,實際部署時候是否有必要單獨到一臺機器上?
機器的規模,50或者100,可以nodemanager合並
5).hbase預分區個數和spark過程中reduce個數相同麽
設置,map階段和region一樣,reduce取決於task
4.參考資料
1.董西成ppt
【Spark深入學習 -12】Spark程序設計與企業級應用案例02