1. 程式人生 > >SparkCore基礎(一)

SparkCore基礎(一)

數據操作 步驟 kcon 用戶訪問 引擎 cdh 管理 你會 gpo

* SparkCore基礎(一)

學習Spark,首先要熟悉Scala,當然你說你會Python或者Java能不能玩Spark?能!但是不推薦,首推Scala,因為Scala非常便捷,而且Scala有非常好的交互式編程體驗(當然了,在這裏,Python也不差)。其次呢,我們要對Hadoop的MapReduce要是有一定的了解。不然,學習起來,是會稍微費點功夫。好,不扯這麽多了,相關的故事啊,疑問啊可以評論留言詢問或者百度查詢,我們現在直接進入正題。

技術分享圖片

Spark特征簡述

* Spark是什麽

官方描述:Spark is a fast and general engine for large-scale data processing

** Spark是一個快速的,通用的,大數據規模的運算引擎。這是一個非常精準的描述。

** Spark是基於MapReducer實現的通用的分布式計算框架,所以它繼承了MapReduce的優點,同時還支持將Job運算任務產生的中間結果和最終結果保存在內存中。

* Spark優勢

** Spark的中間數據放到內存中,對於叠代運算效率更高

** 運算速度奇快

** 更靈活的數據操作,比如:map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等等

* Spark不適合做什麽

** 不適合做增量變化的應用模型

* Spark支持語言

Java、Scala、Python

* 適用場景討論

** 適用於需要多次操作特定數據集的應用場合。需要反復操作的次數越多,所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場*合,受益就相對較小。

Spark下載

一般情況下,我們使用spark之前,都需要下載源碼,然後根據自己的集群環境(也就是Hadoop版本)進行編譯,然後再安裝使用。

Spark下載:

http://spark.apache.org/downloads.html

打開頁面後,做出如下選擇,即可開始下載源碼

技術分享圖片 在這裏我們使用1.6.1的源碼

Spark編譯

在此我們簡單介紹兩種方式:

** SBT編譯

這是一個類似Maven的倉庫,基於Scala

** Maven編譯

命令:

技術分享圖片

** make-distribution.sh編譯

修改源碼根目錄下的make-distribution.sh文件,修改內容如圖:

技術分享圖片

依次為:配置Spark版本,Scala版本,Hadoop版本,是否支持Hive,1為支持

配置鏡像:註意,如果編譯的是原版,請添加此鏡像,如果編譯的是CDH版本的,請註意去掉此鏡像。

技術分享圖片

配置域名解析服務器:

$ sudo vi /etc/resolv.conf,配置如下:

nameserver 8.8.8.8

nameserver 8.8.4.4

最後執行命令:

技術分享圖片 註意要支持yarn和hive

世界充滿愛之編譯好的Spark傳送門(分別包含包含Apache和CDH版本的):

鏈接:http://pan.baidu.com/s/1eRBJtjs 密碼:t03u

Spark運行模式

** Local

即本地模式

** Standalone

即Spark自帶的集群模式,分為Master節點和Worker節點,顧名思義,一個管理者,多個幹活的。:)

** Yarn

國內相當主流的一種運行部署模式,只是目前Yarn分配的Container是不能夠動態伸縮的,後續可能會考慮支持。

** Mesos

Spark在出生的時候就考慮支持該框架,很靈活,但國內使用似乎不多,感興趣請自行研究之。

Spark安裝部署

將Spark解壓出來,然後到conf目錄下,自己將template文件拷貝出文後提到的文件進行配置即可,在之前的章節我們已經提到過很多次,此步驟想必應該非常熟練了,不再贅述了。

Local模式:

spark-env.sh 文件配置如下:

技術分享圖片

Spark測試案例之Local模式

在案例開始前,請確保你的HDFS是可用的,並且spark-shell在active的NameNode節點上運行。此刻建議你已經熟知Hadoop中MapReduce的編寫過程以及運行原理。

案例一:基於本地模式的WordCount,words.txt中的內容:

技術分享圖片

Step1、進入spark根目錄使用$ bin/spark-shell命令啟動spark,如下圖:

技術分享圖片

Step2、讀取/input/words.txt文件,嘗試檢查一下words.txt文件有多少行數據,操作如下:

scala> val rdd = sc.textFile("/input/words.txt")

技術分享圖片

scala> rdd.count

技術分享圖片 當然了,統計詞頻,這個步驟可以省略,在此只是想驗證一下自己讀取到的數據有沒有問題

好,大家可以看到,有3行數據,每一行都有若幹英文單詞。那麽這裏面涉及到幾個問題需要拿出來討論一下:

1、什麽是rdd?

RDD is a fault-tolerant collection of elements that can be operated on in parallel,RDD是彈性分布式數據集,全稱Resilient Distributed Datasets,具有分布式,高容錯性等特點,在這裏,剛開始接觸的話,你可暫且理解為一個集合就可以了,一個數據集合。

2、什麽是sc?

sc的全稱是SparkContext,即Spark的上下文對象,這個理解可以類比於在Hadoop階段我們在MapReduce中接觸到的Context,不管是讀取文件還是其他數據操作,都依賴於SparkContext的實例化。在這裏,sc即一個實例化好的SparkContext對象。

我們通過sc.textFile方法讀取到HDFS系統中存放的words.txt文件信息,該方法返回一個RDD對象,之後我們通過rdd對象調用count方法,來查看讀取到的文件中數據有多少行。

Step3、利用得到的rdd對象進行數據的拆分,即,每一個單詞都拆分成一個RDD對象,比如類似這樣的理解:RDD<String> rdd = new RDD("hadoop");那麽使用scala在spark中如何做呢?請看:

scala> val wordRdd = rdd.flatMap(line => line.split(" "))

技術分享圖片

然後我們使用wordRdd顯示一下第一個單詞看一看:

scala> wordRdd.first

技術分享圖片

Step4、將分割出來的每一個單詞做Map映射

scala> val mapRdd = wordRdd.map(word => (word, 1))

技術分享圖片

這是scala的高階函數,註意不理解請重新復習Scala語言。該語句的意思是:將wordRdd中存放的單詞映射為一個tuple元組,元組中有兩個元素,第一個元素為單詞,第二個元素為當前單詞本次的個數,固定為1,這個1就像Hadoop階段中Map的LongWritable一樣,這個word就像Text一樣。

Step5、這一步要做的就是講map映射出來的數據集進行reduce運算

scala> val reduceRdd = mapRdd.reduceByKey((x, y) => x + y)

技術分享圖片

該行代碼的意思是將某一個單詞的好多個1(當然如果進行Combine操作了,也許可能不是多個1,如果你無法理解我這一句在說什麽,請繼續前進,然後重新復習Hadoop的MapReduce相關知識點)進行相加運算。

Step6、查看一下結果

scala> reduceRdd.collect

技術分享圖片

顯示出來了,而且執行過程非常的迅速,你懂得。

當然了,以上的操作,完全可以使用一句話來實現,並且代碼的體現形式可以非常騷氣,如:

scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

技術分享圖片

Step7、當然了結果也可以輸出到HDFS系統當中,比如:

scala> reduceRdd.saveAsTextFile("/output/spark/test01")

案例二:基於案例一,進行二次排序,即,將統計出的詞頻結果按照降序或者升序排列

sc.textFile("/input/words.txt")

.flatMap(_.split(" "))

.map((_, 1))

.reduceByKey(_ + _)

.map(x => (x._2, x._1))

.sortByKey()

.map(x => (x._2, x._1))

.collect

Step1、得到案例一的統計好的詞頻結果,然後做一個map映射,將單詞和單詞出現的次數顛倒過來,也就是說,(hadoop, 1)變成(1, hadoop),這樣做的原因是因為OrderedRDDFunctions類中有一個方法叫做:sortByKey,意思是按照Key的大小進行排序,默認參數是升序,如圖:

技術分享圖片

為了使用該方法,我們這麽做:

上一個案例,我們得到:

val reduce = sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

然後:

val reverseRdd = reduce.map(x => (x._2, x._1))

然後我們看一眼這個RDD集合:

技術分享圖片

Step2、直接使用sortByKey進行默認排序

val sortRdd = reverseRdd.sortByKey()

Step3、排序結束你不得給人家再反轉回來?所以:

sortRdd.map(x => (x._2, x._1)).collect,如圖:

技術分享圖片

當然了,以上分解步驟一氣呵成最爽快:

sc.textFile("/input/words.txt")

.flatMap(_.split(" "))

.map((_, 1))

.reduceByKey(_ + _)

.map(x => (x._2, x._1))

.sortByKey()

.map(x => (x._2, x._1))

.collect

Step4、當然了,sortByKey方法也可以實現倒序,如:

sc.textFile("/input/words.txt")

.flatMap(_.split(" "))

.map((_, 1))

.reduceByKey(_ + _)

.map(x => (x._2, x._1))

.sortByKey(false)

.map(x => (x._2, x._1))

.collect

Step5、二次排序還可以使用top

top源碼:

技術分享圖片

