1. 程式人生 > >spark常用函式:transformation和action

spark常用函式:transformation和action

1、RDD提供了兩種型別的操作:transformation和action

所有的transformation都是採用的懶策略,如果只是將transformation提交是不會執行計算的,計算只有在action被提交的時候才被觸發。

1)transformation操作:得到一個新的RDD,比如從資料來源生成一個新的RDD,從RDD生成一個新的RDD

map(func):對呼叫map的RDD資料集中的每個element都使用func,然後返回一個新的RDD,這個返回的資料集是分散式的資料集

mapValues顧名思義就是輸入函式應用於RDDKev-ValueValue,原RDD中的Key保持不變,與新的

Value一起組成新的RDD中的元素。因此,該函式只適用於元素為KV對的RDD。

mapWith是map的另外一個變種,map只需要一個輸入函式,而mapWith有兩個輸入函式。第一個函式是把RDD的partition index(index0開始)作為輸入,輸出為新型別A;第二個函式是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函式的輸出),輸出型別為U。

mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition

mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index

mapPartitionsWithIndex(func)函式:mapPartitionsWithIndex的func接受兩個引數,第一個引數是分割槽的索引,第二個是一個數據集分割槽的迭代器。而輸出的是一個包含經過該函式轉換的迭代器。下面測試中,將分割槽索引和分割槽資料一起輸出。

sample(withReplacement,faction,seed):抽樣,withReplacement為true表示有放回;faction表示取樣的比例;seed為隨機種子

takeSample() 函式和上面的sample 函式是一個原理,但是不使用相對比例取樣,而是按設定的取樣個數進行取樣,同時返回結果不再是RDD,而是相當於對取樣後的資料進行Collect(),返回結果的集合為單機的陣列。

filter(func) : 對呼叫filter的RDD資料集中的每個元素都使用func,然後返回一個包含使func為true的元素構成的RDD

flatMap(func):和map差不多,但是flatMap生成的是多個結果

flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。

flatMapWith與mapWith很類似,都是接收兩個函式,一個函式把partitionIndex作為輸入,輸出是一個新型別A;另外一個函式是以二元組(T,A)作為輸入,輸出為一個序列,這些序列裡面的元素組成了新的RDD。

union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合

distinct([numTasks]):返回一個包含源資料集中所有不重複元素的新資料集

groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函式接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一個給定的reducefunc再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數

sortBy (dataSet, boolean)函式:排序。第二個引數預設為true,即升序排序。

sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean型別

join(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks為併發的任務數。本質是通過cogroup運算元先進行協同劃分,再通過flatMapValues 將合併的資料打散。

cogroup(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks為併發的任務數

cartesian(otherDataset):笛卡爾積就是m*n,大家懂的

coalesce(numPartitions, true)函式:對RDD中的分割槽重新進行合併。返回一個新的RDD,且該RDD的分割槽個數等於numPartitions個數。如果shuffle設定為true,則會進行shuffle。

repartition(numPartitions)隨機重新shuffle RDD中的資料,並建立numPartitions個分割槽。此操作總會通過網路來shuffle全部資料

pipe(command, [envVars])通過POSIX 管道來將每個RDD分割槽的資料傳入一個shell命令(例如Perl或bash指令碼)。RDD元素會寫入到程序的標準輸入,其標準輸出會作為RDD字串返回。

2)action操作:action是得到一個值,或者一個結果(直接將RDD cache到記憶體中)

reduce(func):說白了就是聚集,但是傳入的函式是兩個引數輸入返回一個值,這個函式必須是滿足交換律和結合律的

collect():一般filter或者足夠小的結果的時候,再用collect封裝返回一個數組

count():返回的是dataset中的element的個數

first():返回的是dataset中的第一個元素      類似於take(1)

take(n):返回前n個elements,這個士driver program返回的

takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed

takeOrdered(n, [ordering])返回一個由資料集的前n個元素組成的有序陣列,使用自然序或自定義的比較器。

saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支援的檔案系統中。對於每個元素,Spark將會呼叫toString方法,spark把每條記錄都轉換為一行記錄,然後寫到file中。

saveAsSequenceFile(path)將資料集的元素,以Hadoopsequencefile的格式,儲存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支援的檔案系統。這個只限於由key-value對組成,並實現了Hadoop的Writable介面,或者隱式的可以轉換為Writable的RDD。(Spark包括了基本型別的轉換,例如Int,Double,String,等等)

saveAsObjectFile(path)將資料集元素寫入Java序列化的可以被SparkContext.objectFile()載入的簡單格式中

countByKey():返回的是key對應的個數的一個map,作用於一個RDD

foreach(func):對dataset中的每個元素都使用func

3)其它 函式操作:

