1. 程式人生 > >《深入理解Spark》之運算元詳解

《深入理解Spark》之運算元詳解

 XML Code 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197

package com.lyzx.day17

import org.apache.spark.{SparkContext,SparkConf}

class T1 {
  /*
    cartesian  做笛卡爾積
    RDD[K]+RDD[V]=>RDD[(K,V)]
    把每項依次組合後放入一個元組中返回
   */
  def f1(sc:SparkContext): Unit ={
    val arr1 = List("A","B","C")
    val arr2 = List(1,2,3)

    val rdd1 = sc.parallelize(arr1)
    val rdd2 = sc.parallelize(arr2)

    rdd1.persist()
    rdd2.persist()

    rdd1.cartesian(rdd2).foreach(println)

    println("==============================="

)

    val rdd11 = rdd1.map(item=>(item,item))
    val rdd21 = rdd2.map(item=>(item,item))
    rdd11.cartesian(rdd21).foreach(println)
  }

  /*
  countByKey  是一個action操作  計算每一個key出現的次數
  具有shuffle操作
  計算出每一個鍵對應的個數,底層使用reduceByKey
   */
  def f2(sc:SparkContext): Unit ={
    val arr1 = (1 to 10)
    val rdd1 = sc.parallelize(arr1)

    val arr2 = (5 to 15)
    val rdd2 = sc.parallelize(arr2)

    rdd1.map(item=>
(item,item*100))
        .union(rdd2.map(item=>(item,item+99)))
        .countByKey()
        .foreach(println)
  }

  /*
  join
    類似於SQL裡面的inner join左表根據條件匹配右表的記錄,如果左表沒有匹配到右表的記錄則不會加回去
    返回元組,類似於(key,(value1,value2,...,values3))
   cogroup
    首先在左表的記錄(k,v)封裝為(k,compactBuffer(v))的形式 再做join操作把匹配的項(k,v1),(k,v2)封裝為(k,compactBuffer(v1,v2))
    右表做相同的操作,然後左表右表做join操作
   */
  def f3(sc:SparkContext): Unit ={
    val arr1 = List((1,"羅永浩"),(2,"雷軍"),(3,"餘承東"),(1,"玉皇大帝"),(4,"戰神刑天"))
    val studentRdd = sc.parallelize(arr1)
    studentRdd.persist()

    val arr2 = List((1,88),(2,99),(3,99))
    val scoreRdd = sc.parallelize(arr2)
    scoreRdd.persist()

    //studentRdd中的記錄根據key匹配scoreRdd中的記錄,然後把匹配的記錄放入到一個元組中(key,(leftValue,rightValue))
    studentRdd.join(scoreRdd).foreach(println)

    println("= = = = = = = = = = = = = = = = = = = = = ")

    //cogroup 首先在studentRdd中把鍵歸併在一起把值存入CompactBuffer  在scoreRdd中相同
    //然後把兩個計算的結果在做join
    studentRdd.cogroup(scoreRdd).foreach(println)
  }

  /*
    mapValues 只針對鍵值對的RDD,其中鍵不變,後面的函式只操作值
   */
  def f4(sc:SparkContext): Unit ={
    val rdd = sc.parallelize((1 to 10))
    val mapRdd = rdd.map(item=>(item,item+100))
    mapRdd.mapValues(_+1).foreach(println)
  }

  /*
    有序列化的地方
    1、儲存等級  MEMERY_ONLY > MEMERY_SER
    2、shuffle 走網路傳輸的時候
    3、分發任務和返回結果的時候

    讀取本地檔案
    textFile()的第二個引數是最少的partition的個數
    如果不傳就找叢集中的預設的並行度和2的最小值最為預設的partition的個數
    如果讀取hdfs上的檔案預設有多少個block就有多少個partition
    如果出入的引數小於block的個數則partition的個數還是block個
    如果大於block的個數 則就使用引數個
   */

