在 64M 以下使用 jupyter notebook 編寫 Python3 程式碼(用 rpyc 取代 ipykernel)
技術標籤:Spark
RDD
一、RDD建立
(1)從檔案系統中載入資料
SparkContext通過testFile()
讀取資料生成記憶體中的RDD
textFile()
支援的資料型別非常多,既可以從本地檔案系統去載入,也可以從分散式檔案系統HDFS中載入,也可以從雲端載入。
val lines = sc.textFile("file:///usr/cueb/00lifeng/airdelay_small.csv") //三個斜槓表示要讀取本地檔案
//系統會自動把讀取進來的每一行生成一個RDD元素
val lines = sc.textFile("file://localhost:9000/zhangxinxin/aird.csv" ) //兩個斜槓代表讀取HDFS中的檔案
val lines = sc.textFile("/zhangxinxin/aird.csv")
val lines = sc.textFile("aird.csv")
//上述三條語句等價
(2)通過並行集合(陣列、列表等)
呼叫SparkContext
物件的Parallelize()
方法
val array = Array(1,2,3,4,5) //array中的每個元素都對應著一個RDD元素
val rdd = sc.parallelize(array)
val list = List(1,2,3,4,5) //list中的每個元素都對應著一個RDD元素
val rdd = sc.parallelize(list)
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-lijYKkNe-1608863973309)(C:\Users\llll\AppData\Roaming\Typora\typora-user-images\1604582139455.png)]
二、RDD操作
RDD的操作大致可以分為兩種,Transformation操作和Action操作。
1.Transformation操作
只記錄轉換軌跡,不進行具體操作。
常用轉換操作:filter,map,flatmap,groupByKey,reduceByKey
filter(func)過濾
這是一個高階函式,篩選出滿足函式func的元素,並返回一個新的資料集
val lines = sc.textFile("file://localhost:9000/zhangxinxin/aird.csv")
val linesWithPlane = lines.filter(line => line.contains("Plane"))
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Y2jbRJ73-1608863973315)(C:\Users\llll\AppData\Roaming\Typora\typora-user-images\1604582608536.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-W2oU3EYq-1608863973317)(C:\Users\llll\AppData\Roaming\Typora\typora-user-images\1604582703887.png)]
map(func)對映
這是一個高階函式,將每個元素傳遞到函式func中,並將結果返回為一個新的資料集
val array = Array(1,2,3,4,5)
val rdd1 = sc.parallelize(array)
val rdd2 = rdd1.map(x => x+10)
val lines = sc.textFile("file:///usr/cueb/zhangxinxin/word.txt")
val word = rdd1.map(line => line.split(" ")) //拆分後生成的結果都會保留到陣列中
//新生成的word裡有三個元素,每個元素都是一個數組
flatMap(func)
與map()
相似,但是每個輸入元素都可以對映到0或多個輸出結果
val lines = sc.textFile("file:///usr/cueb/zhangxinxin/word.txt")
val word = rdd1.flatMap(line => line.split(" "))
groupByKey()
應用於(K,V)鍵值對的資料集時,返回一個新的(K,Iterable)形式的資料集,即(key,value list),一個key對應著一個值的列表
reduceByKey()
應用於(K,V)鍵值對的資料集時,返回一個新的(K,V)形式的資料集,其中每個值是將每個key傳遞到函式func中進行聚合後的結果
2.Action操作
count()
返回資料集中元素的個數
collect()
以陣列形式返回資料集中所有元素
first()
返回資料集中第一個元素
take(n)
以陣列的形式返回資料集中的前n個元素
reduce(func)
通過函式func(輸入兩個引數並返回一個值)聚合資料集中的元素
foreach()
遍歷
val rdd = sc.parallelize(Array(1,2,3,4,5))
rdd.count()
rdd.first
rdd.reduce((a,b)=>a+b)
rdd.foreach(elem=>println(elem))
三、RDD持久化
Spark允許將一些需要反覆使用的資料集持久化到記憶體中,不需要寫磁碟。
val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
println(rdd.count()) //動作操作,觸發一次真正從頭到尾的計算
println(rdd.collect().mkString(",")) //動作操作,觸發一次真正從頭到尾的計算
為什麼一個Spark應用程式會被分為很多個job?
每次動作操作發生後,都會觸發一次從頭到尾的計算,每次從頭到尾的計算都生成一個job
persist()
對一個RDD標記為持久化
當呼叫了persist()
方法以後,並不會馬上計算生成RDD並將它持久化
只有在遇到動作型別操作以後,才會真正持久化
persist(MEMORY_ONLY)
將資料快取到記憶體中,如果記憶體不足,則去掉一部分舊資料,將新的放進來
persist(MEMORY_AND_DISK)
將資料快取到記憶體中,如果記憶體不足,則放到底層硬碟中
因為persist(MEMORY_ONLY)
常見,所以簡化為了.cache()
方法
.unpersist()
可以手動地把持久化的RDD從快取中移除
val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
rdd.cache() //會呼叫persist(MEMORY_ONLY),但是語句執行到這裡,並不會快取rdd,因為rdd還沒有被計算生成
println(rdd.count()) //第一次動作操作,觸發一次從頭到尾的計算,這時上面的rdd.cache()才會被執行,把這個RDD放到快取中
println(rdd.collect().mkString(",")) //第二次動作操作,不需要觸發從頭到尾的計算,只需要重複使用上述快取中的RDD
四、RDD分割槽
1.RDD分割槽的作用與原則
作用:(1)增加程式的並行度實現分散式的計算 (2)減少通訊開銷
原則:(1)使分割槽個數儘量等於叢集中的CPU核心數目
2.RDD分割槽的方法
sc.textFile(path,partitionNum)
指定分割槽個數
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array,2) //設定兩個分割槽
.reparitition
可以重新設定分割槽個數
val data = sc.textFile("file:///home/cueb/zhangxinxin/aird.csv",2)
data.partitions.size //顯示data這個RDD的分割槽數量
val rdd = data.repartition(1) //對data這個RDD進行分割槽
rdd.partitions.size
//自定義分割槽
//繼承org.apache.spark.Partitioner
//覆蓋numPartitions:Int:Int 返回創建出的分割槽數
//getPartition(key:Any):Int 返回給定鍵的分割槽編號(0到numPartitions-1)
//equals() Java判斷相等性的標準方法
//根據key值的最後一位數字,寫到不同的檔案
import org.apache.spark.{Partitioner,SparkContext,SparkConf}
class MyPartitioner(numParts:Int) extends Partitioner{
override def numPartitions:Int = numParts
override def getPartition(key:Any):Int = {
key.toSting.toInt % 10
}
}
object MyPartitioner{
def main(args:Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 10,5) //模擬五個分割槽的資料
data.map((_,1)).partitionBy(new MyPartitioner(10).map(_,_1).saveAsTextFile("file:///home/cueb/zhangxinxin/output1"))
}
}
五、鍵值對RDD
1.鍵值對RDD的建立
1.從檔案中讀取
val lines = sc.textFile("file:///home/cueb/zhangxinxin/word.txt")
val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
pairRDD.foreach(println)
2.通過並行集合建立
val list = List("Hadoop","Spark","Hive","Spark")
val rdd = sc.parallelize(list)
val pairRDD = rdd.map(word => (word,1))
pairRDD.foreach(println)
2.常用的鍵值對RDD轉換操作
reduceByKey(func)
使用func函式合併具有相同鍵的值
groupByKey()
把具有相同鍵的值進行分組
val words = Array("one","two","two","three","three","three")
val wordPairsRDD = sc.parallelize(words).map(word => (word,1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_+_)
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1,t._2.sum))
keys
把Pair RDD中的key返回形成一個新的RDD
val words = Array("Hive","Hadoop","Spark","Scala")
val pairRDD = sc.parallelizx(words).map(word => (word,1))
pairRDD.keys
pairRDD.keys.foreach(println)
values
把Pair RDD中的value返回形成一個新的RDD
val words = Array("Hive","Hadoop","Spark","Scala")
val pairRDD = sc.parallelizx(words).map(word => (word,1))
pairRDD.values
pairRDD.values.foreach(println)
sortByKey()
返回一個根據鍵排序的RDD
pairRDD.sortByKey() //預設為true,即升序排序,可以指定為false,則為降序
pairRDD.sortByKey(false)
val d1 = sc.parallelize(Array(("c",8),("b",25),("d",3),("a",35),("g",2423),("e",232),("b",25),("n",25),("t",55),("g",265),("a",225),("w",15),("b",25),("b",25)))
d1.groupByKey().map(t => (t._1,t._2.sum)).sortByKey().collect()
//上面一行等價於
d1.reduceByKey(_+_).sortByKey().collect()
sortBy()與sortBy()的區別
val d2 = sc.parallelize(Array(("c",8),("b",25),("d",3),("a",35),("g",2423),("e",232),("b",25),("n",25),("t",55),("g",265),("a",225),("w",15),("b",25),("b",25)))
d2.reduceByKey(_+_).sortBy(_._2,false).collect()
mapValues(func)
把鍵值對RDD中的每個value都應用於一個函式,key不會發生變化
pairRDD.mapValues(x => x+1).foreach(println)
join
把幾個RDD當中元素key相同的進行連線
val pairRDD1 = sc.parallelize(Array(("Spark",1),("Scala",2),("Hive",3),("Hadoop",4)))
val pairRDD2 = sc.parallelize(Array(("Spark","fast")))
pairRDD1.join(pairRDD2).collect() // 1和fast會組合起來
combineByKey
日後再續
3.一個綜合例項
key為圖書名稱
value為某天圖書銷量
任務為計算每個key對應的平均值,即每種圖書每天的平均銷量
val rdd = sc.parallelize(Array(("Spark",2),("Hadoop",6),("Hadoop",20),("Spark",600)))
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2)).mapValues(x => x._1 / x._2).collect()
六、資料讀寫
1.檔案資料讀寫
(1)本地檔案系統的資料讀寫
val textFile = sc.textFile("file:///home/cueb/zhangxinxin/word.txt") //讀檔案
//即使輸入了錯誤的地址,spark-shell也不會報錯,只有當有Action操作時才會真的報錯
textFile.savaAsTextFile("file:///home/cueb/zhangxinxin") //寫檔案
//寫的時候是指定一個目錄,不是指定一個具體檔案
val textFile = sc.textFile("file:///home/cueb/zhangxinxin") //當目錄作為輸入的時候,可以讀取所有檔案生成一個RDD
(2)分散式檔案系統HDFS的資料讀寫
val textFile = sc.textFile("hdfs://localhost:9000/zhangxinxin/aird.csv")
textFile.first() //基本與本地檔案讀取相同
(3)JSON檔案的資料讀寫
val jsonStr = sc.textFile("file:///home/cueb/zhangxinxin/testjson.json")
jsonStr.foreach(println)
Scala中自帶一個json解析庫,scala.util.parsing.json.JSON
其中的方法JSON.parseFULL(jsonString:String)
可以用於解析,它可以接受一行一行的json文字
如果解析成功,它會將結果封裝在Some(map:Map[String,Any])
物件中返回,如果解析失敗,就返回None