lookup操作:通過key找value值。Lookup 函式對(Key, Value) 型的RDD 操作,返回指定Key 對應的元素形成的Seq。

contains(str)函式:包含str字串

take 和 takeAsTextFile操作

fold,foldLeft, and foldRight之間的區別

trim函式:把字元兩端的空格截掉

top 返回最大的 k 個元素。

take 返回最小的 k 個元素。

takeOrdered 返回最小的 k 個元素,並且在返回的陣列中保持元素的順序。

first 相當於top(1) 返回整個RDD中的前k 個元素,可以定義排序的方式 Ordering[T]。返回的是一個含前k 個元素的陣列。

fold 和reduce 的原理相同,但是與reduce 不同,相當於每個reduce 時,迭代器取的第一個元素是zeroValue。

aggregate 先對每個分割槽的所有元素進行aggregate 操作,再對分割槽的結果進行fold 操作。aggreagate 與fold 和reduce 的不同之處在於,aggregate相當於採用歸併的方式進行資料聚集,這種聚集是並行化的。而在fold 和reduce 函式的運算過程中,每個分割槽中需要進行序列處理,每個分割槽序列計算完結果,結果再按之前的方式進行聚集,並返回最終聚集結果。

4)Spark 1.4為DataFrame新增的統計與數學函式

隨機資料生成(RandomData Generation)

describe函式:概要與描述性統計(Summary and descriptive statistics)

協方差與相關性(Samplecovariance and correlation)

交叉列表(Crosstabulation)

頻率項(Frequentitems)

數學函式(Mathematicalfunctions)

2transformation操作函式總結:

1)、sortBy函式與sortByKey函式:

①sortBy(func, assending, numPartitions)函式:有三個引數,第一個必須要有,後兩個可以省略

第一個引數是一個函式,返回型別和RDD中元素的型別是一致的;

第二個引數是ascending,這引數決定排序後RDD中的元素是升序還是降序,預設是true,也就是升序;

第三個引數是numPartitions,該引數決定排序後的RDD的分割槽個數,預設排序後的分割槽個數和排序之前的個數相等,即為this.partitions.size。

scala>val data = List(3,1,90,3,5,12)   data: List[Int] =List(3, 1, 90, 3, 5, 12)

scala>val rdd = sc.parallelize(data)   

scala> val result = rdd.sortBy(x => x, false, 1)       false 為倒序排列;第三個引數為設定rdd的分割槽個數,預設為原rdd的個數

scala>result.partitions.size         res4: Int = 1

②sortByKey(boolean)函式:升序或降序由ascending布林引數決定,預設true為升序。sortByKey函式作用於Key-Value形式的RDD,並對Key進行排序:主要接受兩個函式,含義和sortBy一樣。該函式返回的RDD一定是ShuffledRDD型別的,因為對源RDD進行排序,必須進行Shuffle操作,而Shuffle操作的結果RDD就是ShuffledRDD。

對Key進行了排序:key為數字,也可以為字元

scala> val a =sc.parallelize(List("wyp", "iteblog", "com","397090770", "test"), 2)

scala> val b = sc.parallelize(List(3,1,9,12,4))        也可以為字元型 val b2 = sc.parallelize(List("3","1",  "9","12", "4"))

scala> val c = b.zip(a)           zipba組成key-value形式,其中bkeyavalue

scala> c.sortByKey().collect              注意sortByKey的小括號不能省

res33: Array[(Int,String)] = Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))

2)、map函式、flatMap函式

①map(func)與flatMap(func)函式

map是對RDD中的每個元素都執行一個指定的函式來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

scala> val a =sc.parallelize(1 to 9, 3)

scala> val b =a.map(x=>x*2)

scala> b.collect

res48: Array[Int] =Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

把原RDD中每個元素都乘以2來產生一個新的RDD

flatMap(func)函式:與map類似,區別是原RDD中的元素經map處理後只能生成一個元素,而原RDD中的元素經flatmap處理後可生成多個元素來構建新RDD。 如果是多個集合,最後合併為一個集合

舉例:對原RDD中的每個元素x產生y個元素(從1到y,y為元素x的值)

scala> val a=sc.parallelize(1 to 4, 2)

