1. 程式人生 > >【Spark深入學習 -12】Spark程序設計與企業級應用案例02

【Spark深入學習 -12】Spark程序設計與企業級應用案例02

提升 算子 lin count() roi println groupby 工作問題 衍生

----本節內容-------

1.遺留問題答疑

1.1 典型問題解答

1.2 知識點回顧

2.Spark編程基礎

2.1 Spark開發四部曲

2.2 RDD典型實例

2.3 非RDD典型實例

3.問題解答

4.參考資料

---------------------

技術分享

每一次答疑階段,我都會站在老師的角度去思考一下,如果是我,我應該怎麽回答,每每如此,不禁嚇出一身冷汗。有些問題看答案確實挺容易的,但當自己作為一個答疑者去思考,可能不一樣,因為快速確認一個答案的同時,你得否認很多的東西,腦海裏閃過很多的不確定,都要快速否決。董先生無疑是值得敬佩的,這方面體現的很內行,可見內功深厚,他是一個優秀的架構師,也是一個很好的傳道者。

1.遺留問題答疑

1.1 典型問題解答

1)spark通常和hadoop與yarn一同使用,在學spark過程中補充哪些知識點,可以整理列一下

HDFS 我覺得必須都得清楚,HDFS的東西其實很多,向老師說的文件基本的存儲策略,讀取操作,基本命令行、API等等;

YARN基本架構,ResoureManger,NodeManger,ApplicationMaster,container等等

其實我想說的是,最好是都過一遍吧,這些都是內功,都是相互依賴的,誰也離不了誰,還有比如Linux知識,Java基本知識,網絡基本知識等等,都是實際工作問題中要用到的知識點,我保證,都會用的到。

2)一個partition只會在一個節點上把,老師沒有將container的概念麽?老師能把partion,container,executor,task再細講一下麽?

臨時有事,這裏沒有聽。這個其實挺基礎的,但是回答出來比較考驗基本功。我把問題衍生一下,包含如下問題:

a).partion的基本概念是什麽,partition和block是什麽關系?

partition是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被劃分為多個分片,每一個分片叫做一個partition。block是HDFS存儲的最小單元

其實這裏還有一個問題,就是Spark為什麽要進行分區,出於什麽原因要搞一個分區的概念?,我覺得對partiion的理解非常有幫助,答案請參考博文《Spark你媽喊你回家吃飯-08 說說Spark分區原理及優化方法》

b).container的概念是什麽,yarn是如何基於container進行資源分配的?

c).Spark應用程序的執行過程是什麽?

還有兩個問題都是任務提交模式問題,了解spark程序運行的各種模式基本都能解答,這幾個問題在前面其實都已經有很明確的講解過。

1.2 知識點回顧

1)核心概念RDD

彈性分布式數據集,把握以下幾點

· RDD是什麽數據集,他是一個描述數據在哪裏,對數據做什麽操作,以及操作之間的依賴關系的一個數據集

· 為什麽是彈性,主要是說他的存儲,既可以在內存,也可以在磁盤

· 分布式:分布在集群上

· 自動重構:失效夠可以自動重構

技術分享

2) 程序架構

application = 1個driver +多個executor

Excecutor = 多個task+cache

搞清楚driver做了什麽,Executor做了什麽,Task又做了什麽,如何配合

技術分享

3)Yarn分布式模式

a.client發送資源申請請求

b.RM發送通知NodeManger要調用資源,

c.NodeManger啟動AppAplicationMaster

d.AppAplicationMaster通知nodeManager啟動各個Executor

e.nodeManager啟動Executor

f.nodeManager向Driver回報實時執行情況,也會告知AppAplicationMaster

技術分享

2.Spark編程基礎

2.1 Spark開發四部曲

每一個spark程序都有一個main,我們稱之為driver,driver將程序分解成多個task, task分發到多個executor,從而完成並行化。Spark程序開發的四部曲總結起來如下:

· 創建SparkContext對象

封裝了spark執行環境信息

· 創建RDD

可從scala結合或hadoop數據集上創建

· 在RDD上執行轉換和action

spark提供了多種轉換和action函數

· 返回結果

保存到HDFS中,或者直接打印出來

2.2 RDD典型實例

啟動spark shell:

bin/spark-shell --master spark://master01:7077 --driver-class-path/usr/local/tdr_hadoop/spark/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.40-bin.jar

