《深入理解Spark》之運算元詳解
XML Code
1 |
package com.lyzx.day17 val rdd11 = rdd1.map(item=>(item,item)) val rdd21 = rdd2.map(item=>(item,item)) rdd11.cartesian(rdd21).foreach(println) } /* countByKey 是一個action操作 計算每一個key出現的次數 具有shuffle操作 計算出每一個鍵對應的個數,底層使用reduceByKey */ def f2(sc:SparkContext): Unit ={ val arr1 = (1 to 10) val rdd1 = sc.parallelize(arr1) val arr2 = (5 to 15) val rdd2 = sc.parallelize(arr2) rdd1.map(item=> .union(rdd2.map(item=>(item,item+99))) .countByKey() .foreach(println) } /* join 類似於SQL裡面的inner join左表根據條件匹配右表的記錄,如果左表沒有匹配到右表的記錄則不會加回去 返回元組,類似於(key,(value1,value2,...,values3)) cogroup 首先在左表的記錄(k,v)封裝為(k,compactBuffer(v))的形式 再做join操作把匹配的項(k,v1),(k,v2)封裝為(k,compactBuffer(v1,v2)) 右表做相同的操作,然後左表右表做join操作 */ def f3(sc:SparkContext): Unit ={ val arr1 = List((1,"羅永浩"),(2,"雷軍"),(3,"餘承東"),(1,"玉皇大帝"),(4,"戰神刑天")) val studentRdd = sc.parallelize(arr1) studentRdd.persist() val arr2 = List((1,88),(2,99),(3,99)) val scoreRdd = sc.parallelize(arr2) scoreRdd.persist() //studentRdd中的記錄根據key匹配scoreRdd中的記錄,然後把匹配的記錄放入到一個元組中(key,(leftValue,rightValue)) studentRdd.join(scoreRdd).foreach(println) println("= = = = = = = = = = = = = = = = = = = = = ") //cogroup 首先在studentRdd中把鍵歸併在一起把值存入CompactBuffer 在scoreRdd中相同 //然後把兩個計算的結果在做join studentRdd.cogroup(scoreRdd).foreach(println) } /* mapValues 只針對鍵值對的RDD,其中鍵不變,後面的函式只操作值 */ def f4(sc:SparkContext): Unit ={ val rdd = sc.parallelize((1 to 10)) val mapRdd = rdd.map(item=>(item,item+100)) mapRdd.mapValues(_+1).foreach(println) } /* 有序列化的地方 1、儲存等級 MEMERY_ONLY > MEMERY_SER 2、shuffle 走網路傳輸的時候 3、分發任務和返回結果的時候 讀取本地檔案 textFile()的第二個引數是最少的partition的個數 如果不傳就找叢集中的預設的並行度和2的最小值最為預設的partition的個數 如果讀取hdfs上的檔案預設有多少個block就有多少個partition 如果出入的引數小於block的個數則partition的個數還是block個 如果大於block的個數 則就使用引數個 */ /* sortBy 第一個引數是一個函式,該函式的也有一個帶T泛型的引數,返回型別和RDD中元素的型別是一致的; >>主要用於對每個元素進行計算轉換之類的操作 >>如果不需要做計算就寫x=>x 從x到x的對映 >>後面兩個引數都有預設值,如果想降序排列可以一個引數搞定x=>{-x} 第二個引數是ascending,從字面的意思大家應該可以猜到,是的,這引數決定排序後RDD中的元素是升序還是降序,預設是true,也就是升序; >>是否是升序 第三個引數是numPartitions,該引數決定排序後的RDD的分割槽個數,預設排序後的分割槽個數和排序之前的個數相等, 即為this.partitions.size */ def f5(sc:SparkContext): Unit ={ val rdd = sc.parallelize(List(10,9,8,7,6,5,4,3,2,1)) rdd.sortBy(x=>x,true).foreach(print) println("========================================") val rdd2 = sc.parallelize(1 to 10) rdd2.sortBy(x=>{-x}).foreach(print) } /* countByValue 數一數值有多少個,這兒的value並不是鍵值對裡面的value 而是RDD的裡面的元素,返回結果是一個元組(element,count) 第一項是元素,第二項是個數 */ def f6(sc:SparkContext): Unit ={ val rdd = sc.parallelize(List((1,10),(2,10),(3,3),(4,4),(5,10),(5,10),(6,1),(7,1))) // val mapRdd = rdd.map(item=>(item,item)) val xx = rdd.countByValue() xx.foreach(println) println("==================================") val rdd2 = sc.parallelize(List(1,2,2,2,3,4)) val result = rdd2.countByValue() result.foreach(item=>println(item._1+"-"+item._2)) } /* groupBy 分組組成鍵值對的形式 鍵是每元素 值是元素組成的CompactBuffer集合 */ def f7(sc:SparkContext): Unit ={ val rdd = sc.parallelize(List(1,2,2,2,3,4,5,6)) rdd.groupBy(x=>x).foreach(println) } /* 一個簡單的TopN 小栗子 如下data列表中每一個元素都是網名在搜尋引擎中搜索的關鍵字,現在要統計今天的熱搜詞(出現次數最多的次)Top2 */ def f8(sc:SparkContext): Unit ={ val data = List("A","B","B","A","C","D","A","A","A","A","A","A","A","A","A","A","A","C","C","C","C","C") val rdd = sc.parallelize(data) //org.apache.spark.rdd.ShuffledRDD val mapRdd = rdd.countByValue() .map(item=>(item._2,item._1)) println(mapRdd.getClass.getName) mapRdd.foreach(item=>println(item._1+"----"+item._2)) // rdd.countByValue() // .map(item=>(item._2,item._1)) // .toSeq // .sortBy(x=>{-x._1}) // .take(2) // .foreach(item=>println(item._2)) } def f9(sc:SparkContext): Unit ={ // val rdd = sc.parallelize((1 to 10)) // val mapRdd = rdd.map(item=>(item,item)) // mapRdd // .sortByKey() // println(mapRdd.getClass.getName) val data1 = ("A","B","C") val data2 = (1 to 10) println(data1.getClass.getName) println(data2.getClass.getName) } } object T1{ def main(args: Array[String]) { val conf = new SparkConf().setAppName("day17").setMaster("local") val sc = new SparkContext(conf) val t = new T1 // t.f1(sc) // t.f2(sc) // t.f3(sc) // t.f4(sc) // t.f5(sc) // t.f6(sc) // t.f7(sc) // t.f8(sc) t.f9(sc) sc.stop() } } |
相關推薦
深入理解spark-rdd詳解
彈性 gem exc .com drive image 都是 spa ima 1.我們在使用spark計算的時候,操作數據集的感覺很方便是因為spark幫我們封裝了一個rdd(彈性分布式數據集Resilient Distributed Dataset); 那麽rdd
深入理解Android之Xposed詳解
一、背景Xposed,大名鼎鼎得Xposed,是Android平臺上最負盛名的一個框架。在這個框架下,我們可以載入很多外掛App,這些外掛App可以直接或間接操縱系統層面的東西,比如操縱一些本來只對系統廠商才open的功能(實際上是因為Android系統很多API是不公開的,
《深入理解Spark》之運算元詳解
XML Code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
《深入理解Spark》之Spark常用運算元詳解(java版+spark1.6.1)
最近公司要用Java開發Spark專案,以前用的是Scala語言,今天就把Spark常用的運算元使用java語言實現了一遍 XML Code 1 2 3 4 5 6 7 8 9 10 11 12
Spark常用運算元詳解彙總 : 實戰案例、Java版本、Scala版本
官網API地址: JavaRDD:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaRDD JavaPairRDD:http://spark.apache.or
Spark常用運算元詳解
Spark的運算元的分類 從大方向來說,Spark 運算元大致可以分為以下兩類: 1)Transformation 變換/轉換運算元:這種變換並不觸發提交作業,完成作業中間過程處理。 Transformation 操作是延遲計算的,也就是說從一個RDD
C++深入理解單例模式詳解
作者:知乎使用者連結:https://www.zhihu.com/question/27704562/answer/37760739來源:知乎著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。不使用編譯器擴充套件,不用C++11,不加鎖,也不使用原子操作的話
【深入理解CSS】BFC詳解
什麼是BFC? 塊級格式化上下文(Block formatting context)是CSS對於一個頁面進行視覺化渲染時產生的區域,在這個區域中會產生被渲染的盒子模型、以及相互影響的浮動元素。 簡單來說,就是BFC就是一種屬性,影響著元素的定位,以及兄弟元素之間的相互影響。
深入理解Spark之ListenerBus監聽器
ListenerBus對消費佇列的實現 上圖為LiveListenerBus類的申明 self => 這句相當於給this起了一個別名為self LiveListenerBus負責將SparkListenerEvents非同步傳送過已經註冊過的S
安卓專案實戰之強大的網路請求框架okGo使用詳解(二):深入理解Callback之自定義JsonCallback
前言 JSON是一種取代XML的資料結構,和xml相比,它更小巧但描述能力卻不差,由於它的小巧所以網路傳輸資料將減少更多流量從而加快了傳輸速度,目前客戶端伺服器返回的資料大多都是基於這種格式的,相應的我們瞭解的關於json的解析工具主要有兩個:Gson(Google官方出的)和fas
零基礎入門大資料之spark中rdd部分運算元詳解
先前文章介紹過一些spark相關知識,本文繼續補充一些細節。 我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉
深入理解Tomcat系列之七 詳解URL請求
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!  
深入理解盒子模型——CSS之BFC詳解
首先我們看看w3c對BFC是怎麼定義的: Floats, absolutely positioned elements, block containers (such as inline-blocks, table-cells, and table-c
《深入理解Spark》之通過sample運算元找出導致資料傾斜的key
最近在整理原來學過的內容,看到sample運算元就寫一篇在實際開發中sample運算元的具體應用 sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long) sample
Spark運算元篇 --Spark運算元之aggregateByKey詳解
一。基本介紹 rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函式是初始值 3代表每次分完組之後的每個組的初始值。 seqFunc代表combine
Spark函數詳解系列之RDD基本轉換
9.png cal shuff reac 數組a water all conn data 摘要: RDD:彈性分布式數據集,是一種特殊集合 ? 支持多種來源 ? 有容錯機制 ? 可以被緩存 ? 支持並行操作,一個RDD代表一個分區裏的數據集 RDD有兩種操作算子: Tra
Spark算子之aggregateByKey詳解
all item bubuko 最大 name rest map com class 一、基本介紹 rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函數是初始值 3代表每次分完組之後的每個組的初始值。 seqFunc代表combi
spark運算元詳解
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 定義: def combineByKey[C]( createCombiner: V => C, mergeVal
深入React技術棧之setState詳解
丟擲問題 class Example extends Component { contructor () { super() this.state = { value: 0, index: 0 } } componentDidMount ()
深入學習理解(9):java:AbstractQueuedSynchronizer詳解
導讀: 前一陣子在寫輕量級RPC框架的時候,由於系統中所需要用非同步RPC模型,由於系統所要求效能比較苛刻,所以基本所有耗時的操作都會採用非同步呼叫的方式:比如非同步讀寫DB,IO,更可能redis的操作都需要非同步(主程說了,我咋辦,做唄)。 正文 什麼是A