scala> val b=a.flatMap(x => 1 to x)

res57: Array[Int] =Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

將Rdd的每個集合元素合併為一個集合:

scala> valkv  =sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))

scala> kv.collect

res8: Array[List[Int]] = Array(List(1, 2), List(3, 4), List(3, 6, 8))多個集合

scala>kv.flatMap(x=>x.map(_+1)).collect

res9: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9)一個集合

②mapPartitions(func)函式:map的一個變種。map的輸入函式是應用於RDD中每個元素,而mapPartitions的輸入函式是應用於每個分割槽,也就是把每個分割槽中的內容作為整體來處理的。最終的RDD由所有分割槽經過輸入函式處理後的結果合併起來的。

mapPartitions還有些變種,比如mapPartitionsWithContext,它能把處理過程中的一些狀態資訊傳遞給使用者指定的輸入函式。還有mapPartitionsWithIndex,它能把分割槽的index傳遞給使用者指定的輸入函式。

scala> val nums =sc.parallelize(1 to 9, 3)

scala> defmyfunc[T](iter: Iterator[T]): Iterator[(T, T)]={

     | var res = List[(T, T)]()

     | var pre = iter.next

     | while(iter.hasNext) {

     |  val cur = iter.next;

     |  res.::= (pre, cur)

     |  }

     | res.iterator

     | }

scala>nums.mapPartitions(myfunc).collect

res10: Array[(Int,Int)] = Array((1,3), (1,2), (4,6), (4,5), (7,9), (7,8))

mapPartitionsWithIndex(func)函式:mapPartitionsWithIndex的func接受兩個引數,第一個引數是分割槽的索引,第二個是一個數據集分割槽的迭代器。而輸出的是一個包含經過該函式轉換的迭代器。下面測試中,將分割槽索引和分割槽資料一起輸出。

scala> val x =sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

scala>def myfunc(index: Int, iter:Iterator[Int]): Iterator[String] = {

     | iter.toList.map(x => index+"-"+x).iterator

     | }

scala>x.mapPartitionsWithIndex(myfunc).collect

res12: Array[String]= Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)

③mapValues與flatMapValues函式

mapValues顧名思義就是輸入函式應用於RDDKev-ValueValue,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函式只適用於元素為KV對的RDD。

scala> val a =sc.parallelize(List("dog", "tiger", "lion","cat", "panther", "eagle"), 2)

scala> val b =a.map(x => (x.length, x))

scala>b.mapValues("x"+_+"x").collect

res54: Array[(Int,String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx),(5,xeaglex))

flatMapValues

flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。

scala> val a =sc.parallelize(List((1,2),(3,4),(3,6)))

scala> val b =a.flatMapValues(x=>x.to(5))

scala> b.collect

res59: Array[(Int,Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

原RDD中每個元素的值被轉換為一個序列(從其當前值到5),比如第一個KV對(1,2), 其值2被轉換為2,3,4,5。然後其再與原KV對中Key組成一系列新的KV對(1,2),(1,3),(1,4),(1,5)。

④mapWith與flatMapWith函式

mapWith是map的另外一個變種,map只需要一個輸入函式,而mapWith有兩個輸入函式。第一個函式是把RDD的partition index(index0開始)作為輸入,輸出為新型別A;第二個函式是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函式的輸出),輸出型別為U。

舉例:把partitionindex 乘以10,然後加上2作為新的RDD的元素。

scala> val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)      有三個分割槽,每個分割槽的下標為0,1,2

scala>x.mapWith(a => a*10)((a, b)=>(b+2)).collect

res56: Array[Int] =Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMapWith與mapWith很類似,都是接收兩個函式,一個函式把partitionIndex作為輸入,輸出是一個新型別A;另外一個函式是以二元組(T,A)作為輸入,輸出為一個序列,這些序列裡面的元素組成了新的RDD。

scala> val a =sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)

scala>a.flatMapWith(x=>x, true)((x, y)=>List(y, x)).collect

res58: Array[Int] =Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

3)、sample函式:sample(withReplacement,faction,seed):抽樣,withReplacement為true表示又放回;faction表示取樣的比例;seed為隨機種子

scala> val a =sc.parallelize(1 to 10000, 3)

scala>a.sample(false, 0.1, 0).count

res15: Long = 956

4)、distinct去重函式、union求並函式、intersection求交集函式

scala> val kv1 =sc.parallelize(List(("A", 1), ("B", 2), ("C", 3),("A", 4), ("B", 5)))