1) 設置conf

val conf=new org.apache.spark.SparkConf().setAppName("xx");

conf.set("spark.app.name","test");

conf.set("spark.driver.allowMultipleContexts","true");

val sc= new org.apache.spark.SparkContext(conf);

#讀取hdfs上的文件,如果你在hdfs-site.xml中配置hdfs集群的話

val a=sc.textFile("/tmp/test/core-site.xml");

a.collect().foreach(println);

#讀取hdfs上的文件

val a = sc.textFile("hdfs:///tmp/test/core-site.xml");

a.collect().foreach(println);

#讀取hdfs上的文件 ,這裏的端口是9000

vala=sc.textFile("hdfs://master02:9000/tmp/test/core-site.xml");

a.collect().foreach(println);

#讀取本地文件,這裏要註意,driver是在哪裏運行,如果driver不在本地運行,執行會報錯,找不到文件哦

val a=sc.textFile("file:///data/aa.txt");

a.collect().foreach(println);

報錯1:netty是spark通信框架,通信超時了。所以產生問題。把::1也就是ip6先註釋掉。但是還是沒有解決問題,後來把master HA給回退過去了,又恢復了,可能是HA的配置沒有配好導致

技術分享

報錯2:不能出現多個sc

設置參數:conf.set("spark.driver.allowMultipleContexts","true");

2)創建RDD

· 從HDFS中讀取數據

inputRdd=sc.textFile("hdfs://master01:8020/xx/xx");

· 從本地讀取數據

inputRdd=sc.textFile("file://data/input")

· 從Hbase讀取數據

技術分享

· 從自定義文件格式讀取數據

技術分享

2.3 RDD典型實例

1.回憶經典概念

再次提一下RDD的transformation與Action,現在假設你去面試,面試官問你,簡單說說你對Transformation和Action的理解,我個人覺得應該回答以下幾個知識點,可能你有很多要說的,但是要整理好思路,一個個知識點回答。

1).先說概念

先說RDD概念,RDD彈性分布式數據集。它記錄了2類東西一個是數據,一個是操作。數據,RDD記錄了數據是保存在內存還是磁盤,而且能控制數據分區。操作,RDD記錄了數據之上的各種操作,以及操作之間的依賴關系。最後說一下特點,RDD具有容錯,並行,是spark的核心概念。

接著引入Transformation,Transformation是根據特定規則將一個RDD變換為另一個RDD,記錄了RDD的演變過程,最後結果是RDD。

Action:將數據聚集起來執行實際的操作,觸發所有job,並且保存計算結果,最後結果是數據。

2).後說聯系

Transformation記錄RDD之間的轉換關系,為Action的觸發記錄RDD邏輯關系。

3).最後說區別

·Transformation不觸發job,返回RDD。

·Action觸發job,執行計算,返回數據。

2.典型例子

1)Transformation例子

例子1:

------------------------

val nums =sc.parallelize(List(1,2,3),3);

nums.collect();

//返回前k個

nums.take(2);

nums.count();

//累加求和

nums.reduce(_+_);

//寫數據到hdfs

nums.saveAsTextFile("/tmp/dxc/week2/output1");

import org.apache.spark.rdd._;

nums.saveAsSequenceFile("/tmp/dxc/week2/output2");

nums.saveAsSequenceFile("/tmp/dxc/week2/output2");

//讀取sequenceFile格式文件

lines = sc.sequenceFile("/tmp/dxc/week2/output2/") ;

---------

說明:

collection:轉為數組,單機概念,保存到driver裏面,非常危險,如果非常大

10G數據,2g內存,很可能打爆driver的內存,慎重

技術分享

從這裏可以看出,啟動了一個Executor,Executor上面起了2個task,因為是指定了2個分區,分區的個數決定了task的個數。

val nums =sc.parallelize(List(1,2,3),3);

nums.collect();

技術分享

這裏指定3個分區,啟動了3個task。

技術分享

保存到hdfs也是3個part,如果指定分區為2,那就保存為2個數據塊。

------------------------

例子2:

--------

valpets=sc.parallelize([("cat",1),("dog",2),("dog",2)]);

pets.reduceByKey(_+_);

pets.groupByKey();

pets.sortByKey();

pets.collect();

---------

說明:reduceByKey自動在本地進行combine

------------------------

例子3:

val line =sc.textFile("/tmp/test/core-site.xml");

val count=line.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_);

count.collect();

line.foreach(println);

技術分享

例子4:

val left = sc.parallelize(List(("spark",1),("hadoop",1),("storm",1)))

val right=sc.parallelize(List(("scala",1),("hadoop",1),("spark",1)))

val result = left.join(right);

val result = left.cogroup(right);

result.collect();

result.foreach(println);

---------

說明:

cogroup:保證key唯一,key只占一行,可用做一個笛卡爾積,cogroup結果如下

(scala,(CompactBuffer(),CompactBuffer(1)))

(spark,(CompactBuffer(1),CompactBuffer(1)))

(hadoop,(CompactBuffer(1),CompactBuffer(1)))

(storm,(CompactBuffer(1),CompactBuffer()))

------------------------

還有其他的RDD操作可以參考之前寫的博客。

2.3 非RDD典型實例

1.Accumulator

Accumulator是spark提供的累加器,顧名思義,該變量只能夠增加。 只有driver能獲取到Accumulator的值(使用value方法),Task只能對其做增加操作(使用 +=)。你也可以在為Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,可以幫助你了解程序運行的情況。用於監控和調試,記錄符合某類特征的數據數據等。

-----------------

//在driver中定義

val accum = sc.accumulator(0, "Example Accumulator")

//在task中進行累加

sc.parallelize(1 to10,5).foreach(x=> accum += 1)

//在driver中輸出 accum.value

//結果將返回10 res: 10

技術分享

---

說明:

在web ui中可以看到Accumulators在task進行累加的具體情況,driver將accumulator收集過來匯總

-----------------

3.廣播變量

Spark有兩種共享變量——累加器、廣播變量。廣播變量可以讓程序高效地向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。高效分發大對象,比如字典map,集合set,每個executor一份,而不是每個task一份.

Spark中分布式執行的代碼需要傳遞到各個Executor的Task上運行。對於一些只讀、固定的數據(比如從DB中讀出的數據),每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播(提前廣播)給各個Executor。該Executor上的各個Task再從所在節點的BlockManager獲取變量,而不是從Driver獲取變量,從而提升了效率。

一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數據,之後的Task都從本節點的BlockManager中獲取相關數據。

技術分享

場景1:

val data = Set(1,2,3,4,5,6,7)

val rdd=sc.parallelize(1 to 6,2)

val result =rdd.map(_=>data.size)

result.collect();

場景2:

val data = Set(1,2,3,4,5,6,7)

val bddata=sc.broadcast(data)

val rdd=sc.parallelize(1 to 6,2)

val result =rdd.map(_=>data.size)

result.collect();

區別是:data數值的獲取方式,場景1 executor 每次都要從driver那裏獲取,要和交互7次,而場景2使用廣播變量,將data分發到executor,那麽driver和executor只需要交互一次。

4.cache的使用

val data=sc.textFile("/tmp/tbMonitordataResultHbase");

val a = data.count

println(a);

執行了29秒

技術分享

val data=sc.textFile("/tmp/tbMonitordataResultHbase");

data.cache()

val a = data.count

println(a);

技術分享

cache了100多M

val a = data.count

println(a);

data.persist(org.apache.spark.storage.StorageLevel. MEMORY_ONLY)

3.問題解答

1).spark計算時,數據會寫入磁盤麽,什麽條件下寫入磁盤

shuffle的時候都會寫到磁盤,帶shuffle的時候會寫入到磁盤,有哪算子呢?

2).報錯日誌裏的錯誤是,ERRORCoarsegRainedExcecutorBackend:RECEIVED SIGNAL TERM ,這個可能是什麽原因

虛擬內存超了,被yarn的nodemanager殺了,配置這個參數, 如果是1G內存,可以達到1.1G的使用內存

<property>

<name>yarn.nodemanager.vmem-pmem-ratio</name>

<value>10</value>

</property>

3).中文亂碼問題,出現中文亂碼和spark沒有關系

4).resourceManager的工作負擔重不重,實際部署時候是否有必要單獨到一臺機器上?

機器的規模,50或者100,可以nodemanager合並

5).hbase預分區個數和spark過程中reduce個數相同麽

設置,map階段和region一樣,reduce取決於task

4.參考資料

1.董西成ppt

【Spark深入學習 -12】Spark程序設計與企業級應用案例02