Spark基礎 -- Spark Shell -- RDD -- 運算元
Spark基礎 – Spark Shell – RDD – 運算元
文章目錄
一、簡介
Apache Spark 是專為大規模資料處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以儲存在記憶體中,從而不再需要讀寫HDFS,因此Spark能更好地適用於資料探勘與機器學習等需要迭代的MapReduce的演算法。
二、Spark 1.6.3部署
準備工作
-
三臺Linux伺服器,安裝好JDK1.8、Hadoop2.6
-
下載安裝包spark1.6.3
-
將安裝包上傳到第一臺Linux伺服器上
解壓安裝
解壓安裝包到指定位置
tar -zxvf spark-1.6.3-bin-hadoop2.6.tgz -C /home/bigdata/installsoft/
將資料夾重新命名為spark-1.6.3
配置spark,master高可用
-
進入spark安裝目錄下的conf目錄
cd /home/bigdata/installsoft/spark-1.6.3/conf
-
將spark-env.sh.template重新命名為spark-env.sh
mv spark-env.sh.template spark-env.sh
-
編輯spark-env.sh並新增配置
export JAVA_HOME=/home/bigdata/installsoft/jdk1.8.0_181/ export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=cdhnocms01,cdhnocms02,cdhnocms03 -Dspark.deploy.zookeeper.dir=/spark" export SPARK_MASTER_PORT=7077 export HADOOP_CONF_DIR=/home/bigdata/installsoft/hadoop-2.6.0-cdh5.13.2/etc/hadoop
-
將slaves.template重新命名為slaves
mv slaves.template slaves
-
在slaves檔案彙總新增worker節點所在的主機
cdhnocms01
cdhnocms02
cdhnocms03
配置環境變數
在使用者家目錄下.bash_profile檔案中新增
SPARK_HOME=/home/bigdata/installsoft/spark-1.6.3/
PATH= SPARK_HOME/bin:$SPARK_HOME/sbin
儲存退出後source .bash_profile
分發配置好的Spark到其他節點
scp -r /home/bigdata/installsoft/spark-1.6.3/ cdhnocms02:/home/bigdata/installsoft/
scp -r /home/bigdata/installsoft/spark-1.6.3/ cdhnocms02:/home/bigdata/installsoft/
直接將環境變數檔案傳送到其他節點,或者在其他節點上一一配置環境變數
三、Spark叢集啟動和測試
啟動
在cdhnocms01節點上執行/home/bigdata/installsoft/spark-1.6.3/sbin/start-all.sh
在cdhnocms02節點上執行/home/bigdata/installsoft/spark-1.6.3/sbin/start-master.sh
此時使用jps檢視三臺機器程序,如下表
cdhnocms01 | cdhnocms02 | cdhnocms03 |
---|---|---|
Master、Worker | Master、Worker | Worker |
注意:雖然配置了環境變數,但由於名稱相同,如果直接在任意目錄直接執行start-all.sh,啟動的將會是hadoop的相關程序。解決辦法:修改啟動指令碼的檔名。
測試
執行官方自帶的例子
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cdhnocms01:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/home/bigdata/installsoft/spark-1.6.3/lib/spark-examples-1.6.3-hadoop2.6.0.jar \
100
執行完成後可以在命令列中找到結果
在web監控頁面:cdhnocms02:8080上可以檢視任務狀態
四、Spark Shell
spark shell簡介
spark-shell是Spark自帶的互動式Shell程式,方便使用者進行互動式程式設計,使用者可以在該命令列下用scala編寫spark程式。
spark shell 啟動
spark-shell \
--master spark://cdhnocms01:7077 \
--executor-memory 1G \
--total-executor-cores 2
引數說明
–master spark://cdhnocms02:7077 指定Master的地址
–executor-memory 1G 指定每個worker可用記憶體為1G
–total-executor-cores 2 指定整個叢集使用的cup核數為2個
注意
如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程式,其實是啟動了spark的local模式,該模式僅在本機啟動一個程序,沒有與叢集建立聯絡。
啟動spark shell後,可以注意到在控制檯有如下兩條語句:
意思是Spark Shell中已經預設將SparkContext類初始化為物件sc,SQLContext類初始化為物件sqlContext。使用者程式碼如果需要用到,則直接使用對應的物件名即可即可。
在spark shell中編寫wordcount程式
- 上傳資料檔案到hdfs://cdhnocms01:8020/userdata/wc.txt
- val file = sc.textFile(“hdfs://cdhnocms01:8020/userdata/wc.txt”)
- val words = file.flatMap(_.split(" "))
- val map = words.map((_,1))
- val result = map.reduceByKey(+)
- 接下來可以直接通過result.collect將結果列印到控制檯,或者result.saveAsTextFile(“hdfs://cdhnocms01:8020/out/20181119”),將結果檔案儲存在hdfs後檢視結果
五、IDEA中編寫WordCount
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com</groupId>
<artifactId>spark</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/day01</sourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
編寫程式碼
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount_scala {
def main(args: Array[String]): Unit = {
// 1.獲取spark的conf
// 本地執行
val conf = new SparkConf().setAppName("spark_wordcount_scala").setMaster("local[2]")
// 提交到叢集中執行
//val conf = new SparkConf().setAppName("spark_wordcount_scala").setMaster("spark://cdhnocms01:7077")
// 2.根據conf物件獲取sparkContext(spark的上下文)
val sc = new SparkContext(conf)
// 讀取hdfs中的資料
sc.hadoopConfiguration.addResource("core-site.xml")
sc.hadoopConfiguration.addResource("hdfs-site.xml")
//第二種讀取hdfs的HA的檔案資料
// sc.hadoopConfiguration.set("")
// 3.使用sc進行操作
// 讀取資料來源
val words:RDD[String] = sc.textFile(args(0))
val res:RDD[(String,Int)] = words.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size)
// 列印
res.foreach(f=>println(f))
//4. 關閉sc
sc.stop()
}
}
可以直接在idea中配置好輸入引數後執行,可以的到結果
打包到Linux伺服器中執行
使用Maven打包後,將jar包上傳至Linux中,執行命令:
spark-submit \
--class SparkWordCount_scala \
--master spark://cdhnocms01:7077 \
/home/bigdata/userjars/spark-1.0-SNAPSHOT.jar \
hdfs://bigdata/userdata/wc.txt
程式碼的輸出結果(在網頁監控埠任務的stdout中檢視):
六、彈性分散式資料集RDD
RDD簡介
RDD(Resilient Distributed Dataset),分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD屬性
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)
屬性詳解:
- 一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。
- 一個計算每個分割槽的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。
- RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。
- 一個Partitioner,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
- 一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。
RDD建立
-
由一個已經存在的Scala集合建立(Array、List、Seq等)
sc.parallelize(args(0),args(1))
第一個引數代表已存在的Scala集合,第二個引數代表分片個數,如果不指定則會採用預設值—分配的CPU core數量
-
由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
sc.textFile(“hdfs://cdhnocms01:8020/userdata/wc.txt”)
七、RDD程式設計API
Transformation
RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。
Transformation | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成 |
filter(func) | 返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成 |
flatMap(func) | 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T]=> Iterator[U] |
mapPartitionsWithIndex(func) | 類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement,fraction, seed) | 根據fraction指定的比例對資料進行取樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和引數RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD和引數RDD求交集後返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD |
reduceByKey(func,[numTasks]) | 在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定 |
aggregateByKey(zeroValue)(seqOp,combOp, [numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending],[numTasks]) | 與sortByKey類似,但是可以指定根據什麼排序 |
join(otherDataset,[numTasks]) | 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset,[numTasks]) | 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K,(Iterable,Iterable))型別的RDD |
cartesian(otherDataset) | 笛卡爾積 |
coalesce(numPartitions) | 重新分割槽 |
repartition(numPartitions) | 重新分割槽 |
repartitionAndSortWithinPartitions(partitioner) | 重新分割槽 |
Action
Action | 含義 |
---|---|
reduce(func) | 通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的 |
collect() | 在驅動程式中,以陣列的形式返回資料集的所有元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(類似於take(1)) |
take(n) | 返回一個由資料集的前n個元素組成的陣列 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | takeOrdered和top類似,只不過以和top相反的順序返回元素 |
saveAsTextFile(path) | 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字 |
saveAsSequenceFile(path) | 將資料集中的元素以Hadoopsequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。 |
saveAsObjectFile(path) | |
countByKey() | 針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) | 在資料集的每一個元素上,執行函式func進行更新。 |
八、運算元進階
-
map/mapPartitions
val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2) val rdd2 = rdd1.map(_ * 10) val rdd2 = rdd1.mapPartitions(_.map(_ * 10)) rdd2.collect
Array[Int] = Array(10, 20, 30, 40, 50, 60)
Array[Int] = Array(10, 20, 30, 40, 50, 60)
map運算元是將rdd中的每一個元素拿出來進行操作
mapPartitions運算元是將一整個分片中的資料拿出來操作,所以需要繼續對每一個分片中各個資料拿出來操作
rdd1.mapPartitions(_.toList.reverse.iterator).collect
此操作是將每一個分片中的資料翻轉
-
mapWith
引數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U)
其中preservesPartitioning指定是否需要使用父RDD的分片
rdd1.mapWith(i => i*10)((a, b) => b+2).collect
Array[Int] = Array(2, 2, 2, 12, 12, 12)
mapWith運算元是將rdd的分片下標取出進行操作元組(a,b)中a指資料,b指該資料的下標
-
flatMapWith
引數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])
rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect
Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6))
flatMapWith運算元類似於mapWith,但是每一個輸入元素可以被對映為0或多個輸出元素
-
mapPartitionsWithIndex
引數列表:(f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)
val func = (index: Int, iter: Iterator[(Int)]) => { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2) rdd1.mapPartitionsWithIndex(func).collect
Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
-
aggregate
引數列表:(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
zeroValue:初始值;seqOp:單個分割槽的合併操作;combOp:所有分割槽的彙總操作
def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2) rdd1.mapPartitionsWithIndex(func1).collect
Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
rdd1.aggregate(0)(math.max(_, _), _ + _) rdd1.aggregate(5)(math.max(_, _), _ + _)
Int = 13 //首先在兩個分割槽中各自獲得最大值4、9,相加等於13
Int = 19 //首先在兩個分割槽中各自獲得最大值5、9,相加等於14,再加上初始值5等於19
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2) def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } rdd2.mapPartitionsWithIndex(func2).collect
Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])
rdd2.aggregate("")(_ + _, _ + _) rdd2.aggregate("=")(_ + _, _ + _)
String = abcdef 或 String = defabc //字串拼接操作,在兩個分割槽中先各自拼接,最終的拼接時的順序是哪個分割槽先完成就哪個分割槽在前
String = ==def=abc 或 String = ==abc=def //同上,但是在拼接前先加上初始值"="
val rdd3 = sc.parallelize(List("12","23","345","4567"),2) rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
String = 42 或 String = 24
前一個引數列表 (x,y):第一次時分別代表初始值和分割槽中的第一個值,以後是分別代表上一次結果的值和分割槽中新的值
max(0,2) = 2, max(2,2) = 2
max(0,3) = 3, max(3,4) = 4
後一個引數列表(x,y):第一次是代表初始值與第一個分割槽的結果拼接,以後代表上一次的結果和新的分割槽的結果拼接
同上,由於不同分割槽的完成時間不同,結果會出現兩種情況
val rdd4 = sc.parallelize(List("12","23","345",""),2) rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
String = 10 或 String = 01
關鍵在於"".length=0,“0”.length=1
val rdd5 = sc.parallelize(List("12","23","","345"),2) rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
String = 11
-
aggregateByKey
相同的key進行操作
引數列表:(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)
val pairRDD = sc.parallelize(List(("mouse", 2),("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12)), 2) def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } pairRDD.mapPartitionsWithIndex(func2).collect
Array[String] = Array([partID:0, val: (mouse,2)], [partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:1, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)])
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) // dog:12;cat:5+12;mouse:2+4
Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200)) // dog:100;cat:100+100;mouse:100+100
-
combineByKey
引數列表:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
val rdd1 = sc.textFile("hdfs://cdhnocms01:8020/userdata/wc.txt").flatMap(_.split(" ")).map((_, 1)) rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
Array[(String, Int)] = Array((word,2), (hello,2), (sql,1), (spark,3), (hadoop,2), (hi,1))
rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
Array[(String, Int)] = Array((word,12), (hello,12), (sql,11), (spark,13), (hadoop,12), (hi,11))
// 對每一個value加10
val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val rdd6 = rdd5.zip(rdd4) rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n).collect
Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(salmon, rabbit, wolf, bear, bee, gnu)))
-
countByKey / countByValue
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1))) rdd1.countByKey rdd1.countByValue
scala.collection.Map[String,Long] = Map(b -> 2, a -> 1, c -> 2) // 統計相同key出現的次數
scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1) // 統計相同元素出現的次數
-
filterByRange
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1))) val rdd2 = rdd1.filterByRange("c", "d") rdd2.collect
Array[(String, Int)] = Array((c,3), (d,4), (c,2))
// 對給定的範圍進行過濾
-
flatMapValues
val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4"))) rdd3.flatMapValues(_.split(" ")).collect
Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
對value進行相應的操作後壓頻
-
foldByKey
val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2) val rdd2 = rdd1.map(x => (x.length, x)) val rdd3 = rdd2.foldByKey("")(_+_) rdd3.collect
Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
val rdd = sc.textFile("hdfs://cdhnocms01:8020/userdata/wc.txt").flatMap(_.split(" ")).map((_, 1)) rdd.foldByKey(0)(_+_).collect
Array[(String, Int)] = Array((word,2), (hello,2), (sql,1), (spark,3), (hadoop,2), (hi,1))
-
foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) rdd1.foreachPartition(x => println(x.reduce(_ + _)))
-
keyBy
val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val rdd2 = rdd1.keyBy(_.length) rdd2.collect
Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
// 將結果作為key-value的key
-
keys / values
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val rdd2 = rdd1.map(x => (x.length, x)) rdd2.keys.collect rdd2.values.collect
Array[Int] = Array(3, 5, 4, 3, 7, 5) // 獲得key
Array[String] = Array(dog, tiger, lion, cat, panther, eagle) // 獲得value
-
collectAsMap
val rdd = sc.parallelize(List(("a", 1), ("b", 2))) rdd.collectAsMap
scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
-
repartition, coalesce, partitionBy
重新分割槽
val rdd1 = sc.parallelize(1 to 10, 3) val rdd2 = rdd1.coalesce(2, false) rdd2.partitions.length
-
checkpoint
sc.setCheckpointDir("hdfs://cdhnocms01:8020/userdata/cp") val rdd = sc.textFile("hdfs://cdhnocms01:8020/userdata/wc.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) rdd.checkpoint rdd.isCheckpointed rdd.count rdd.isCheckpointed rdd.getCheckpointFile