scala> val kv2 =sc.parallelize(List(("A", 4), ("A", 2), ("C", 3),("A", 4), ("B", 5)))

scala> kv2.distinct.collect

res17:Array[(String, Int)] = Array((A,4), (B,5), (C,3), (A,2))

scala> kv1.union(kv2).collect

res18:Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,4), (A,2),(C,3), (A,4), (B,5))

scala> kv1.intersection(kv2).collect

res19:Array[(String, Int)] = Array((A,4), (B,5), (C,3))

5)、sortByKey,groupByKey, reduceByKey等transformation操作涉及到了shuffle操作,所以這裡引出兩個概念寬依賴和窄依賴。

窄依賴(narrowdependencies)

子RDD的每個分割槽依賴於常數個父分割槽(與資料規模無關)

輸入輸出一對一的運算元,且結果RDD的分割槽結構不變。主要是map/flatmap

輸入輸出一對一的運算元,但結果RDD的分割槽結構發生了變化,如union/coalesce

從輸入中選擇部分元素的運算元,如filter、distinct、substract、sample

寬依賴(widedependencies)

子RDD的每個分割槽依賴於所有的父RDD分割槽

對單個RDD基於key進行重組和reduce,如groupByKey,reduceByKey

對兩個RDD基於key進行join和重組,如join

經過大量shuffle生成的RDD,建議進行快取。這樣避免失敗後重新計算帶來的開銷。

注意:reduce是一個action,和reduceByKey完全不同,reduceByKey是transformation操作。

sortByKey(boolean)函式:升序或降序由ascending布林引數決定,預設true為升序

scala> val kv1 =sc.parallelize(List(("A", 1), ("B", 2), ("C", 3),("A", 4), ("B", 5)))

kv1:org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] atparallelize at <console>:21

scala> kv1.sortByKey().collect                   注意sortByKey的小括號不能省

res23:Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))

groupByKey函式:在一個(K,V)對的資料集上呼叫,返回一個(K,Seq[V])對的資料集

注意:預設情況下,只有8個並行任務來做操作,但是你可以傳入一個可選的numTasks引數來改變它。如果分組是用來計算聚合操作(如sum或average),那麼應該使用reduceByKey或combineByKey 來提供更好的效能。

scala>kv1.groupByKey().collect

res21:Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2, 5)),(A,CompactBuffer(4, 1)), (C,CompactBuffer(3)))

reduceByKey函式:在一個(K,V)對的資料集上呼叫時,返回一個(K,V)對的資料集,使用指定的reduce函式,將相同key的值聚合到一起。類似groupByKey,reduce任務個數是可以通過第二個可選引數來配置的

scala>kv1.reduceByKey(_+_).collect

res22:Array[(String, Int)] = Array((B,7), (A,5), (C,3))

6)、coalesce(numPartitions, true)函式:RDD中的分割槽重新進行合併(減少rdd的分)。返回一個新的RDD,且該RDD的分割槽個數等於numPartitions個數。如果shuffle設定為true,則會進行shuffle。

scala> var data =sc.parallelize(List(1,2,3,4), 4)

scala>data.partitions.length

res27: Int = 4

scala> val result= data.coalesce(2, false)

scala>result.partitions.length

res28: Int = 2

repartition(numPartitions)隨機重新shuffle RDD中的資料,並建立numPartitions個分割槽。這個操作總會通過網路來shuffle全部資料

7)、join函式和cogroup函式:

join(otherDataset,[numTasks])在型別為(K,V)和(K,W)型別的資料集上呼叫時,返回一個相同key對應的所有元素對在一起的(K, (V, W))資料集

scala> valkv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

scala> valkv3=sc.parallelize(List(("A",10),("B",20),("D",30)))

scala>kv1.join(kv3).collect

res31:Array[(String, (Int, Int))] = Array((B,(2,20)), (B,(5,20)), (A,(4,10)),(A,(1,10)))

cogroup(otherDataset, [numTasks])在型別為(K,V)和(K,W)的資料集上呼叫,返回一個 (K, Seq[V], Seq[W])元組的資料集。這個操作也可以稱之為groupwith

scala>kv1.cogroup(kv3).collect

res32:Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(5,2),CompactBuffer(20))), (D,(CompactBuffer(),CompactBuffer(30))),(A,(CompactBuffer(1, 4),CompactBuffer(10))),(C,(CompactBuffer(3),CompactBuffer())))

