【spark你媽喊你回家吃飯-05】RDD程式設計之旅基礎篇-01
1.RDD工作流程
1.1 RDD理解
RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分散式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數組,這樣的理解對我們學習RDD的API是非常有幫助的。本文所有示例程式碼都是使用scala語言編寫的。RDD的執行過程如下:
- 從外部資料創建出輸入RDD,或者從驅動程式分發驅動程式中的物件集合
- 對RDD進行轉化,一個RDD轉化為一個新的RDD,如filter()操作
- 如果需要重用,告知RDD執行persist()操作
- 執行action觸發計算平行計算,spark先優化再執行計算,如count()和first()
RDD的建立有2種方式
1)從驅動程式分發驅動程式中的物件集合
從記憶體裡構造RDD,使用的方法:makeRDD和parallelize方法
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6)); val r01 = rdd01.map { x => x * x } println(r01.collect().mkString(",")) /* Array */ val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6)) val r02 = rdd02.filter { x => x < 5} println(r02.collect().mkString(",")) val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1) val r03 = rdd03.map { x => x + 1 } println(r03.collect().mkString(",")) /* Array */ val rdd04 = sc.parallelize(Array(1,2,3,4,5,6) ,1) val r04 = rdd04.filter { x => x > 3 } println(r04.collect().mkString(","))
1.2.makeRDD和parallelize的區別
makeRDD有兩種實現方式,第一種方式parallelize宣告都一樣,接收的引數和parallelize完全一樣,def makeRDD[T:ClassTag],這種實現方式的makeRDD依賴了parallelize;makeRDD第二種實現方式defmakeRDD[T:ClassTag](T,Seq(String)))
第一種:mkRDD實現方式
val blog1=sc.parallelize(List(1,2,3));
val blog2=sc.makeRDD(List(1,2,3)
第二種:mkRDD實現方式
val seq=List((1,List("a","b","c")),(2,List("aa","bb","cc")));
val seq=List((1,List("a","b","c")),(2,List("aa","bb","cc")));
val blog3=sc.makeRDD(seq);
blog3.preferredLocations(blog3.partitions(0));
blog3.preferredLocations(blog3.partitions(1));
2.WordCount解說
WordCount是分散式程式設計的入門示例,本節也從WordCount舉例說明RDD DEMO
2.1.Spark shell版本
//載入hdfs上的檔案
val txtFile ="/tmp/test/core-site.xml" ;
val txtData = sc.textFile(txtFile);
//將上一步生成的RDD物件儲存到快取中,在此之後Spark就不需要在每次資料查詢時都重新計算
txtData.cache() ;
// flatMap先對映後扁平化,
val wcData = txtData.flatMap(l =>l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _);
//可以提取出所有rdd裡的資料項,逐行輸出
wcData.collect().foreach(println);
備註:
A.關於spark-shell的啟動引數指定
bin/spark-shell --executor-memory 1G --total-executor-cores10 --executor-cores 1 --master yarn-client --driver-class-path /usr/local/tdr_hadoop/spark/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.40-bin.jar
--executor-memory: 指定每個executor(執行器)佔用的記憶體
--total-executor-cores: 所有executor總共使用的cpu核數
--executor-cores:每個executor使用的cpu核數
--driver-class-path:指定要載入的jar包
--master:
local[8]:表示在本地執行,資料會下載到介面機本地來執行,單機版
spark://master01:7077:表示在叢集上執行應用程式,指定任務提交的叢集路徑在哪裡。這就需要提前啟動一個真實的Standalone叢集。可以指定多個master的地址,用逗號隔開。
yarn-client:在客戶模式上,driver與提交程式的客戶端在一個程序
yarn-cluster:在叢集模式上,driver是從叢集中的一個worker程序中啟動的,這個程序只要完成了提交作業任務就會退出,不會等待提交的應用程式的完成。Spark-shell時,必須使用yarn-client模式,因為你要在client上寫SQL。
B.spark-shell 是一個spark application,執行時需要向資源管理器申請資源,如standalone spark、YARN、Mesos。本例向standalone spark申請資源,所以在執行spark-shell時需要指向申請資源的standalonespark叢集資訊,其引數為MASTER。
如果未在spark-env.sh中申明MASTER,則使用命令MASTER=spark://cdh1:7077bin/spark-shell啟動;如果已經在spark-env.sh中申明MASTER,則可以直接用bin/spark-shell啟動。
由於spark-shell預設的情況下,會申請所有的CPU資源
2.2. java 版本
搭建Spark開發環境
(1)前提:配置好jdk和scala到windows
(2)安裝Intellij去官網下載Intellij:https://www.jetbrains.com/idea/,在windows環境下雙擊安裝即可
(3)安裝scala外掛,安裝好scala外掛後,點選restart重啟intellij
(4)、使用Intellij寫WordCount程式碼
a.新建scala工程:File -> new -> project -> scala project –>scala,專案名稱:spark02
在src目錄下,建立cn.com包,在該包下建立object 類,命名為word,完成word.scala程式碼如下所示:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
/**
* Created by Administrator on 2016/11/2.
*/
object word {
def main(args: Array[String]) {
if(args.length < 1) {
System.err.println("Usage: <file>")
System.exit(1)
}
val conf = new SparkConf()
val sc = new SparkContext(conf)
//SparkContext 是把程式碼提交到叢集或者本地的通道,我們編寫Spark程式碼,無論是要本地執行還是叢集執行都必須有SparkContext的例項
val line = sc.textFile(args(0))
//把讀取的內容儲存給line變數,其實line是一個MappedRDD,Spark的所有操作都是基於RDD的
line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect.foreach(println)
sc.stop
}
}
b.匯入spark包
File
->Project structure
->project settting
->libraries->+
匯入spark-assembly-1.6.0-hadoop2.6.0.jar包(該包從spark安裝包的lib下獲得)
c.選擇Artifacts
File
->Project structure
->project settting
->Artifacts->+,選擇要匯入的專案,以及main類
並且指定jar包輸出的位置
d.輸出jar包
Build -> Build ArtiFacts ->build,打好jar包到:D:\spark02\out\artifacts\spark02_jar\spark02.jar
e.上傳jar包到spark客戶端,並執行
執行命令:
spark-submit --master yarn --executor-memory 1000M /usr/local/tdr_hadoop/spark/spark02.jarhdfs://tdrHadoop/tmp/test/core-site.xml
在yarn的前臺顯示正在執行
執行結果輸出:
本文為原創文章,同時在微信公眾號釋出,在轉載請聯絡作者,想了解更多精彩大資料技術文章,請關注微信公眾號:大資料梅峰谷 ,也可以掃描二維碼