SparkCore基礎(一)
* SparkCore基礎(一)
學習Spark,首先要熟悉Scala,當然你說你會Python或者Java能不能玩Spark?能!但是不推薦,首推Scala,因為Scala非常便捷,而且Scala有非常好的交互式編程體驗(當然了,在這裏,Python也不差)。其次呢,我們要對Hadoop的MapReduce要是有一定的了解。不然,學習起來,是會稍微費點功夫。好,不扯這麽多了,相關的故事啊,疑問啊可以評論留言詢問或者百度查詢,我們現在直接進入正題。
Spark特征簡述
* Spark是什麽
官方描述:Spark is a fast and general engine for large-scale data processing
** Spark是一個快速的,通用的,大數據規模的運算引擎。這是一個非常精準的描述。
** Spark是基於MapReducer實現的通用的分布式計算框架,所以它繼承了MapReduce的優點,同時還支持將Job運算任務產生的中間結果和最終結果保存在內存中。
* Spark優勢
** Spark的中間數據放到內存中,對於叠代運算效率更高
** 運算速度奇快
** 更靈活的數據操作,比如:map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等等
* Spark不適合做什麽
** 不適合做增量變化的應用模型
* Spark支持語言
Java、Scala、Python
* 適用場景討論
** 適用於需要多次操作特定數據集的應用場合。需要反復操作的次數越多,所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場*合,受益就相對較小。
Spark下載
一般情況下,我們使用spark之前,都需要下載源碼,然後根據自己的集群環境(也就是Hadoop版本)進行編譯,然後再安裝使用。
Spark下載:
http://spark.apache.org/downloads.html
打開頁面後,做出如下選擇,即可開始下載源碼
在這裏我們使用1.6.1的源碼Spark編譯
在此我們簡單介紹兩種方式:
** SBT編譯
這是一個類似Maven的倉庫,基於Scala
** Maven編譯
命令:
** make-distribution.sh編譯
修改源碼根目錄下的make-distribution.sh文件,修改內容如圖:
依次為:配置Spark版本,Scala版本,Hadoop版本,是否支持Hive,1為支持
配置鏡像:註意,如果編譯的是原版,請添加此鏡像,如果編譯的是CDH版本的,請註意去掉此鏡像。
配置域名解析服務器:
$ sudo vi /etc/resolv.conf,配置如下:
nameserver 8.8.8.8
nameserver 8.8.4.4
最後執行命令:
註意要支持yarn和hive世界充滿愛之編譯好的Spark傳送門(分別包含包含Apache和CDH版本的):
鏈接:http://pan.baidu.com/s/1eRBJtjs 密碼:t03u
Spark運行模式
** Local
即本地模式
** Standalone
即Spark自帶的集群模式,分為Master節點和Worker節點,顧名思義,一個管理者,多個幹活的。:)
** Yarn
國內相當主流的一種運行部署模式,只是目前Yarn分配的Container是不能夠動態伸縮的,後續可能會考慮支持。
** Mesos
Spark在出生的時候就考慮支持該框架,很靈活,但國內使用似乎不多,感興趣請自行研究之。
Spark安裝部署
將Spark解壓出來,然後到conf目錄下,自己將template文件拷貝出文後提到的文件進行配置即可,在之前的章節我們已經提到過很多次,此步驟想必應該非常熟練了,不再贅述了。
Local模式:
spark-env.sh 文件配置如下:
Spark測試案例之Local模式
在案例開始前,請確保你的HDFS是可用的,並且spark-shell在active的NameNode節點上運行。此刻建議你已經熟知Hadoop中MapReduce的編寫過程以及運行原理。
案例一:基於本地模式的WordCount,words.txt中的內容:
Step1、進入spark根目錄使用$ bin/spark-shell命令啟動spark,如下圖:
Step2、讀取/input/words.txt文件,嘗試檢查一下words.txt文件有多少行數據,操作如下:
scala> val rdd = sc.textFile("/input/words.txt")
scala> rdd.count
當然了,統計詞頻,這個步驟可以省略,在此只是想驗證一下自己讀取到的數據有沒有問題好,大家可以看到,有3行數據,每一行都有若幹英文單詞。那麽這裏面涉及到幾個問題需要拿出來討論一下:
1、什麽是rdd?
RDD is a fault-tolerant collection of elements that can be operated on in parallel,RDD是彈性分布式數據集,全稱Resilient Distributed Datasets,具有分布式,高容錯性等特點,在這裏,剛開始接觸的話,你可暫且理解為一個集合就可以了,一個數據集合。
2、什麽是sc?
sc的全稱是SparkContext,即Spark的上下文對象,這個理解可以類比於在Hadoop階段我們在MapReduce中接觸到的Context,不管是讀取文件還是其他數據操作,都依賴於SparkContext的實例化。在這裏,sc即一個實例化好的SparkContext對象。
我們通過sc.textFile方法讀取到HDFS系統中存放的words.txt文件信息,該方法返回一個RDD對象,之後我們通過rdd對象調用count方法,來查看讀取到的文件中數據有多少行。
Step3、利用得到的rdd對象進行數據的拆分,即,每一個單詞都拆分成一個RDD對象,比如類似這樣的理解:RDD<String> rdd = new RDD("hadoop");那麽使用scala在spark中如何做呢?請看:
scala> val wordRdd = rdd.flatMap(line => line.split(" "))
然後我們使用wordRdd顯示一下第一個單詞看一看:
scala> wordRdd.first
Step4、將分割出來的每一個單詞做Map映射
scala> val mapRdd = wordRdd.map(word => (word, 1))
這是scala的高階函數,註意不理解請重新復習Scala語言。該語句的意思是:將wordRdd中存放的單詞映射為一個tuple元組,元組中有兩個元素,第一個元素為單詞,第二個元素為當前單詞本次的個數,固定為1,這個1就像Hadoop階段中Map的LongWritable一樣,這個word就像Text一樣。
Step5、這一步要做的就是講map映射出來的數據集進行reduce運算
scala> val reduceRdd = mapRdd.reduceByKey((x, y) => x + y)
該行代碼的意思是將某一個單詞的好多個1(當然如果進行Combine操作了,也許可能不是多個1,如果你無法理解我這一句在說什麽,請繼續前進,然後重新復習Hadoop的MapReduce相關知識點)進行相加運算。
Step6、查看一下結果
scala> reduceRdd.collect
顯示出來了,而且執行過程非常的迅速,你懂得。
當然了,以上的操作,完全可以使用一句話來實現,並且代碼的體現形式可以非常騷氣,如:
scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect
Step7、當然了結果也可以輸出到HDFS系統當中,比如:
scala> reduceRdd.saveAsTextFile("/output/spark/test01")
案例二:基於案例一,進行二次排序,即,將統計出的詞頻結果按照降序或者升序排列
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey()
.map(x => (x._2, x._1))
.collect
Step1、得到案例一的統計好的詞頻結果,然後做一個map映射,將單詞和單詞出現的次數顛倒過來,也就是說,(hadoop, 1)變成(1, hadoop),這樣做的原因是因為OrderedRDDFunctions類中有一個方法叫做:sortByKey,意思是按照Key的大小進行排序,默認參數是升序,如圖:
為了使用該方法,我們這麽做:
上一個案例,我們得到:
val reduce = sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
然後:
val reverseRdd = reduce.map(x => (x._2, x._1))
然後我們看一眼這個RDD集合:
Step2、直接使用sortByKey進行默認排序
val sortRdd = reverseRdd.sortByKey()
Step3、排序結束你不得給人家再反轉回來?所以:
sortRdd.map(x => (x._2, x._1)).collect,如圖:
當然了,以上分解步驟一氣呵成最爽快:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey()
.map(x => (x._2, x._1))
.collect
Step4、當然了,sortByKey方法也可以實現倒序,如:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey(false)
.map(x => (x._2, x._1))
.collect
Step5、二次排序還可以使用top
top源碼:
這是一個柯裏化的函數,top命令是查看前多少條數據,如圖可見,在查看之時,元素也是排序好的
比如:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.top(12)
輸出如圖:
Spark運行模式之Standalone
配置:spark-env.sh
Master節點:SPARK_MASTER_IP=z01
Master節點端口號:SPARK_MASTER_PORT=7077
Master WebUI端口號:SPARK_MASTER_WEBUI_PORT=8080
Worker節點可用CPU核心數:SPARK_WORKER_CORES=2
Worker可用內存:SPARK_WORKER_MEMORY=2g
Worker端口號:SPARK_WORKER_PORT=7078
Worker WebUI端口號:SPARK_WORKER_WEBUI_PORT=8081
允許在每臺機器上開啟幾個Worker進程,默認為1個SPARK_WORKER_INSTANCES=1
配置:slaves
即配置允許哪幾臺機器當做Woker節點
以上配置完成後,scp到其他集群節點
啟動:
Master
$ sbin/start-master.sh
Worker
$ sbin/start-slaves.sh
完成後通過z01:8080端口訪問即可如圖所示:
也可以JPS看一下進程:
在Standalone上運行Spark
首先,查看一下spark的幫助文檔來引導該怎麽做:
$ bin/spark-shell --help
註意紅框內的內容,那麽接下來,我們應該知道怎麽讓spark運行在standalone上了:
$ bin/spark-shell --master spark://z01:7077
如圖:註意紅框內容
尖叫提示:如果直接不加參數的使用spark-shell方式啟動,則還是在本地模式(Local)啟動的。
Spark測試案例之Standalone模式
案例一:跑一個一氣呵成的WordCount
scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect
WEBUI,http://192.168.122.200:4040/jobs/ 如圖:
可以看到,有一個Job任務已經運行完畢了。
案例二:做一個每日的PV分析
Step1、首先,我們將網站的訪問數據導入到hive當中,執行:
$ cat hql-file/track-log.hql
其中track-log.hql文件如下:
該部分內容可以參看Hive框架基礎(一)Step2、通過Hive查看track_log文件在哪
hive> desc formatted track_log;
如圖:註意紅框內容,對於我們來講,有用的即:/user/hive/warehouse/track_log/2015082818
Step3、將日誌數據讀入到RDD中等待分析
scala> val rdd = sc.textFile("/user/hive/warehouse/track_log/2015082818")
Step4、清洗無效的數據,即空白行,以及url字段為空的,我們要過濾掉
1、先過濾空白行
2、再分割字段值
3、最後過濾url字段為空的
綜合來寫:
scala> val validRdd = rdd.filter(line => line.length > 0).map(_.split("\t")).filter(arr => arr(1).length > 0)
當然了,此時你可以count一下,看看過濾後剩下多少數據
scala> validRdd.count
Step4、將URL做map映射,比如做出這樣的映射:(今日日期, 1)
那麽今日的日期在tracktime字段,屬於分割後的數組的第17個索引處
在hive中我們查看一下該日期的格式:
hive> select tracktime from track_log limit 1;
如圖:
那麽截取出2015-08-28應該很容易,所以:
scala> val mapRdd = validRdd.map(arr => (arr(17).substring(0, 10), 1))
Step5、你懂得,再來一個Reduce即可
scala> val reduceRdd = mapRdd.reduceByKey(_ + _)
完事之後可以查看一下結果:
scala> reduceRdd.collect
如圖:
當然了也可以一氣呵成走你:
scala> sc.textFile("/user/hive/warehouse/track_log/2015082818")
.filter(line => line.length > 0)
.map(_.split("\t"))
.filter(arr => arr(1).length > 0)
.map(arr => (arr(17).substring(0, 10), 1))
.reduceByKey(_ + _)
.collect
Step6、我們使用Hive來驗證一下
註意如果你的Yarn沒有啟動,需要將Hive設置成Local模式:
hive> set hive.exec.mode.local.auto = true;
然後執行:
結果如圖:
對比可知,兩個結果是一樣的。
案例三:PV和UV分析
PV:即頁面訪問次數
UV:即不同用戶訪問頁面次數
Step1、讀取網站日誌文件生成RDD對象
scala> val rdd = sc.textFile("/user/hive/warehouse/track_log")
Step2、過濾不必要的數據,並生成map映射,註意此時的操作與之前的案例略有不同,請註意觀察,如圖:
scala> val mapRdd = rdd.filter(_.length > 0).map(line => {
| val arr = line.split("\t")
| val date = arr(17).substring(0, 10)
| val guid = arr(5)
| val url = arr(1)
| (date, guid, url)
| }).filter(tuple => tuple._3.length > 0)
Step3、可選步驟,此處可以將數據cache到內存中,註意,cache後,不會立刻緩存到內存中,需要執行一個action,比如count,take,collect都可以
scala> mapRdd.cache
scala> mapRdd.count
在此之後就可以在4040端口的頁面是storge選項中看到緩存到內存中的數據信息,如圖:
Step4、統計PV
scala> val pvRdd = mapRdd.map(tuple => (tuple._1, 1)).reduceByKey(_ + _)
scala> pvRdd.first,如圖:
Step5、UV統計
scala> val uvRdd = mapRdd.map(tuple => (tuple._1 + "_" + tuple._2, 1)).distinct.map(tuple => {
val arr = tuple._1.split("_")
(arr(0), 1)
}).reduceByKey(_ + _)
此時可以自行使用uvRdd.first查看結果,不再展示Step6、合並PV和UV的結果進行顯示
union方式:
scala> val pv_uvRdd = pvRdd.union(uvRdd)
scala> pv_uvRdd.collect,如圖:
join方式:
scala> val pv_uvRdd = pvRdd.join(uvRdd)
scala> pv_uvRdd.first,如圖:
驗證:使用Hive或者SparkSQL驗證結果一致性
首先創建SQL語句:
SparkSQL方式:
scala> val sql = """ 上邊的SQL代碼 """,如圖:
然後執行:
scala> val result = sqlContext.sql(sql)
scala> result.show()
尖叫提示:如果你的hive使用了thrift的metastore方式,請把hive的hive-site.xml文件軟連接到spark的conf目錄下!!否則上述指令將會出現找不到table的錯誤。
HIVE方式:直接使用Hive客戶端執行上面的SQL語句,如圖:
Spark任務歷史服務
對於Yarn有mr-historyserver
對於Spark有SparkHistory
所以應該很容易明白這是一個任務日誌的歷史服務,比如你可以查看昨天半夜運行的任務情況。
開啟這個服務也很簡單:
可以參看:http://spark.apache.org/docs/1.6.1/monitoring.html
Step1、配置參數
配置:spark-env.sh,日誌默認是保存在本地的,此刻我們將日誌保存到HDFS系統當中如圖:
配置:spark-defaults.conf,spark啟動時默認加載的配置文件
Step2、在HDFS系統中創建目錄/user/z/spark-events
Step3、將配置文件重新scp到其他節點之後,重啟服務,然後開啟歷史服務
$ sbin/start-history-server.sh
JPS看一眼:
然後在瀏覽器打開:http://z01:18080/
如圖:
Step4、測試玩一玩?
$ bin/spark-shell --master spark://z01:7077
隨便執行執行一個我們之前的案例任務,即可,運行幾個任務,成功運行幾個,再失敗幾個,如圖:
註意紅框內容,如果你當前的spark-shell沒有退出,那麽該任務就是屬於正在運行的任務。請自行切換觀察即可。
* 總結
對於RDD到RDD的 操作,我們稱之為Transformation操作
例如:我們在案例中使用的過濾,或者map,或者reduce等等
對RDD到其他類型的操作,我們稱之為Action
例如:我們在案例中使用的top,或者take、collect等操作
另外RDD中的數據可以持久化到內存中來操作,使用:
rdd.cache來操作,比較適用於頻繁使用的。
這一節我們大概了解了Spark的操作,也應該更加深刻的熟悉了Scala的操作。下一節我們針對Spark進行更深入的探討。
個人微博:http://weibo.com/seal13
QQ大數據技術交流群(廣告勿入):476966007
作者:Z盡際
鏈接:https://www.jianshu.com/p/136874812e11
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請註明出處。
SparkCore基礎(一)