8)、sortBy (dataSet, boolean)函式:排序。第二個引數預設為true,即升序排序。

scala> val rdd =sc.parallelize(List(3,1,90,3,5,12))

scala> rdd.sortBy(x => x, false)       第二個引數預設為true,即升序排列

res40: Array[Int] =Array(90, 12, 5, 3, 3, 1)

檢視執行後rdd 分割槽的個數:   scala> result.partitions.size

res18: Int = 2

注意:預設為兩個,但可以手動修改rdd的分割槽個數手動修改rdd分割槽的個數

scala> val result= rdd.sortBy(x => x, false, 1)

scala>result.partitions.size

res21: Int = 1

3action操作函式總結:

1)、reduce(func)函式、reduceByKey函式transformation操作)

reduce(func)將RDD中元素兩兩傳遞給輸入函式,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函式直到最後只有一個值為止。

scala> val c =sc.parallelize(1 to 10)

scala> c.reduce((x, y) => x+y)     res60: Int = 55     RDD中的元素求和

reduceByKey就是對元素為KV對的RDDKey相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV

scala> val a =sc.parallelize(List((1,2), (3,4), (3,6)))

scala>a.reduceByKey((x, y) => x +y).collect

res61: Array[(Int,Int)] = Array((1,2), (3,10))

對Key相同的元素的值求和,因此Key為3的兩個元素被轉為了(3,10)。

2)、foreach(func)在資料集的每一個元素上,執行函式func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部儲存系統進行互動,例如HBase.

scala> valnum=sc.parallelize(1 to 10)

scala>num.take(5).foreach(println)

3)、

4、其它函式總結:

1)、zip函式:把兩個單獨的rdd組合為key-value形式

scala> val a =sc.parallelize(List("wyp", "iteblog", "com","397090770", "test"), 2)

scala> val b =sc.parallelize(List(3,1,9,12,4))

scala> val c = b.zip(a)           zipba組成key-value形式,其中bkeyavalue

scala>c.sortByKey().collect

res33: Array[(Int,String)] = Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))

2)、lookup操作:通過key找value值。

scala> val rdd2 =sc.parallelize(List(('a', 1), ('a', 2), ('b', 1), ('b', 3)))

scala>rdd2.lookup('a')

res15: Seq[Int] =WrappedArray(1, 2)

scala>rdd2.lookup('b')

res16: Seq[Int] =WrappedArray(1, 3)

3)take 和takeAsTextFile操作

[[email protected] ~]$ cat a.txt

2014-03-04        121212121212        12

2014-03-06        161616161616        16

[[email protected] ~]$ cat b.txt

2014-03-02        121212121212        1        33.555333        -117.888878786

2014-03-06        161616161616        2        33.677666        -117.886868687

scala> val format= new java.text.SimpleDateFormat("yyyy-mm-dd")

scala> case classRegister(d: java.util.Date, uuid: String, cust_id: String , lat: Float, lng:Float)

scala> case classClick(d: java.util.Date, uuid: String, landing_page: Int)

首先,讀取檔案的內容;然後,以tab鍵進行分詞,接著以第二列為key,每一行的所有內容為Value構建起的Register作為Value的值

scala> val reg =sc.textFile("/ai/b.txt").map(_.split('\t')).map(r => (r(1),Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))

scala> val clk =sc.textFile("/ai/a.txt").map(_.split('\t')).map(c =>(c(1),Click(format.parse(c(0)), c(1), c(2).trim.toInt)))

clk:org.apache.spark.rdd.RDD[(String, Click)] =MapPartitionsRDD[86] at map at <console>:25

scala> val result = reg join clk  ——》result: org.apache.spark.rdd.RDD[(String, (Register, Click))] = MapPartitionsRDD[89] at join at <console>:31

scala>result.take(2)

res17:Array[(String, (Register, Click))] = Array((121212121212,(Register(Thu Jan 02 00:03:00 CST 2014,121212121212,1,33.555332,-117.88888),Click(Sat Jan 04 00:03:00 CST 2014,121212121212,12))),(161616161616,(Register(Mon Jan 06 00:03:00 CST2014,161616161616,2,33.677666,-117.88687),Click(Mon Jan 06 00:03:00 CST2014,161616161616,16))))

scala>result.saveAsTextFile("/ai/join/result")

[[email protected] ~]$hadoop fs -cat /ai/join/result/part-00000

(121212121212,(Register(ThuJan 02 00:03:00 CST 2014,121212121212,1,33.555332,-117.88888),Click(Sat Jan 0400:03:00 CST 2014,121212121212,12)))