  /*
    sortBy
    第一個引數是一個函式,該函式的也有一個帶T泛型的引數,返回型別和RDD中元素的型別是一致的;
      >>主要用於對每個元素進行計算轉換之類的操作
      >>如果不需要做計算就寫x=>x  從x到x的對映
      >>後面兩個引數都有預設值,如果想降序排列可以一個引數搞定x=>{-x}
  第二個引數是ascending,從字面的意思大家應該可以猜到,是的,這引數決定排序後RDD中的元素是升序還是降序,預設是true,也就是升序;
      >>是否是升序
  第三個引數是numPartitions,該引數決定排序後的RDD的分割槽個數,預設排序後的分割槽個數和排序之前的個數相等,
    即為this.partitions.size
   */
  def f5(sc:SparkContext): Unit ={
    val rdd = sc.parallelize(List(10,9,8,7,6,5,4,3,2,1))
    rdd.sortBy(x=>x,true).foreach(print)
    println("========================================")
    val rdd2 = sc.parallelize(1 to 10)
    rdd2.sortBy(x=>{-x}).foreach(print)
  }


  /*
    countByValue
    數一數值有多少個,這兒的value並不是鍵值對裡面的value
    而是RDD的裡面的元素,返回結果是一個元組(element,count) 第一項是元素,第二項是個數
   */
  def f6(sc:SparkContext): Unit ={
    val rdd = sc.parallelize(List((1,10),(2,10),(3,3),(4,4),(5,10),(5,10),(6,1),(7,1)))
//    val mapRdd = rdd.map(item=>(item,item))
    val xx = rdd.countByValue()
    xx.foreach(println)

    println("==================================")
    val rdd2 = sc.parallelize(List(1,2,2,2,3,4))
    val result = rdd2.countByValue()
    result.foreach(item=>println(item._1+"-"+item._2))
  }
  /*
     groupBy  分組組成鍵值對的形式
     鍵是每元素  值是元素組成的CompactBuffer集合
   */
  def f7(sc:SparkContext): Unit ={
    val rdd = sc.parallelize(List(1,2,2,2,3,4,5,6))
    rdd.groupBy(x=>x).foreach(println)
  }

  /*
  一個簡單的TopN 小栗子
  如下data列表中每一個元素都是網名在搜尋引擎中搜索的關鍵字,現在要統計今天的熱搜詞(出現次數最多的次)Top2
   */
  def f8(sc:SparkContext): Unit ={
    val data = List("A","B","B","A","C","D","A","A","A","A","A","A","A","A","A","A","A","C","C","C","C","C")
    val rdd = sc.parallelize(data)

    //org.apache.spark.rdd.ShuffledRDD

    val mapRdd = rdd.countByValue()
                    .map(item=>(item._2,item._1))
    println(mapRdd.getClass.getName)
    mapRdd.foreach(item=>println(item._1+"----"+item._2))

//    rdd.countByValue()
//      .map(item=>(item._2,item._1))
//      .toSeq
//      .sortBy(x=>{-x._1})
//      .take(2)
//      .foreach(item=>println(item._2))
  }

  def f9(sc:SparkContext): Unit ={
//    val rdd = sc.parallelize((1 to 10))
//    val mapRdd = rdd.map(item=>(item,item))
//    mapRdd
//    .sortByKey()
//    println(mapRdd.getClass.getName)

    val data1 = ("A","B","C")
    val data2 = (1 to 10)
    println(data1.getClass.getName)
    println(data2.getClass.getName)
  }
}

object T1{
  def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("day17").setMaster("local")
      val sc = new SparkContext(conf)

      val t = new T1
//    t.f1(sc)
//    t.f2(sc)
//    t.f3(sc)
//    t.f4(sc)
//    t.f5(sc)
//    t.f6(sc)
//    t.f7(sc)
//    t.f8(sc)
    t.f9(sc)
      sc.stop()
  }
}

相關推薦

深入理解spark-rdd

彈性 gem exc .com drive image 都是 spa ima 1.我們在使用spark計算的時候,操作數據集的感覺很方便是因為spark幫我們封裝了一個rdd(彈性分布式數據集Resilient Distributed Dataset); 那麽rdd

深入理解AndroidXposed

