1. 程式人生 > 實用技巧 >在 64M 以下使用 jupyter notebook 編寫 Python3 程式碼(用 rpyc 取代 ipykernel)

在 64M 以下使用 jupyter notebook 編寫 Python3 程式碼(用 rpyc 取代 ipykernel)

技術標籤:Spark

RDD

一、RDD建立

(1)從檔案系統中載入資料

SparkContext通過testFile()讀取資料生成記憶體中的RDD

textFile()支援的資料型別非常多,既可以從本地檔案系統去載入,也可以從分散式檔案系統HDFS中載入,也可以從雲端載入。

val lines = sc.textFile("file:///usr/cueb/00lifeng/airdelay_small.csv")  //三個斜槓表示要讀取本地檔案
//系統會自動把讀取進來的每一行生成一個RDD元素
val lines = sc.textFile("file://localhost:9000/zhangxinxin/aird.csv"
) //兩個斜槓代表讀取HDFS中的檔案 val lines = sc.textFile("/zhangxinxin/aird.csv") val lines = sc.textFile("aird.csv") //上述三條語句等價

(2)通過並行集合(陣列、列表等)

呼叫SparkContext物件的Parallelize()方法

val array = Array(1,2,3,4,5)  //array中的每個元素都對應著一個RDD元素
val rdd = sc.parallelize(array)
val list = List(1,2,3,4,5)  //list中的每個元素都對應著一個RDD元素
val rdd = sc.parallelize(list)

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-lijYKkNe-1608863973309)(C:\Users\llll\AppData\Roaming\Typora\typora-user-images\1604582139455.png)]

二、RDD操作

RDD的操作大致可以分為兩種,Transformation操作和Action操作。

1.Transformation操作

只記錄轉換軌跡,不進行具體操作。

常用轉換操作:filter,map,flatmap,groupByKey,reduceByKey

filter(func)過濾

這是一個高階函式,篩選出滿足函式func的元素,並返回一個新的資料集

val lines = sc.textFile("file://localhost:9000/zhangxinxin/aird.csv")
val linesWithPlane = lines.filter(line => line.contains("Plane"))

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Y2jbRJ73-1608863973315)(C:\Users\llll\AppData\Roaming\Typora\typora-user-images\1604582608536.png)]

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-W2oU3EYq-1608863973317)(C:\Users\llll\AppData\Roaming\Typora\typora-user-images\1604582703887.png)]

map(func)對映

這是一個高階函式,將每個元素傳遞到函式func中,並將結果返回為一個新的資料集

val array = Array(1,2,3,4,5)
val rdd1 = sc.parallelize(array)
val rdd2 = rdd1.map(x => x+10)
val lines = sc.textFile("file:///usr/cueb/zhangxinxin/word.txt")
val word = rdd1.map(line => line.split(" "))  //拆分後生成的結果都會保留到陣列中
//新生成的word裡有三個元素,每個元素都是一個數組

flatMap(func)

map()相似,但是每個輸入元素都可以對映到0或多個輸出結果

val lines = sc.textFile("file:///usr/cueb/zhangxinxin/word.txt")
val word = rdd1.flatMap(line => line.split(" ")) 

groupByKey()

應用於(K,V)鍵值對的資料集時,返回一個新的(K,Iterable)形式的資料集,即(key,value list),一個key對應著一個值的列表

reduceByKey()

應用於(K,V)鍵值對的資料集時,返回一個新的(K,V)形式的資料集,其中每個值是將每個key傳遞到函式func中進行聚合後的結果

2.Action操作

count()

返回資料集中元素的個數

collect()

以陣列形式返回資料集中所有元素

first()

返回資料集中第一個元素

take(n)

以陣列的形式返回資料集中的前n個元素

reduce(func)

通過函式func(輸入兩個引數並返回一個值)聚合資料集中的元素

foreach()

遍歷

val rdd = sc.parallelize(Array(1,2,3,4,5))
rdd.count()
rdd.first
rdd.reduce((a,b)=>a+b)
rdd.foreach(elem=>println(elem))

三、RDD持久化

Spark允許將一些需要反覆使用的資料集持久化到記憶體中,不需要寫磁碟。

val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
println(rdd.count())  //動作操作,觸發一次真正從頭到尾的計算
println(rdd.collect().mkString(","))  //動作操作,觸發一次真正從頭到尾的計算

為什麼一個Spark應用程式會被分為很多個job?

每次動作操作發生後,都會觸發一次從頭到尾的計算,每次從頭到尾的計算都生成一個job

persist()對一個RDD標記為持久化

當呼叫了persist()方法以後,並不會馬上計算生成RDD並將它持久化

只有在遇到動作型別操作以後,才會真正持久化

persist(MEMORY_ONLY)將資料快取到記憶體中,如果記憶體不足,則去掉一部分舊資料,將新的放進來

persist(MEMORY_AND_DISK)將資料快取到記憶體中,如果記憶體不足,則放到底層硬碟中

因為persist(MEMORY_ONLY)常見,所以簡化為了.cache()方法

.unpersist()可以手動地把持久化的RDD從快取中移除

val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
rdd.cache()  //會呼叫persist(MEMORY_ONLY),但是語句執行到這裡,並不會快取rdd,因為rdd還沒有被計算生成
println(rdd.count())  //第一次動作操作,觸發一次從頭到尾的計算,這時上面的rdd.cache()才會被執行,把這個RDD放到快取中
println(rdd.collect().mkString(","))  //第二次動作操作,不需要觸發從頭到尾的計算,只需要重複使用上述快取中的RDD

四、RDD分割槽

1.RDD分割槽的作用與原則

作用:(1)增加程式的並行度實現分散式的計算 (2)減少通訊開銷

原則:(1)使分割槽個數儘量等於叢集中的CPU核心數目