這是一個柯裏化的函數,top命令是查看前多少條數據,如圖可見,在查看之時,元素也是排序好的

比如:

sc.textFile("/input/words.txt")

.flatMap(_.split(" "))

.map((_, 1))

.reduceByKey(_ + _)

.map(x => (x._2, x._1))

.top(12)

輸出如圖:

技術分享圖片

Spark運行模式之Standalone

配置:spark-env.sh

技術分享圖片

Master節點:SPARK_MASTER_IP=z01

Master節點端口號:SPARK_MASTER_PORT=7077

Master WebUI端口號:SPARK_MASTER_WEBUI_PORT=8080

Worker節點可用CPU核心數:SPARK_WORKER_CORES=2

Worker可用內存:SPARK_WORKER_MEMORY=2g

Worker端口號:SPARK_WORKER_PORT=7078

Worker WebUI端口號:SPARK_WORKER_WEBUI_PORT=8081

允許在每臺機器上開啟幾個Worker進程,默認為1個SPARK_WORKER_INSTANCES=1

配置:slaves

即配置允許哪幾臺機器當做Woker節點

技術分享圖片

以上配置完成後,scp到其他集群節點

啟動:

Master

$ sbin/start-master.sh

Worker

$ sbin/start-slaves.sh

完成後通過z01:8080端口訪問即可如圖所示:

技術分享圖片

也可以JPS看一下進程:

技術分享圖片

在Standalone上運行Spark

首先,查看一下spark的幫助文檔來引導該怎麽做:

$ bin/spark-shell --help

技術分享圖片

註意紅框內的內容,那麽接下來,我們應該知道怎麽讓spark運行在standalone上了:

$ bin/spark-shell --master spark://z01:7077

如圖:註意紅框內容

技術分享圖片

尖叫提示:如果直接不加參數的使用spark-shell方式啟動,則還是在本地模式(Local)啟動的。

Spark測試案例之Standalone模式

案例一:跑一個一氣呵成的WordCount

scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

WEBUI,http://192.168.122.200:4040/jobs/ 如圖:

技術分享圖片

可以看到,有一個Job任務已經運行完畢了。

案例二:做一個每日的PV分析

Step1、首先,我們將網站的訪問數據導入到hive當中,執行:

$ cat hql-file/track-log.hql

其中track-log.hql文件如下:

技術分享圖片 該部分內容可以參看Hive框架基礎(一)

Step2、通過Hive查看track_log文件在哪

hive> desc formatted track_log;

如圖:註意紅框內容,對於我們來講,有用的即:/user/hive/warehouse/track_log/2015082818

技術分享圖片 技術分享圖片

Step3、將日誌數據讀入到RDD中等待分析

scala> val rdd = sc.textFile("/user/hive/warehouse/track_log/2015082818")

Step4、清洗無效的數據,即空白行,以及url字段為空的,我們要過濾掉

1、先過濾空白行

2、再分割字段值

3、最後過濾url字段為空的

綜合來寫:

scala> val validRdd = rdd.filter(line => line.length > 0).map(_.split("\t")).filter(arr => arr(1).length > 0)

當然了,此時你可以count一下,看看過濾後剩下多少數據

scala> validRdd.count

技術分享圖片

Step4、將URL做map映射,比如做出這樣的映射:(今日日期, 1)

那麽今日的日期在tracktime字段,屬於分割後的數組的第17個索引處

在hive中我們查看一下該日期的格式:

hive> select tracktime from track_log limit 1;

如圖:

技術分享圖片

那麽截取出2015-08-28應該很容易,所以:

scala> val mapRdd = validRdd.map(arr => (arr(17).substring(0, 10), 1))

Step5、你懂得,再來一個Reduce即可

scala> val reduceRdd = mapRdd.reduceByKey(_ + _)

完事之後可以查看一下結果:

scala> reduceRdd.collect

如圖:

技術分享圖片

當然了也可以一氣呵成走你:

scala> sc.textFile("/user/hive/warehouse/track_log/2015082818")

.filter(line => line.length > 0)

.map(_.split("\t"))

.filter(arr => arr(1).length > 0)

.map(arr => (arr(17).substring(0, 10), 1))

.reduceByKey(_ + _)

.collect

Step6、我們使用Hive來驗證一下

註意如果你的Yarn沒有啟動,需要將Hive設置成Local模式:

hive> set hive.exec.mode.local.auto = true;

然後執行:

技術分享圖片

結果如圖:

技術分享圖片

對比可知,兩個結果是一樣的。

案例三:PV和UV分析

PV:即頁面訪問次數

