跟天齊老師學Spark(6)--Spark的常用運算元介紹
阿新 • • 發佈:2019-01-01
spark的常用運算元介紹:
Resilient(彈性且可復原) Distributed(分散式) Datasets(資料集) (RDDs)
我們以前學的scala的原生方法都是陣列或者集合上定義的,它只能操作單機。
而spark中的方法都是定義在RDD上的,它們操作的是叢集。
spark的方法有兩類操作:
一種是:Transformations,它是lazy的,不會觸發任務的執行,是一種轉換
(從一種形式變成另一種形式),是延遲載入的;
一種是:Actions,執行出結果,是一種動作,是立即載入(執行)的;
看一下spark 的常用運算元:登入spark的官網
http://spark.apache.org/
下面是官方的Spark程式設計指南:
http://spark.apache.org/docs/1.6.2/programming-guide.html
在指南的列表目錄中有 Transformations 和 Actions 。
Diver是客戶端,它提交程式到spark叢集,而計算的返回結果會再發回給Diver客戶端。
Transformations有很多,比如說map方法,filter方法,mapPartitions方法,
sample方法,union方法,intersection方法,distinct方法,groupByKey方法,
reduceByKey方法,aggregateByKey方法,sortByKey方法,join方法,
Transformations 的特點是,它是lazy的,並不會觸發任務的執行。
有scala的API:
http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.rdd.RDD
也有java的API:
http://spark.apache.org/docs/1.6.2/api/java/index.html?org/apache/spark/api/java/JavaRDD.html
Actions的方法有:reduce方法,collect方法,count方法,first方法,
take方法,takaSample方法,takeOrdered方法,saveAsTextFile方法,
countByKey方法,foreach方法。
---------------------------------------------------------------------
說一下spark的第一類運算元:Transformations
如果要從hdfs中讀取資料,或者從關係型資料庫中讀資料,如果我們的hdfs或者關係型資料庫沒有啟動,
此時使用Transformations型別的運算元,就不會報錯。因為它只會記錄一下它將從哪裡讀取資料。
Transformations僅僅是一種轉換操作,而Action則是一些執行操作。
spark支援從本地檔案系統、hdfs、s3、hbase等讀取資料。
在Worker中有一個程序叫做"CoarseGrainedExecutorBackend"程序,它負責讀取資料、
對資料進行切分、聚合等操作,都是在Executor中完成的。
在Driver端啟動了一個spark-shell,然後在這裡寫任務,比如寫一些轉換和Action,在寫
轉換的時候,它會在這個spark-shell中記錄我們是怎樣轉換的,一旦觸發Action,Driver端就會把
任務提交到我們的Executor上執行,執行返回的結果會收集到Driver端。每個Executor只負責計算其
中的屬於它的資料。
這就是分散式思維,一個大的任務讓一臺機器去幹,它幹不了,那麼我們就分成很多的小任務,
由多臺機器去並行執行。最後將所有的結果收集起來,存到外部儲存介質中。
從外部儲存介質中將資料讀進來然後建立的RDD,還有一個種建立RDD的方法:
//使用spark的並行化方法建立RDD,並指定分割槽數為2個:
sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2),也能建立一個RDD,
它會將一個集合或者陣列轉換成RDD,還能指定分割槽數,這裡我們給它指定了分割槽數為2。
這樣它就會生成兩個結果檔案。就像MapReduce中的一樣,如果有兩個reduce就
會有兩個結果檔案一樣。
RDD上的這些常用運算元一定要練習。
------------------------------------------------
RDD的Action運算元:
我們把Spark的客戶端叫做Driver。
啟動Master;
啟動Worker;
Worker向Master註冊;
Master向Worker反饋;
Worker向Master傳送心跳。
Driver首先會跟Master進行RPC通訊(因為我們在shell命令
中指定了--master的位置,所以它會去找Master)。向Master申請資源;
Master啟動符合條件的Worker來啟動Executor(Master會告訴Worker
啟動Executor這個java子程序。Executor程序是由Worker程序啟動的。);
Executor會主動跟Driver進行通訊(因為Driver會把它的一些資訊傳送給Master,而Master會把Driver的資訊
封裝起來,傳送給Worker,而Executor就通過Worker知道了Driver的資訊,所以就能找到Driver了),
接下來就可以寫spark程式了。
可以為資料指定分割槽的。
一個分割槽一定屬於一臺機器,但是一臺機器可能有多個分割槽。
資料的讀取和計算都是在Executor中完成的。
rdd的foreachPartition方法可以將資料取出來存到關係型資料庫中(後面再介紹)。
Resilient(彈性且可復原) Distributed(分散式) Datasets(資料集) (RDDs)
我們以前學的scala的原生方法都是陣列或者集合上定義的,它只能操作單機。
而spark中的方法都是定義在RDD上的,它們操作的是叢集。
spark的方法有兩類操作:
一種是:Transformations,它是lazy的,不會觸發任務的執行,是一種轉換
(從一種形式變成另一種形式),是延遲載入的;
一種是:Actions,執行出結果,是一種動作,是立即載入(執行)的;
看一下spark 的常用運算元:登入spark的官網
http://spark.apache.org/
下面是官方的Spark程式設計指南:
http://spark.apache.org/docs/1.6.2/programming-guide.html
在指南的列表目錄中有 Transformations 和 Actions 。
Diver是客戶端,它提交程式到spark叢集,而計算的返回結果會再發回給Diver客戶端。
Transformations有很多,比如說map方法,filter方法,mapPartitions方法,
sample方法,union方法,intersection方法,distinct方法,groupByKey方法,
reduceByKey方法,aggregateByKey方法,sortByKey方法,join方法,
Transformations 的特點是,它是lazy的,並不會觸發任務的執行。
有scala的API:
http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.rdd.RDD
也有java的API:
http://spark.apache.org/docs/1.6.2/api/java/index.html?org/apache/spark/api/java/JavaRDD.html
Actions的方法有:reduce方法,collect方法,count方法,first方法,
take方法,takaSample方法,takeOrdered方法,saveAsTextFile方法,
countByKey方法,foreach方法。
---------------------------------------------------------------------
說一下spark的第一類運算元:Transformations
如果要從hdfs中讀取資料,或者從關係型資料庫中讀資料,如果我們的hdfs或者關係型資料庫沒有啟動,
此時使用Transformations型別的運算元,就不會報錯。因為它只會記錄一下它將從哪裡讀取資料。
Transformations僅僅是一種轉換操作,而Action則是一些執行操作。
spark支援從本地檔案系統、hdfs、s3、hbase等讀取資料。
在Worker中有一個程序叫做"CoarseGrainedExecutorBackend"程序,它負責讀取資料、
對資料進行切分、聚合等操作,都是在Executor中完成的。
在Driver端啟動了一個spark-shell,然後在這裡寫任務,比如寫一些轉換和Action,在寫
轉換的時候,它會在這個spark-shell中記錄我們是怎樣轉換的,一旦觸發Action,Driver端就會把
任務提交到我們的Executor上執行,執行返回的結果會收集到Driver端。每個Executor只負責計算其
中的屬於它的資料。
這就是分散式思維,一個大的任務讓一臺機器去幹,它幹不了,那麼我們就分成很多的小任務,
由多臺機器去並行執行。最後將所有的結果收集起來,存到外部儲存介質中。
從外部儲存介質中將資料讀進來然後建立的RDD,還有一個種建立RDD的方法:
//使用spark的並行化方法建立RDD,並指定分割槽數為2個:
sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2),也能建立一個RDD,
它會將一個集合或者陣列轉換成RDD,還能指定分割槽數,這裡我們給它指定了分割槽數為2。
這樣它就會生成兩個結果檔案。就像MapReduce中的一樣,如果有兩個reduce就
會有兩個結果檔案一樣。
RDD上的這些常用運算元一定要練習。
------------------------------------------------
RDD的Action運算元:
我們把Spark的客戶端叫做Driver。
啟動Master;
啟動Worker;
Worker向Master註冊;
Master向Worker反饋;
Worker向Master傳送心跳。
Driver首先會跟Master進行RPC通訊(因為我們在shell命令
中指定了--master的位置,所以它會去找Master)。向Master申請資源;
Master啟動符合條件的Worker來啟動Executor(Master會告訴Worker
啟動Executor這個java子程序。Executor程序是由Worker程序啟動的。);
Executor會主動跟Driver進行通訊(因為Driver會把它的一些資訊傳送給Master,而Master會把Driver的資訊
封裝起來,傳送給Worker,而Executor就通過Worker知道了Driver的資訊,所以就能找到Driver了),
接下來就可以寫spark程式了。
可以為資料指定分割槽的。
一個分割槽一定屬於一臺機器,但是一臺機器可能有多個分割槽。
資料的讀取和計算都是在Executor中完成的。
rdd的foreachPartition方法可以將資料取出來存到關係型資料庫中(後面再介紹)。