1. 程式人生 > >spark運算元系列

spark運算元系列

一.Action操作

1.first:返回rdd中的以一個元素

scala> var rdd = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
scala> rdd.first
//輸出結果為:(A,1)
scala> var rdd = sc.makeRDD(Seq(10, 3, 1, 13, 6))
scala> rdd.first
//輸出結果為:10
2.count:返回rdd中的元素數量

scala> var rdd = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
scala>rdd.count
//輸出結果為:3
3.reduce():根據需求對rdd裡的元素進行運算,返回結果

scala>var rdd=sc.makeRDD(Array(("A",2),("A",5),("B",2),("C",3))
scala>varrdd.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})
//輸出結果為(CBAA,12)
scala>var rdd1=sc.makeRDD(1 to 10,2)
scala>rdd1.reduce(_+_)
//輸出結果為1到10相加的和
4.collect:將一個rdd轉換成陣列

scala> var rdd1 = sc.makeRDD(1 to 3,2)
scala> rdd1.collect
//結果為Array(1,2,3)
5.take(n):獲得1到n之間的元素(不排序)

scala> var rdd = sc.makeRDD(Seq(10, 3, 1, 13, 6))
scala>rdd.take(3)
//輸出結果為Array(10,3,1)
6.top (n):預設降序返回n個元素 

   takeOrdered(n):按照與top相反返回n個元素

scala> var rdd = sc.makeRDD(Seq(10, 3, 1, 13, 6))
scala>rdd.top(3)
//返回結果為Array(13,10,6)
scala>rdd.takeOrdered(3)
//此時輸出結果為Array(1,3,6)
7.lookup:lookup用於(K,V)型別的RDD,指定K值,返回RDD中該K對應的所有V值。
scala>var rdd=sc.makeRDD(Array(("A",2),("A",5),("B",2),("C",3))
rdd.lookup("A")
//輸出為:(2,5)
8.countByKey:統計RDD[K,V]中每個K的數量

scala>var rdd=sc.makeRDD(Array(("A",2),("A",5),("B",2),("C",3))
scala>rdd.countByKey
//輸出結果為scala.collection.Map[String,Long] = Map(A -> 2, B -> 1,C ->1)
9.sortby:根據給定的排序k函式將RDD中的元素進行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2) 
scala> rdd1.sortBy(x => x).collect 
//輸出結果為 Array(0, 1, 2, 3, 6, 7) //預設升序 
 scala> rdd1.sortBy(x => x,false).collect 
//輸出結果為:Array(7, 6, 3, 2, 1, 0) //降序 
 //RDD[K,V]型別
scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 
scala> rdd1.sortBy(x => x).collect 
//輸出結果為Array((A,1), (A,2), (B,3), (B,6), (B,7)) 
scala> rdd1.sortBy(x => x._2,false).collect 
//輸出結果為:Array((B,7), (B,6), (B,3), (A,2), (A,1))
10.saveAsTextFile:以Text型別儲存到指定路徑

scala>var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://user/tmp/zhangjiaxin/")]//以text形式將檔案儲存到hdfs
scala>rdd1.saveAsTextFile("file:///user/tmp/zhangjiaxin/")//將檔案儲存到本地
scala> rdd1.saveAsTextFile("hdfs://user/tmp/zhangjiaxin/"),classOf[com.hadoop.compression.lzo.LzopCodec]////指定壓縮格式儲存
scala>rdd1.saveAsSequenceFile("hdfs://user/tmp/zhangjiaxin/")]//以Sequence形式將檔案儲存到hdfs
scala>rdd1.saveAssaveAsObjectFile("hdfs://user/tmp/zhangjiaxin/")]//以saveAsObject形式將檔案儲存到hdfs
11.saveAsHadoopDaraset將檔案儲存到HDFS中

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import SparkContext._ 
import org.apache.hadoop.mapred.TextOutputFormat 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.io.IntWritable 
import org.apache.hadoop.mapred.JobConf
 
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 
var jobConf = new JobConf() 
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]]) 
jobConf.setOutputKeyClass(classOf[Text]) 
jobConf.setOutputValueClass(classOf[IntWritable]) 
jobConf.set("mapred.output.dir","/tmp/zhang/") 
rdd1.saveAsHadoopDataset(jobConf)
 