UV:即不同用戶訪問頁面次數

Step1、讀取網站日誌文件生成RDD對象

scala> val rdd = sc.textFile("/user/hive/warehouse/track_log")

Step2、過濾不必要的數據,並生成map映射,註意此時的操作與之前的案例略有不同,請註意觀察,如圖:

scala> val mapRdd = rdd.filter(_.length > 0).map(line => {

| val arr = line.split("\t")

| val date = arr(17).substring(0, 10)

| val guid = arr(5)

| val url = arr(1)

| (date, guid, url)

| }).filter(tuple => tuple._3.length > 0)

技術分享圖片

Step3、可選步驟,此處可以將數據cache到內存中,註意,cache後,不會立刻緩存到內存中,需要執行一個action,比如count,take,collect都可以

scala> mapRdd.cache

scala> mapRdd.count

在此之後就可以在4040端口的頁面是storge選項中看到緩存到內存中的數據信息,如圖:

技術分享圖片

Step4、統計PV

scala> val pvRdd = mapRdd.map(tuple => (tuple._1, 1)).reduceByKey(_ + _)

scala> pvRdd.first,如圖:

技術分享圖片

Step5、UV統計

scala> val uvRdd = mapRdd.map(tuple => (tuple._1 + "_" + tuple._2, 1)).distinct.map(tuple => {

val arr = tuple._1.split("_")

(arr(0), 1)

}).reduceByKey(_ + _)

技術分享圖片 此時可以自行使用uvRdd.first查看結果,不再展示

Step6、合並PV和UV的結果進行顯示

union方式:

scala> val pv_uvRdd = pvRdd.union(uvRdd)

scala> pv_uvRdd.collect,如圖:

技術分享圖片

join方式:

scala> val pv_uvRdd = pvRdd.join(uvRdd)

scala> pv_uvRdd.first,如圖:

技術分享圖片

驗證:使用Hive或者SparkSQL驗證結果一致性

首先創建SQL語句:

技術分享圖片

SparkSQL方式:

scala> val sql = """ 上邊的SQL代碼 """,如圖:

技術分享圖片

然後執行:

scala> val result = sqlContext.sql(sql)

技術分享圖片

scala> result.show()

技術分享圖片

尖叫提示:如果你的hive使用了thrift的metastore方式,請把hive的hive-site.xml文件軟連接到spark的conf目錄下!!否則上述指令將會出現找不到table的錯誤。

HIVE方式:直接使用Hive客戶端執行上面的SQL語句,如圖:

技術分享圖片

Spark任務歷史服務

對於Yarn有mr-historyserver

對於Spark有SparkHistory

所以應該很容易明白這是一個任務日誌的歷史服務,比如你可以查看昨天半夜運行的任務情況。

開啟這個服務也很簡單:

可以參看:http://spark.apache.org/docs/1.6.1/monitoring.html

Step1、配置參數

配置:spark-env.sh,日誌默認是保存在本地的,此刻我們將日誌保存到HDFS系統當中如圖:

技術分享圖片

配置:spark-defaults.conf,spark啟動時默認加載的配置文件

技術分享圖片

Step2、在HDFS系統中創建目錄/user/z/spark-events

Step3、將配置文件重新scp到其他節點之後,重啟服務,然後開啟歷史服務

$ sbin/start-history-server.sh

JPS看一眼:

技術分享圖片

然後在瀏覽器打開:http://z01:18080/

如圖:

技術分享圖片

Step4、測試玩一玩?

$ bin/spark-shell --master spark://z01:7077

隨便執行執行一個我們之前的案例任務,即可,運行幾個任務,成功運行幾個,再失敗幾個,如圖:

技術分享圖片

註意紅框內容,如果你當前的spark-shell沒有退出,那麽該任務就是屬於正在運行的任務。請自行切換觀察即可。

* 總結

對於RDD到RDD的 操作,我們稱之為Transformation操作

例如:我們在案例中使用的過濾,或者map,或者reduce等等

對RDD到其他類型的操作,我們稱之為Action

例如:我們在案例中使用的top,或者take、collect等操作

另外RDD中的數據可以持久化到內存中來操作,使用:

rdd.cache來操作,比較適用於頻繁使用的。

這一節我們大概了解了Spark的操作,也應該更加深刻的熟悉了Scala的操作。下一節我們針對Spark進行更深入的探討。


個人微博:http://weibo.com/seal13

QQ大數據技術交流群(廣告勿入):476966007



作者:Z盡際
鏈接:https://www.jianshu.com/p/136874812e11
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請註明出處。

SparkCore基礎(一)