一、背景Xposed,大名鼎鼎得Xposed,是Android平臺上最負盛名的一個框架。在這個框架下,我們可以載入很多外掛App,這些外掛App可以直接或間接操縱系統層面的東西,比如操縱一些本來只對系統廠商才open的功能(實際上是因為Android系統很多API是不公開的,

深入理解Spark運算元

 XML Code  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

深入理解SparkSpark常用運算元(java版+spark1.6.1)

最近公司要用Java開發Spark專案,以前用的是Scala語言,今天就把Spark常用的運算元使用java語言實現了一遍 XML Code  1 2 3 4 5 6 7 8 9 10 11 12

Spark常用運算元彙總 : 實戰案例、Java版本、Scala版本

官網API地址: JavaRDD:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaRDD  JavaPairRDD:http://spark.apache.or

Spark常用運算元

Spark的運算元的分類   從大方向來說,Spark 運算元大致可以分為以下兩類:     1)Transformation 變換/轉換運算元:這種變換並不觸發提交作業,完成作業中間過程處理。     Transformation 操作是延遲計算的,也就是說從一個RDD

C++深入理解單例模式

作者:知乎使用者連結:https://www.zhihu.com/question/27704562/answer/37760739來源:知乎著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。不使用編譯器擴充套件,不用C++11,不加鎖,也不使用原子操作的話

深入理解CSS】BFC

什麼是BFC? 塊級格式化上下文(Block formatting context)是CSS對於一個頁面進行視覺化渲染時產生的區域,在這個區域中會產生被渲染的盒子模型、以及相互影響的浮動元素。 簡單來說,就是BFC就是一種屬性,影響著元素的定位,以及兄弟元素之間的相互影響。

深入理解SparkListenerBus監聽器

ListenerBus對消費佇列的實現 上圖為LiveListenerBus類的申明 self => 這句相當於給this起了一個別名為self LiveListenerBus負責將SparkListenerEvents非同步傳送過已經註冊過的S

安卓專案實戰強大的網路請求框架okGo使用(二):深入理解Callback自定義JsonCallback

前言 JSON是一種取代XML的資料結構,和xml相比,它更小巧但描述能力卻不差,由於它的小巧所以網路傳輸資料將減少更多流量從而加快了傳輸速度,目前客戶端伺服器返回的資料大多都是基於這種格式的,相應的我們瞭解的關於json的解析工具主要有兩個:Gson(Google官方出的)和fas

零基礎入門大資料spark中rdd部分運算元

先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉

深入理解Tomcat系列URL請求

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

深入理解盒子模型——CSSBFC

  首先我們看看w3c對BFC是怎麼定義的:   Floats, absolutely positioned elements, block containers (such as inline-blocks, table-cells, and table-c

深入理解Spark通過sample運算元找出導致資料傾斜的key

最近在整理原來學過的內容,看到sample運算元就寫一篇在實際開發中sample運算元的具體應用 sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long) sample

Spark運算元篇 --Spark運算元aggregateByKey

一。基本介紹 rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函式是初始值 3代表每次分完組之後的每個組的初始值。 seqFunc代表combine

Spark函數系列RDD基本轉換

9.png cal shuff reac 數組a water all conn data 摘要: RDD:彈性分布式數據集,是一種特殊集合 ? 支持多種來源 ? 有容錯機制 ? 可以被緩存 ? 支持並行操作,一個RDD代表一個分區裏的數據集 RDD有兩種操作算子: Tra

Spark算子aggregateByKey

all item bubuko 最大 name rest map com class 一、基本介紹 rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函數是初始值 3代表每次分完組之後的每個組的初始值。 seqFunc代表combi

spark運算元

combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 定義: def combineByKey[C]( createCombiner: V => C, mergeVal

深入React技術棧setState

丟擲問題 class Example extends Component { contructor () { super() this.state = { value: 0, index: 0 } } componentDidMount ()

深入學習理解(9):java:AbstractQueuedSynchronizer

導讀: 前一陣子在寫輕量級RPC框架的時候,由於系統中所需要用非同步RPC模型,由於系統所要求效能比較苛刻,所以基本所有耗時的操作都會採用非同步呼叫的方式:比如非同步讀寫DB,IO,更可能redis的操作都需要非同步(主程說了,我咋辦,做唄)。 正文 什麼是A