12.資料儲存到HBase

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import SparkContext._ 
import org.apache.hadoop.mapred.TextOutputFormat 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.io.IntWritable 
import org.apache.hadoop.mapred.JobConf 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
 
var conf = HBaseConfiguration.create() 
var jobConf = new JobConf(conf) 
jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3") 
jobConf.set("zookeeper.znode.parent","/hbase") 
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"hyxy") 
jobConf.setOutputFormat(classOf[TableOutputFormat]) 
 
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) 
rdd1.map(x => { 
var put = new Put(Bytes.toBytes(x._1)) 
 put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) 
(new ImmutableBytesWritable,put) 
 } 
).saveAsHadoopDataset(jobConf)
二.轉換操作

1.map 有多少輸入分割槽就有多少輸出分割槽

hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
 
//讀取檔案到RDD
var data=sc.textFile("/user/tmp/a.txt")
var result=data.map(line=>line.split(","))//使用map運算元呼叫.split方法按照,進行切分
result.collect
//執行結果為Array[Array[String]] = Array(Array(hello, world), Array(hello, scala), Array(hello, spark))
2.flatMap 與map大致相同 只不過把輸出結果合併為一個分割槽

hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
//讀取檔案到RDDvar data=sc.textFile("/user/tmp/a.txt")var result=data.flatMap(line=>line.split(","))//使用flatMap運算元呼叫.split方法按照,進行切分
result.collect
//執行結果為Array[String] = Array(hello, world, hello, scala, hello, spark)
result.distinct.collect //.distinct(去重)
3.coalesce(n)定義n個重分割槽 如果重分割槽數量大於之前的分割槽數量那麼必須加true

hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
 
var data=sc.textFile("/user/tmp/a.txt").replaceAll(","," ")//使用.replaceAll方法把,替換成空格
data.collect
//結果為Array[String] = Array(hello world, hello scala, hello spark)
data.partitions.size
//結果為Int=2 //預設倆分割槽
var rdd=data.coalesce(1) //將分割槽改為1
rdd.partitions.size //結果為Int=1
var rdd=data.coalesce(3)
rdd.partitions.size//結果為2  如果重分割槽的數目大於原來的分割槽數,那麼必須指定shuffle引數為true,否則,分割槽數不便
var rdd=data.coalesce(3,true)
rdd.partitions.size//結果為Int=4
4.repartition:該函式其實就是coalesce函式第二個引數為true的實現 

hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
 
var data=sc.textFile("/user/tmp/a.txt").replaceAll(","," ")//使用.replaceAll方法把,替換成空格
data.collect
//結果為Array[String] = Array(hello world, hello scala, hello spark)
data.partitions.size
//結果為Int=2 //預設倆分割槽
var rdd=data.repartition(1) //將分割槽改為1
rdd.partitions.size //結果為Int=1
var rdd=data.repartition(3)
rdd.partitions.size//結果為3
5.glom 將每個分割槽中的元素放到一個數組中,這樣,結果就變成了3個數組
var rdd= sc.makeRDD(1 to 10,3)
rdd.partitions.size //結果為3個分割槽
rdd.glom().collect//結果為Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom將每個分割槽中的元素放到一個數組中,這樣,結果就變成了3個數組
6.union 將兩個RDD進行合併,不去重

var rdd1 = sc.makeRDD(1 to 2,1)
var rdd2 = sc.makeRDD(2 to 3,1)
rdd1.union(rdd2).collect
//結果為Array[Int] = Array(1, 2, 2, 3)
7.intersection 該函式返回兩個RDD的交集,並且去重

