1. 程式人生 > >Spark(四)Spark 鍵值對操作

Spark(四)Spark 鍵值對操作

目錄:

4、鍵值對操作

4.1、建立PairRDD

4.2、PairRDD的轉化操作

4.2.1、聚合操作

4.2.2、資料分組

4.2.3、連線

4.2.4、資料排序

4.3、PairRDD的行動操作

4.4、資料分割槽

4.4.1、獲取RDD的分割槽方式

4.4.2、從分割槽中或獲益的操作

4.3.3、影響分割槽方式的操作

4.4.4、示例:PageRank


4、鍵值對操作

Spark為包含鍵值對型別的RDD提供了一些專有的操作。這些RDD被稱為pairRDD。PairRDD是很多程式的構成要素,因為它們提供了並行操作各個鍵和跨節點重新進行資料分組的操作介面。例如,pairRDD提供reduceByKey()方法,可以分別歸約每個鍵對應的資料,還有join()方法,可以把兩個RDD中的鍵相同的元素組合到一起,合併為一個RDD。

4.1、建立PairRDD

Java中由於沒有自帶的二元組型別,因此Spark的JavaAPI讓使用者使用scale.Tuple2類來建立二元組。可以通過new Tuple2(e1, e2)來建立一個新的二元組。並且可以通過 ._1() 和 ._2() 方法訪問其中的元素。

使用Java從記憶體資料建立pairRDD的話,需要使用SparkContext.parallelizePairs()。

在Java中使用第一個單詞作為鍵創建出一個pairRDD。

PairFunction<String, String, String> keyData = 
new PairFunction<String, String, String>() {
	@Override
	public Tuple2<String, String> call(String x) throws Exception {
		return new Tuple2(x.split(" "), x);
	}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

4.2、PairRDD的轉化操作

PairRDD可以使用所有標準RDD上的可用的轉化操作。由於pairRDD中包含二元組,所以需要傳遞的函式應當操作二元組而不是獨立的元素。

表4-1 :PairRDD的轉化操作(以鍵值對集合 {(1, 2), (3, 4), (3, 6)} 為例)

表4-2:針對兩個pair RDD的轉化操作(rdd =  {(1, 2), (3, 4), (3, 6)} other =  {(3, 9)} )

對第二個元素進行篩選(篩選掉長度超過20個字元的行)

Function<Tuple2<String, String>, Boolean> longWordFilter = 
				 new Function<Tuple2<String, String>, Boolean>(){
	@Override
public Boolean call(Tuple2<String, String> keyValue) throws Exception {
		return (keyValue._2.length() < 20);
	}
};
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);

4.2.1、聚合操作

reduceByKey() 與 reduce() 相當類似;它們都接收一個函式,並使用該函式對值進行合併。reduceByKey() 會為資料集中的每個鍵進行並行的歸約操作,每個歸約操作會將鍵相同的值合併起來。因為資料集中可能有大量的鍵,所以 reduceByKey() 沒有被實現為向用戶程式返回一個值的行動操作。實際上,它會返回一個由各鍵和對應鍵歸約出來的結果值組成的新的 RDD。

foldByKey() 則與 fold() 相當類似;它們都使用一個與 RDD 和合並函式中的資料型別相同的零值作為初始值。與 fold() 一樣, foldByKey() 操作所使用的合併函式對零值與另一個元素進行合併,結果仍為該元素。

java實現單詞計數

1、Java版本jdk1.8以下

2、Java版本jdk1.8:可以使用lambda表示式簡化程式碼

如何處理每個元素的。由於 combineByKey() 會遍歷分割槽中的所有元素,因此每個元素的鍵要麼還沒有遇到過,要麼就 和之前的某個元素的鍵相同。

         如果這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函式來建立那個鍵對應的累加器的初始值。需要注意的是,這個過程會在每個分割槽中第一次出現各個鍵時發生,而不是在整個RDD中第一次出現各個鍵時發生。

         如果這是一個在處理當前分割槽之前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值域新的值進行合併。

         由於每個分割槽都是獨立處理的,因此對於同一鍵可以有多個累加器。如果有兩個或者更多的分割槽都有對應同一鍵的累加器,就需要使用使用者提供的mergeCombiners()方法將各個分割槽的結果合併。

         combineByKey()有多個引數分別對應聚合操作的各個階段,因而非常適合用來解釋聚合操作各個階段的功能劃分。為了更好的演示combineByKey()是如何工作的,下面展示如何在java中計算各鍵對應的平均值。

在Java中使用combineByKey()求各個鍵對應的平均值

並行度調優:

Spark是怎樣確定如何分割工作的? 每個RDD都有固定數目的分割槽,分割槽數決定了在RDD上操作時的並行度。在執行聚合或分組操作時,可以要求Spark使用給定的分割槽數。Spark始終嘗試根據的大小推斷出一個有意義的預設值,對並行度進行調優來獲取更好的效能表現。