(161616161616,(Register(MonJan 06 00:03:00 CST 2014,161616161616,2,33.677666,-117.88687),Click(Mon Jan 0600:03:00 CST 2014,161616161616,16)))

4)fold, foldLeft, and foldRight之間的區別

主要的區別是fold函式操作遍歷問題集合的順序。foldLeft是從左開始計算,然後往右遍歷。foldRight是從右開始算,然後往左遍歷。而fold遍歷的順序沒有特殊的次序。

scala> valnumbers = List(5,4,8,6,2)

scala>numbers.fold(0){(z,i)=>z+i}

res50: Int = 25

List中的fold方法需要輸入兩個引數:初始值以及一個函式。輸入的函式也需要輸入兩個引數:累加值和當前item的索引。那麼上面的程式碼片段發生了什麼事?

程式碼開始執行的時候,初始值0作為第一個引數傳進到fold函式中,list中的第一個item作為第二個引數傳進fold函式中。

1、fold函式開始對傳進的兩個引數進行計算,在本例中,僅僅是做加法計算,然後返回計算的值;

2、Fold函式然後將上一步返回的值作為輸入函式的第一個引數,並且把list中的下一個item作為第二個引數傳進繼續計算,同樣返回計算的值;

3、第2步將重複計算,直到list中的所有元素都被遍歷之後,返回最後的計算值,整個過程結束;

4、這雖然是一個簡單的例子,讓我們來看看一些比較有用的東西。早在後面將會介紹foldLeft函式,並解釋它和fold之間的區別,目前,你只需要想象foldLeft函式和fold函式執行過程一樣。

下面是一個簡單的類和伴生類

scala> classFoo(val name:String, val age:Int, val sex:Symbol)

scala> objectFoo{

     | def apply(name:String, age:Int, sex:Symbol)=new Foo(name, age, sex)

     | }

假如我們有很多的Foo例項,並存在list中:

scala> valfooList=Foo("Hugh Jass", 25, 'male)::Foo("Biggus Dickus",43, 'male)::Foo("Incontinetia Buttocks", 37, 'female)::Nil

我們想將上面的list轉換成一個儲存[title] [name], [age]格式的String連結串列:

scala> valstringList = fooList.foldLeft(List[String]()) {(z,f)=>

     | val title=f.sex match{

     |  case 'male=>"Mr."

     |  case 'female=>"Ms."

     |  }

     |  z:+ s"$title ${f.name},${f.age}"

     | }

5)trim函式:把字元兩端的空格截掉

scala>val people =sc.textFile("/spark/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()  

people: org.apache.spark.sql.DataFrame = [name: string, age: int]       注意:如果去掉trim,會報錯

scala>people.registerTempTable("people")

5、spark的統計與數學函式

1)隨機資料生成(Random Data Generation)主要是為測試資料提供方便快捷的介面,如range、rand和randn。rand函式提供均勻正態分佈,而randn則提供標準正態分佈。在呼叫這些函式時,還可以指定列的別名,以方便我們對這些資料進行測試。

scala> val df =sqlContext.range(1, 10).withColumn("uniform",rand(seed=10)).withColumn("normal", randn(seed=27))

scala> sqlContext.range(1, 10).show          生成一個屬性為id的資料框,左閉右開,即不包含10

+---+

| id|

+---+

|  1|

。。。

|  9|

+---+

2)概要與描述性統計(Summary and Descriptive Statistics)包含了計數、平均值、標準差、最大值、最小值運算。只需要針對DataFrame呼叫describe函式即可:

scala>df.describe().show

scala> people.describe().show           注意:此括號不能省略

+-------+------------------+--------------------+-------------------+

|summary|                id|             uniform|             normal|

+-------+------------------+--------------------+-------------------+

|  count|                 9|                   9|                  9|

|   mean|               5.0|  0.3501113296528809|-0.4749238142128228|

|stddev|2.5819888974716116| 0.18389046183528623| 0.9742194777287445|

|    min|                 1|0.018326130186194667|-2.372340011831022|

|    max|                 9|  0.7224977951905031| 0.7873642272821919|

+-------+------------------+--------------------+-------------------+

如果返回的DataFrame含有大量的列,你可以返回其中的一部分列:

scala>df.describe("uniform", "normal").show()

+-------+--------------------+-------------------+

|summary|             uniform|             normal|

+-------+--------------------+-------------------+

|  count|                   9|                  9|