2.RDD分割槽的方法

sc.textFile(path,partitionNum)指定分割槽個數

val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array,2)  //設定兩個分割槽

.reparitition可以重新設定分割槽個數

val data = sc.textFile("file:///home/cueb/zhangxinxin/aird.csv",2)
data.partitions.size   //顯示data這個RDD的分割槽數量
val rdd = data.repartition(1)  //對data這個RDD進行分割槽
rdd.partitions.size  
//自定義分割槽
//繼承org.apache.spark.Partitioner
//覆蓋numPartitions:Int:Int  返回創建出的分割槽數
//getPartition(key:Any):Int  返回給定鍵的分割槽編號(0到numPartitions-1)
//equals()  Java判斷相等性的標準方法
//根據key值的最後一位數字,寫到不同的檔案
import org.apache.spark.{Partitioner,SparkContext,SparkConf}
class MyPartitioner(numParts:Int) extends Partitioner{
    override def numPartitions:Int = numParts
    override def getPartition(key:Any):Int = {
        key.toSting.toInt % 10
    }
}

object MyPartitioner{
    def main(args:Array[String]){
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val data = sc.parallelize(1 to 10,5)  //模擬五個分割槽的資料
        data.map((_,1)).partitionBy(new MyPartitioner(10).map(_,_1).saveAsTextFile("file:///home/cueb/zhangxinxin/output1"))
    }
}

五、鍵值對RDD

1.鍵值對RDD的建立

1.從檔案中讀取

val lines = sc.textFile("file:///home/cueb/zhangxinxin/word.txt")
val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
pairRDD.foreach(println)

2.通過並行集合建立

val list = List("Hadoop","Spark","Hive","Spark")
val rdd = sc.parallelize(list)
val pairRDD = rdd.map(word => (word,1))
pairRDD.foreach(println)

2.常用的鍵值對RDD轉換操作

reduceByKey(func)

使用func函式合併具有相同鍵的值

groupByKey()

把具有相同鍵的值進行分組

val words = Array("one","two","two","three","three","three")
val wordPairsRDD = sc.parallelize(words).map(word => (word,1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_+_)
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1,t._2.sum))

keys

把Pair RDD中的key返回形成一個新的RDD

val words = Array("Hive","Hadoop","Spark","Scala")
val pairRDD = sc.parallelizx(words).map(word => (word,1))
pairRDD.keys
pairRDD.keys.foreach(println)

values

把Pair RDD中的value返回形成一個新的RDD

val words = Array("Hive","Hadoop","Spark","Scala")
val pairRDD = sc.parallelizx(words).map(word => (word,1))
pairRDD.values
pairRDD.values.foreach(println)

sortByKey()

返回一個根據鍵排序的RDD

pairRDD.sortByKey()  //預設為true,即升序排序,可以指定為false,則為降序
pairRDD.sortByKey(false)  
val d1 = sc.parallelize(Array(("c",8),("b",25),("d",3),("a",35),("g",2423),("e",232),("b",25),("n",25),("t",55),("g",265),("a",225),("w",15),("b",25),("b",25)))
d1.groupByKey().map(t => (t._1,t._2.sum)).sortByKey().collect()
//上面一行等價於
d1.reduceByKey(_+_).sortByKey().collect()

sortBy()與sortBy()的區別

val d2 = sc.parallelize(Array(("c",8),("b",25),("d",3),("a",35),("g",2423),("e",232),("b",25),("n",25),("t",55),("g",265),("a",225),("w",15),("b",25),("b",25)))
d2.reduceByKey(_+_).sortBy(_._2,false).collect()

mapValues(func)

把鍵值對RDD中的每個value都應用於一個函式,key不會發生變化

pairRDD.mapValues(x => x+1).foreach(println)

join

把幾個RDD當中元素key相同的進行連線

val pairRDD1 = sc.parallelize(Array(("Spark",1),("Scala",2),("Hive",3),("Hadoop",4)))
val pairRDD2 = sc.parallelize(Array(("Spark","fast")))
pairRDD1.join(pairRDD2).collect() // 1和fast會組合起來

combineByKey

日後再續

3.一個綜合例項

key為圖書名稱

value為某天圖書銷量

任務為計算每個key對應的平均值,即每種圖書每天的平均銷量

val rdd = sc.parallelize(Array(("Spark",2),("Hadoop",6),("Hadoop",20),("Spark",600)))
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2)).mapValues(x => x._1 / x._2).collect()

六、資料讀寫

1.檔案資料讀寫

(1)本地檔案系統的資料讀寫

val textFile = sc.textFile("file:///home/cueb/zhangxinxin/word.txt")  //讀檔案
//即使輸入了錯誤的地址,spark-shell也不會報錯,只有當有Action操作時才會真的報錯
textFile.savaAsTextFile("file:///home/cueb/zhangxinxin")  //寫檔案
//寫的時候是指定一個目錄,不是指定一個具體檔案
val textFile = sc.textFile("file:///home/cueb/zhangxinxin") //當目錄作為輸入的時候,可以讀取所有檔案生成一個RDD

(2)分散式檔案系統HDFS的資料讀寫

val textFile = sc.textFile("hdfs://localhost:9000/zhangxinxin/aird.csv")
textFile.first()  //基本與本地檔案讀取相同

(3)JSON檔案的資料讀寫

val jsonStr = sc.textFile("file:///home/cueb/zhangxinxin/testjson.json")
jsonStr.foreach(println)

Scala中自帶一個json解析庫,scala.util.parsing.json.JSON

其中的方法JSON.parseFULL(jsonString:String)可以用於解析,它可以接受一行一行的json文字

如果解析成功,它會將結果封裝在Some(map:Map[String,Any])物件中返回,如果解析失敗,就返回None