Spark RDD基礎操作
阿新 • • 發佈:2018-11-15
標題 | 舉例 |
解釋 | Spark的基本資訊 |
Spark | 1個driver(膝上型電腦或者叢集閘道器機器上)和若干個executor(在各個節點上)組成。通過SparkContext(簡稱sc)連線Spark叢集、建立RDD、累加器(accumlator)、廣播變數(broadcast variables),簡單可以認為SparkContext是Spark程式的根本。 |
Driver | 把計算任務分成一系列小的task,然後送到executor執行。executor之間可以通訊,在每個executor完成自己的task以後,所有的資訊會被傳回。 |
RDD | Resilient Distributed Dataset(彈性分散式資料集,既可以儲存在本地,也可以儲存在叢集上,簡稱RDD).是一個包含諸多元素、被劃分到不同節點上進行並行處理的資料集合。在節點發生錯誤時RDD也可以自動恢復。RDD就像一個NumPy array或者一個Pandas Series,可以視作一個有序的item集合。只不過這些item並不存在driver端的記憶體裡,而是被分割成很多個partitions,每個partition的資料存在叢集的executor的記憶體中。 |
初始化RDD | 呼叫庫--設定路徑--初始化檔案為RDD檔案 |
呼叫Spark庫 | from pyspark import SparkContext sc = SparkContext('local', 'pyspark') #sc = SparkContext('spark://ha-nn-001:7077', 'pyspark') |
a.從本地記憶體中構造 | 1:makeRDD方法 val rdd01 = sc.makeRDD(List(1,2,3,4,5,6)) 2:parallelize方法 val rdd01 = sc.parallelize(List(1,2,3,4,5,6)) |
b.通過檔案系統構造,即將已有檔案初始化為RDD | |
b.1檔案路徑處理 | #使用os.path.join()拼接路徑 import os path=os.path.join('user','home','nave.txt') #path會自動返回user/home/nave.txt,好處是會根據不同的系統選擇連線符為/ 或 \ #直接拼路徑 import os cwd = os.getcwd() #記錄當前的python路徑 rdd = sc.textFile("file://" + cwd + "/names/yob1880.txt") #file:// 是告訴spark到本地去找文件 |
b.2 初始化本地檔案,初始化後每一行會被看成一個item | rdd = sc.textFile("file://" + cwd + "/names/yob1880.txt") #file:// 是告訴spark到本地去找文件 匯入整個文件,整個文件作為一個item rdd = sc.wholeTextFiles("file://" + cwd + "/names") |
b.2 初始化叢集HDFS上的檔案 | rdd = sc.textFile(cwd + "/names/yob1880.txt") |
檢視RDD資訊 | 檢視RDD檔案的內容 |
rdd :檢視RDD的開啟地址 | 直接輸入rdd檔名 |
rdd.first():顯示rdd的第一條item | rdd檔名.first() |
rdd.count():檢視rdd中的記錄數 | rdd檔名.count() |
transformation:轉化操作 | 僅僅是對RDD下達操作指令,Spark僅僅會記錄要進行的操作,並不執行操作,直到需要執行action指令時才會執行操作。 |
rdd.map(func): 對rdd中的每一條item執行func,並返回一個新的rdd檔案 |
quaresRDD = numbersRDD.map(lambda x: x**2) |
rdd.flatMap(): 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list類似append |
sentencesRDD = sc.parallelize(['Hello world', 'My name is Patrick']) wordsRDD = sentencesRDD.flatMap(lambda sentence: sentence.split(" ")) 結果:['Hello', 'world', 'My', 'name', 'is', 'Patrick'] 若使用map: wordsRDD = sentencesRDD.map(lambda sentence: sentence.split(" ")) 結果:['Hello', 'world', 'My', 'name', 'is', 'Patrick'] |
rdd.fiiter(func): 過濾功能,將所有符合函式條件的item組成一個新的list輸出 |
rddM = (rdd.filter(lambda x: x is not None and x.startswith('M'))) |
rdd.distinct(): 對RDD中的item去重 |
rdd檔名.distinct() |
rdd.sample(withReplacement, fraction, seed): 取樣函式 |
withReplacement:這個值如果是true時,採用PoissonSampler抽樣器(Poisson分佈),否則使用BernoulliSampler的抽樣器. Fraction:一個大於0,小於或等於1的小數值,用於控制要讀取的資料所佔整個資料集的概率. Seed:這個值如果沒有傳入,預設值是一個0~Long.maxvalue之間的整數. |
rdd1.union(rdd2): 所有rdd1和rdd2中的item組合 |
numbersRDD.union(moreNumbersRDD).collect() |
rdd1.intersection(rdd2): rdd1 和 rdd2的交集 |
numbersRDD.intersection(moreNumbersRDD).collect() |
rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集) |
numbersRDD.subtract(moreNumbersRDD).collect() |
rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡爾乘積 |
numbersRDD.cartesian(moreNumbersRDD).collect() |
action:執行操作 | 輸出transformation的執行結果 |
rdd.collect(): 計算所有的items並返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果 |
rdd.collect() |
rdd.first(): 和上面是類似的,不過只返回第1個item |
rdd.first() |
rdd.take(n): 類似,但是返回n個item |
rdd.take(n) |
rdd.count(): 計算RDD中item的個數 |
rdd.count() |
rdd.top(n): 返回頭n個items,按照自然結果排序 |
rdd.top(n) |
rdd.reduce(): 對RDD中的items做聚合 |
rdd = sc.parallelize(range(1,10+1)) rdd.reduce(lambda x, y: x + y) #實際上是對rdd裡面所有的元素進行求和,reduce 可以設定兩個未知數,並對兩個未知數進行處理 #處理方式,元素1=x,元素2=y,xy計算的結果作為x,元素3作為y,直到對所有的結果執行了操作 |
pair RDDs transformation操作 | 以元組形式組織的k-v對(key, value),叫做pair RDDs |
生成pair Tdd | rdd = sc.parallelize(["Hello hello", "Hello New York", "York says hello"]) resultRDD = ( rdd .flatMap(lambda sentence: sentence.split(" ")) # split into words .map(lambda word: word.lower()) # lowercase .map(lambda word: (word, 1)) # count each appearance .reduceByKey(lambda x, y: x + y) # add counts for each word .sortByKey() ) resultRDD.collect() |
reduceByKey(): 對所有有著相同key的items的value執行reduce操作 |
參照上例 |
groupByKey(): 返回類似(key, listOfValues)元組的RDD,後面的value List 是同一個key下面的 |
resultRDD.groupByKey().collect() |
sortByKey(): 按照key排序 |
參照上例 |
countByKey(): 按照key去對item個數進行統計 |
RDD.countByKey() |
collectAsMap(): 和collect有些類似,但是返回的是k-v的字典 |
|
join: 只合並具有相同鍵值的項,沒有相同的不顯示 |
homesRDD.join(lifeQualityRDD).collect() |