|   mean| 0.3501113296528809|-0.4749238142128228|

| stddev|0.18389046183528623| 0.9742194777287445|

|    min|0.018326130186194667|-2.372340011831022|

|    max| 0.7224977951905031| 0.7873642272821919|

+-------+--------------------+-------------------+

自定選擇要統計的列及函式:

scala>df.select(mean("uniform"), min("uniform"),max("uniform")).show()

+------------------+--------------------+------------------+

|      avg(uniform)|        min(uniform)|      max(uniform)|

+------------------+--------------------+------------------+

|0.3501113296528809|0.018326130186194667|0.7224977951905031|

+------------------+--------------------+------------------+

3)樣本協方差和相關性(Sample covariance and correlation)

協方差表示的是兩個變數的總體的誤差。正數意味著其中一個增加,另外一個也有增加的趨勢;而負數意味著其中一個數增加,另外一個有降低的趨勢。DataFrame兩列中的樣本協方差計算可以如下:

scala> val df =sqlContext.range(0, 10).withColumn("rand1",rand(seed=10)).withColumn("rand2", rand(seed=27))

scala> df.stat.cov("rand1", "rand2")

res18: Double =0.003702640706789616

scala>df.stat.cov("id", "id")

res19: Double =9.166666666666666

兩個隨機生成的列之間的協方差接近零;而id列和它自己的協方差非常大。

協方差的值為9.17可能很難解釋,而相關是協方差的歸一化度量,這個相對更好理解,因為它提供了兩個隨機變數之間的統計相關性的定量測量。

scala> df.stat.corr("rand1", "rand2")

res21: Double =0.07372733310735549

scala> df.stat.corr("id", "id")

res22: Double = 1.0

ID那列完全與相關本身;而兩個隨機生成的列之間的相關性非常低。

4)交叉分類彙總表(又稱列聯表)(Cross tabulation)

如果同時按幾個變數或特徵,把資料分類列表時,這樣的統計表叫作交叉分類彙總表,其主要用來檢驗兩個變數之間是否存在關係,或者說是否獨立。在Spark 1.4中,我們可以計算DataFrame中兩列之間的交叉分類彙總表,以便獲取計算的兩列中不同對的數量,下面是關於如何使用交叉表來獲取列聯表的例子

scala> val df =sqlContext.jdbc("jdbc:mysql://node3:3306/test?user=hive&password=mysql","cross_table")

scala> df.show

+-----+------+

| name|  item|

+-----+------+

|Alice|  milk|

|Alice|  milk|

|Alice| bread|

|Alice|butter|

|  Bob|butter|

|  Bob|butter|

|  Bob|butter|

|  Bob|apples|

+-----+------+

scala>df.stat.crosstab("name", "item").show()

+---------+------+------+----+-----+

|name_item|apples|butter|milk|bread|

+---------+------+------+----+-----+

|      Bob|    1|     3|   0|   0|

|    Alice|    0|     1|   2|   1|

+---------+------+------+----+-----+

我們需要記住,列的基數不能太大。也就是說,name和item distinct之後的數量不能過多。試想,如果item distinct之後的數量為10億,那麼你如何在螢幕上顯示這個表??

5)頻繁項(Frequent items)

瞭解列中那些頻繁出現的item對於我們瞭解資料集非常重要。在Spark 1.4中,我們可以通過使用DataFrames來發現列中的頻繁項,

6)數學函式(Mathematical functions)

Spark 1.4中增加了一系列的數學函式,使用者可以自如地將這些操作應用到他們列。我可以在這裡看到所有的數學函式。輸入必須是一個列函式,並且這個列函式只能輸入一個引數,比如cos, sin, floor, ceil。對於那些需要輸入兩個引數的列函式,比如pow, hypot,我們可以輸入兩列或者列的組合。

相關推薦

spark常用函式transformationaction

1、RDD提供了兩種型別的操作:transformation和action 所有的transformation都是採用的懶策略,如果只是將transformation提交是不會執行計算的,計算只有在action被提交的時候才被觸發。 1)transformation操作:得

Spark程式設計指南之一transformationaction等RDD基本操作

文章目錄 基本概念 開發環境 程式設計實戰 初始化SparkContext RDD的生成 RDD基本操作 Key-Value Pairs Transformations f

Spark常用函式講解之Action操作+例項

RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集RDD有兩種操作運算元:         Transformatio

Spark核心程式設計建立RDD及transformationaction詳解案例