大多數操作符都能接受第二個引數,這個引數用來指定分組結果或聚合結果的RDD的分割槽數。

//設定並行度

sc.parallelize(data).reduceByKey(x, y -> x + y, 10)

Spark 提供了 repartition() 函式。它會把資料通過網路進行混洗,並創建出新的分割槽集合對資料進行重新分割槽是代價相對比較大的操作.Spark 中也有一個優化版的 repartition(),叫作 coalesce()。你可以使用 Java Scala 中的 rdd. partitions.size() 檢視 RDD 的分割槽數,並確保調 coalesce() 時將 RDD 合併到比現在的分割槽數更少的分割槽中。

4.2.2、資料分組

如果資料已經以預期的方式提取了鍵,groupByKey()就會使用RDD中的鍵來對資料進行分組。對一個由型別K的鍵和型別V的值組成的RDD,所得到的結果RDD型別會是[K, Iterable[V]]。

groupByKey()可以用於未成對的資料上,也可以根據鍵相同以外的條件進行分組,它可以接收一個函式,對源RDD中的每個元素使用該函式,將返回結果作為鍵再進行分組。

對單RDD 的資料進行分組,可使用 cogroup() 函式對多個共享同一個鍵的 RDD 進行分組。對兩個鍵的型別均為 K 而值的型別分別為V 和 W 的 RDD 進行 cogroup() 時,得到的結果 RDD 型別為 [(K, (Iterable[V], Iterable[W]))]。如果其中的 一個 RDD 對於另一個 RDD 中存在的某個鍵沒有對應的記錄,那麼對應的迭代器則為空。 cogroup() 提供了為多個 RDD 進行資料分組的方法。

cogroup() 不僅可以用於實現連線操作,還可以用來求鍵的交集。

4.2.3、連線

         將有鍵的資料與另一組有鍵的資料一起使用時對鍵值對資料執行的最有效的操作之一。連線資料可能是pairRDD最常用的操作之一。連線方式多種多樣:右外連線、左外連線、交叉連線以及內連線。

         普通的join操作符表示內連線。只有在兩個pairRDD中都存在的鍵才叫輸出。當一個輸入對應的某個鍵有多個值時,生成的pairRDD會包括來自兩個輸入RDD的每一組相對應的記錄。

         “連線”是資料庫術語,表示將兩張表根據資料相同的值來組合欄位。

         leftOuterJoin(other)和rightOuterJoin(other)都會根據鍵連線兩個RDD,但是允許結果中存在其中的一個pairRDD所缺失的鍵。

         使用leftOuterJoin()產生的pairRDD中,源RDD的每一個鍵都有對應的記錄。每一個鍵相應的值是由一個源RDD中的值與一個包含第二個RDD的值的Option(在Java中為Optional)物件組成的二元組。

4.2.4、資料排序

如果鍵有已定義的順序,就可以對這種鍵值對的RDD進行排序。當把資料排好序後,後續對資料進行collect()或save()等操作都會得到有序的資料。

我們經常要將RDD排序排列,因此sortByKey()函式接收一個叫做ascending的引數,表示我們是否要想讓結果按升序排序(預設為true)。以提供自定義的比較函式。

下面例子會將證書轉為字串,然後使用字串比較函式來對RDD進行排序。

class IntegerComparator implements Comparator<Integer> { 
	public int compare(Integer a, Integer b) { 
		return String.valueOf(a).compareTo(String.valueOf(b)); 
	} 
} 
rdd.sortByKey(comp);

4.3、PairRDD的行動操作

所有基礎RDD支援的傳統行動操作也都在pairRDD上可用。pairRDD提供一些額外的行動操作,可以讓我們充分利用資料的鍵值對特性。

pairRDD的行動操作(以鍵值對集合{(1, 2}, (3, 4), (3, 6)}為例)

函式

描述

示例

結果

countByKey()

對每個鍵對應的元素分別計數

rdd.countByKe()

{(1, 1), (3, 2)}

collectAsMap()

將結果以對映表的形式返回,以便查詢

rdd.collectAsMap()

Map{(1, 2),(3, 4),(3, 6)}

lookup(key)

返回給定鍵對應的所有值

rdd.lookup(3)

[4, 6]

4.4、資料分割槽

1. 什麼是分割槽

RDD 內部的資料集合在邏輯上(以及物理上)被劃分成多個小集合,這樣的每一個小集合被稱為分割槽。RDDprdd作為一個分散式的資料集,是分佈在多個worker節點上的。如下圖所示,RDD1有五個分割槽(partition),他們分佈在了四個worker nodes 上面,RDD2有三個分割槽,分佈在了三個worker nodes上面。

  

2. 為什麼要分割槽