var rdd1 = sc.makeRDD(1 to 2,1)
var rdd2 = sc.makeRDD(2 to 3,1)
rdd1.intersection(rdd2).collect//結果為3
8.subtract 該函式類似於intersection,但返回在RDD中出現,並且不在otherRDD中出現的元素,不去重。
var rdd1 = sc.makeRDD(Seq(1,2,2,3))
var rdd2 = sc.makeRDD(3 to 4)
rdd1.subtract(rdd2).collect
//結果為Array[Int] = Array(1, 2, 2)
9.mapPartitions 該函式和map函式類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的多
 var rdd1 = sc.makeRDD(1 to 5,2) 
 //rdd1有兩個分割槽 
 scala> var rdd3 = rdd1.mapPartitions{ x => { 
| var result = List[Int]() 
| var i = 0 
 | while(x.hasNext){ 
 | i += x.next() 
| } 
 | result.::(i).iterator 
 | }} 
//rdd3將rdd1中每個分割槽中的數值累加 
scala> rdd3.collect
res65: Array[Int] = Array(3, 12) 
scala> rdd3.partitions.size 
res66: Int = 2
10.zip 函式用於將兩個RDD組合成Key/Value形式的RDD,這裡預設兩個RDD的partition數量以及元素數量必須都相同,否則會丟擲異常。

var rdd1 = sc.makeRDD(1 to 5,2)
var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd1.zip(rdd2).collect//結果為Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))
rdd2.zip(rdd1).collect//結果為Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
11.zipPartitions zipPartitions函式將多個RDD按照partition組合成為新的RDD,該函式需要組合的RDD具有相同的分割槽數,但對於每個分割槽內的元素數量沒有要求

var rdd1 = sc.makeRDD(1 to 5,2)
var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
scala> rdd1.mapPartitionsWithIndex{ 
 | (x,iter) => { 
 | var result = List[String]() 
 | while(iter.hasNext){ 
 | result ::= ("part_" + x + "|" + iter.next()) 
 | } 
 | result.iterator 
 | 
 | } 
 | }.collect  //結果為Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
//rdd2兩個分割槽中元素分佈
 rdd2.mapPartitionsWithIndex{ 
 | (x,iter) => { 
 | var result = List[String]() 
 | while(iter.hasNext){ 
 | result ::= ("part_" + x + "|" + iter.next()) 
 | } 
 | result.iterator 
 | 
 | } 
 | }.collect  /結果為Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
//rdd1和rdd2做zipPartition
rdd1.zipPartitions(rdd2){ 
 | (rdd1Iter,rdd2Iter) => { 
 | var result = List[String]() 
 | while(rdd1Iter.hasNext && rdd2Iter.hasNext) { 
 | result::=(rdd1Iter.next() + "_" + rdd2Iter.next()) 
 | } 
 | result.iterator 
 | }
 | }.collect  //結果為Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
12 zipWithIndex 該函式將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。
var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2.zipWithIndex().collect //結果為Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
13.zipWithUniqueId 與zipWithIndex相似只不過他是按照分割槽的索引
var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1.zipWithUniqueId().collect //結果為Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//總分割槽數為2 
//第一個分割槽第一個元素ID為0,第二個分割槽第一個元素ID為1 
//第一個分割槽第二個元素ID為0+2=2,第一個分割槽第三個元素ID為2+2=4 
//第二個分割槽第二個元素ID為1+2=3,第二個分割槽第三個元素ID為3+2=5
三.鍵值對轉換操作

1.mapValues 同基本轉換操作中的map,只不過mapValues是針對[K,V]中的V值進行map操作。

 var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1.mapValues(x => x + "_").collect //結果為Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))
2.flatMapValues 同基本轉換操作中的flatMap,只不過flatMapValues是針對[K,V]中的V值進行flatMap操作
var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1.flatMapValues(x => x + "_").collect //結果為Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))
3.combineByKey 該函式用於將RDD[K,V]轉換成RDD[K,C],這裡的V型別和C型別可以相同也可以不同。