建立RDD 進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,對該RD

Sparktransformationaction算子簡介

一個 算子 filter true 組成 實現 並行 ont key transformation算子 map(func) 返回一個新的分布式數據集,由每個原元素經過func函數處理後的新元素組成 filter(func) 返回一個新的數據集,由經過func函數處理後返回

spark transformationaction運算元

spark transformation和action的運算元 map(func) 返回一個新的分散式資料集,由每個原元素經過func函式處理後的新元素組成 filter(func) 返回一個新的資料集,由經過func函式處理後返回值為true的原元素組

Spark TransformationAction運算元速查表

Transformation運算元 Transformation運算元 作用 map(func) 返回一個新的分散式資料集,其中每個元素都是由源RDD中每一個元素經過fun

Sparktransformationaction操作

1.前言: RDD:彈性分散式資料集,是一種特殊集合、支援多種來源、有容錯機制、可以被快取、支援並行操作,一個RDD代表多個分割槽裡的資料集 RDD有兩種操作運算元: Transformation(轉換):Transformation屬於

為Linux配置常用epelIUS

IT 配置 arch pda creates erp mmu version 源代碼 CentOS上,除了os類的yum源,還需要配置幾個常用的源:epel、ius。 有很多國內很多鏡像站點都提供了各類倉庫的鏡像站點,個人感覺比較全的是阿裏雲http://mirrors.

spark2的transformationaction操作

spark支援兩種RDD操作:transformation和action操作。 transformation操作會針對已有RDD建立一個新的RDD,而action則對RDD進行最後的操作,如遍歷、儲存到檔案等,並將結果返回到Driver程式。 transformation有lazy特性:若一

Tensorflow框架兩種cost函式MSEMulti-class

import tensorflow as tf def MSE_cost(out,Y): cost = tf.reduce_mean(tf.square(out-Y)) return cost def multiclass_cost(out,Y): cost = tf

Spark常用函式之鍵值RDD轉換+例項

RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集RDD有兩種操作運算元:         Transformatio

php常用函式trim,ltrim,rtrim,str_replace

trim trim去除字串首尾處的空白字元(或者其它字元) trim(string $str[,string $charlist])返回字串 trim(待處理的字串,可選引數(過濾字元也可由charlist引數指定,一般要列出所有希望過濾的字元)) 此函式返

spark 常用函式總結

1, textFile()  讀取外部資料來源 2, map() 對每一條資料進行相應的處理 如切分 3, reduceByKey(_+_) 傳入一個函式,將key相同的一類進行聚合計算 如相加 4, mapvalues(_+10) 傳入一個函式,類似於map方法,不過這裡

03、操作RDD(transformationaction案例實戰)

// 這裡通過textFile()方法,針對外部檔案建立了一個RDD,lines,但是實際上,程式執行到這裡為止,spark.txt檔案的資料是不會載入到記憶體中的。lines,只是代表了一個指向spark.txt檔案的引用。val lines = sc.textFile("spark.txt")// 這裡對

【Python】寫視訊的2種常用方法write_videofilevideoWrite

一、使用Python自帶的write_videofile 1、函式說明如下: def write_videofile(self, filename, fps=None, codec=None, bitrate=None, audio

transformationaction的運算元簡介

transformation運算元 map(func) 返回一個新的分散式資料集,由每個原元素經過func函式處理後的新元素組成 filter(func) 返回一個新的資料集,由經過func函式處理後返回值為true的原元素組成 flatMap(func) 類似於map,但是每一個輸入元素,會

javascript未來的函式生成器promise

1.通過生成器讓函式持續執行。 2.使用promise處理非同步任務。 3.使用生成器和promise書寫優雅程式碼。 ES6語言特性:生成器(generator)和promise(promise) 生成器(generator)是一種特殊型別的函式。當從頭到尾執行標準函式時,它最多隻

matlab-畫圖函式scatterplot

由於需要畫圖的時候會經常忘記畫圖函式的具體引數,所以給總結了一下,便於自己和大家用的時候查起來方便,不用到處查,浪費很多時間。 畫圖的時候常用的畫圖函式有scatter和plot,具體語法可以直接在MATLAB命令框裡輸入:help+空格+函式名,檢視具體的語法規則,還附帶

spark 常用函式介紹(python)

全棧工程師開發手冊 (作者:欒鵬) 獲取SparkContext python語法 1. 獲取sparkSession: se = SparkSession.builder.config(conf = SparkConf()).getOrCreate()