分割槽的個數決定了平行計算的粒度。多個分割槽平行計算,能夠充分利用計算資源。

3. 如何手動分割槽

java的分割槽可以這樣(parallelize)

JavaRDDrdd = sc.parallelize(list, 2); // 這個是分割槽用了,指定建立得到的 RDD 分割槽個數為 2

pairs.partitions().size() 分割槽數量檢視

對資料在節點間的分割槽進行控制。在分散式程式中,通訊的代價是很大的,因此控制資料分佈以獲得最少的網路傳輸可以極大的提升整體效能和單點的程式需要為記錄選擇合適的資料結構一樣, Spark程式可以通過控制RDD分割槽的方式來減少通訊開銷。

只有當資料集多次在諸如連線這種基於鍵的操作中使用時,分割槽才會有幫助。Spark中所有的鍵對RDD都可以進行分割槽。系統會根據一個針對鍵的函式對元素進行分組。Spark可以確保同一組的鍵出現在同一個節點上。

你可以使用雜湊分割槽將一個RDD分成了100個分割槽,此時鍵的雜湊值對100取模的結果相同的記錄會被放在一個節點上。你也可以使用範圍分割槽法,將鍵在同一個範圍區間的記錄都放在同一個節點上。

預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器上,然後再那臺機器上對所有鍵相同的記錄進行連線操作。因為userData表比每分鐘出現的訪問日誌表events要大得多,所有要浪廢時間做很多額外工作:在每次呼叫時都對userData表進行雜湊值計算和跨節點資料混洗,雖然這些從來都不會變化。

解決:在程式開始時,對userDat a表使用partitionBy()轉化操作,將這些錶轉為雜湊分割槽。可以通過向partitionBy()傳遞一個spark.HashPartirion物件來實現該操作。

4.4.1、獲取RDD的分割槽方式

在Scale和Java中可以使用RDD的partitioner屬性(java中使用partitioner()方法來獲取RDD的分割槽方式。

在Sparkshell中使用partitioner屬性不僅是檢驗各種Spark操作如何影響分割槽方式的一種好方法,還可以在你的程式中檢查想要使用的操作是否會生成正確的結果。

4.4.2、從分割槽中或獲益的操作

Spark的許多操作都引入了將資料根據鍵跨節點進行混洗的過程。所有這些操作都會 從資料分割槽中獲益(減少網路傳輸)。

對於像 reduceByKey() 這樣只作用於單個RDD 的操作,執行在未分割槽的 RDD 上的時候會導致每個鍵的所有對應值都在每臺機器上進行本地計算,只需要把本地最終歸約出的結果值從各工作節點傳回主節點,所以原本的網路開銷就不算大。

而對於諸如 cogroup() join() 這樣的二元操作,預先進行資料分割槽會導致其中至少一個 RDD(使用已知分割槽器的那個 RDD)不發生資料混洗。如果兩個 RDD 使用同樣的分割槽方式,並且它們還快取在 同樣的機器上(比如一個 RDD 是通過 mapValues() 從另一個 RDD 中創建出來的,這兩個 RDD 就會擁有相同的鍵和分割槽方式),或者其中一個 RDD 還沒有被計算出來,那麼跨節 點的資料混洗就不會發生了。

4.3.3、影響分割槽方式的操作

Spark 內部知道各操作會如何影響分割槽方式,並將會對資料進行分割槽的操作的結果 RDD 自動設定為對應的分割槽器。

轉化操作的結果並不一定會按已知的分割槽方式分割槽,這時輸出的 RDD 可能就會沒有設定分割槽器。

另外兩個操作 mapValues() 和 flatMapValues() 作為替代方法,它們可以保證每個二元組的鍵保持不變。

其他所有的操作生成的結果都不會存在特定的分割槽方式。

4.4.4、示例:PageRank

PageRank是一種從RDD分割槽中獲益的更復雜的演算法。是用來根據外部文件指向一個文件的連結,對集合中每個文件的重要程度賦一個度量值。該演算法可以用於對網頁進行排序,當然,也可以用於排序科技文章或社交網路中有影響的使用者。

PageRank 是執行多次連線的一個迭代演算法,因此它是 RDD 分割槽操作的一個很好的用例。演算法會維護兩個資料集:一個由 (pageID, linkList) 的元素組成,包含每個頁面的相鄰頁面的列表;另一個由 (pageID, rank) 元素組成,包含每個頁面的當前排序值。它按如下步驟進行計算。

(1) 將每個頁面的排序值初始化為 1.0 。

(2) 在每次迭代中,對頁面 p ,向其每個相鄰頁面(有直接連結的頁面)傳送一個值為rank(p)/numNeighbors(p) 的貢獻值。

(3) 將每個頁面的排序值設為 0.15 + 0.85 * contributionsReceived 。