var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.combineByKey( 
 | (v : Int) => v + "_", 
 | (c : String, v : Int) => c + "@" + v, 
 | (c1 : String, c2 : String) => c1 + "$" + c2 
 | ).collect
//結果為Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
//其中三個對映函式分別為: 
//createCombiner: (V) => C (v : Int) => v + “_” //在每一個V值後面加上字元_,返回C型別(String)
//mergeValue: (C, V) => C (c : String, v : Int) => c + “@” + v //合併C型別和V型別,中間加字元@,返回C(String) 
//mergeCombiners: (C, C) => C (c1 : String, c2 : String) => c1 + “$” + c2 //合併C型別和C型別,中間加$,返回C(String) 其他引數為預設值。
//最終,將RDD[String,Int]轉換為RDD[String,String]。
rdd1.combineByKey( 
 (v : Int) => List(v), 
 (c : List[Int], v : Int) => v :: c, 
 (c1 : List[Int], c2 : List[Int]) => c1 ::: c2 
 ).collect 
 //結果為 Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
//最終將RDD[String,Int]轉換為RDD[String,List[Int]]。
4.foldByKey  該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,進行初始化V,再將對映函式應用於初始化後的V
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 rdd1.foldByKey(0)(_+_).collect
//結果:Array[(String, Int)] = Array((A,2), (B,3), (C,1))
//將rdd1中每個key對應的V進行累加,注意zeroValue=0,需要先初始化V,對映函式為+操 
//作,比如("A",0), ("A",2),先將zeroValue應用於每個V,得到:("A",0+0), ("A",2+0),即: 
//("A",0), ("A",2),再將對映函式應用於初始化後的V,最後得到(A,0+2),即(A,2)
rdd1.foldByKey(2)(_+_).collect
//結果Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先將zeroValue=2應用於每個V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再將對映函 
//數應用於初始化後的V,最後得到:(A,2+4),即:(A,6)
5.groupByKey該函式用於將RDD[K,V]中每個K對應的V值,合併到一個集合Iterable[V]中

var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 rdd1.groupByKey().collec
//結果為:Array[(String, Iterable[Int])] = Array((A,CompactBuffer(0, 2)), (B,CompactBuffer(2, 1)), (C,CompactBuffer(1)))
6.reduceByKey該函式用於將RDD[K,V]中每個K對應的V值根據對映函式來運算。

var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
var rdd2 = rdd1.reduceByKey((x,y) => x + y)
 rdd2.collect
//結果為Array[(String, Int)] = Array((A,2), (B,3), (C,1))
7.reduceByKeyLocally該函式將RDD[K,V]中每個K對應的V值根據對映函式來運算,運算結果對映到一個Map[K,V]中,而不是RDD[K,V]

var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1.reduceByKeyLocally((x,y) => x + y)//res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)
 
8.cogroup 相當於SQL中的全外關聯full outer join,返回左右RDD中的記錄,關聯不上的為空

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
var rdd3 = rdd1.cogroup(rdd2)//如果還要關聯多個rdd則直接作為另一個引數傳進去即可
rdd3.collect
//結果:Array[(String, (Iterable[String], Iterable[String]))] = Array( 
(B,(CompactBuffer(2),CompactBuffer())), 
(D,(CompactBuffer(),CompactBuffer(d))), 
(A,(CompactBuffer(1),CompactBuffer(a))), 
(C,(CompactBuffer(3),CompactBuffer(c)))
9.join 相當於SQL中的內關聯join,只返回兩個RDD根據K可以關聯上的結果,join只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
rdd1.join(rdd2).collect  //結果為Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
10.leftOuterJoin  類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2
rdd1.leftOuterJoin(rdd2).collect
//結果為Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))
11.rightOuterJoin 與leftOuterJoin相反
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2
rdd1.rightOuterJoin(rdd2).collect
//結果為Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))
--------------------- 
作者:不會水的魚i 
來源:CSDN 
原文:https://blog.csdn.net/z1219346000/article/details/80465409 
版權宣告:本文為博主原創文章,轉載請附上博文連結!