1. 程式人生 > >SparkCore程式設計(一)-分組取topN

SparkCore程式設計(一)-分組取topN

1、檔案資料
Spark,100
Hadoop,62
Flink,77
Kafka,91
Hadoop,93
Spark,78
Hadoop,69
Spark,98
Hadoop,62
Spark,99
Hadoop,61
Spark,70
Hadoop,75
Spark,88
Hadoop,68
Spark,90
Hadoop,61
2、Scala程式碼:
package topN

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GroupTop {
  def main(args: Array[String]): Unit = {

    /**
      * 判斷引數的多少
      */

    if (args.length < 2){
      println(
        """
          |topN.GroupTop<inputPath><outputPath>
          |<inputPath> 檔案輸入目錄
          |<outputPath> 輸出目錄
        """.stripMargin
      )
      System.exit(0)
    }
    /**
      * 接收引數
      */


    val Array(inputPath,outputPath) = args
    /**
      * 初始化程式入口
      */
    val conf = new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}")
    conf.setMaster("local")
    conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

    /**
      * 計算topN
      */
    val sc = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile(inputPath)
    //拆分為Tuple2
    val tupleRDD: RDD[(String, Int)] = lines.map(line => {
      (line.split(",")(0), line.split(",")(1).toInt)
    })
    //分組
    val groutRDD: RDD[(String, Iterable[Int])] = tupleRDD.groupByKey()
    //針對分組對value排序,返回Tuple2
    val groupSort: RDD[(String, List[Int])] = groutRDD.map(grouped => {
      (grouped._1, grouped._2.toList.sortWith(_ > _).take(5))//升序,取Top3
    })
    //遍歷輸出
    groupSort.sortByKey().collect().foreach(pair => {
      println(pair._1+":")
      pair._2.foreach(s => println(s + "\t"))
    })

    sc.stop()
  }
}

執行結果:

Flink:
77	
Hadoop:
93	
75	
69	
68	
62	
Kafka:
91	
Spark:
100	
99	
98	
90	
88	
3、Java程式碼:
package topN;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

import java.util.Comparator;
import java.util.Iterator;


public class GroupTopN {
    public static void main(String[] args) {

        /**
         * 判斷引數的個數
         */
        if (args.length > 2){
            System.out.println("topN.GroupTopN" +
                    "need two parameter <inputPath><outputPath> \n" +
                    "<inputPath> 輸入路徑\n" +
                    "<outputPath> 輸出路徑");
            System.exit(0);
        }
        /**
         * 接收引數
         */
        String inputPath = args[0];
        String outputPath = args[1];
        SparkConf conf = new SparkConf().setAppName("topN.GroupTopN").setMaster("local");;
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(inputPath);
        
        //拆分為JavaPairRDD程式碼 匿名內部類
        JavaPairRDD<String, Integer> cs = lines.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2(s.split(",")[0],Integer.valueOf(s.split(",")[1]));
            }
        });
        //拆分為JavaPairRDD程式碼 用lamda表示式
//        JavaPairRDD cs = lines.mapToPair((PairFunction) s -> new Tuple2(s.toString().split(",")[0],Integer.valueOf(s.toString().split(",")[1])));

        //根據Key分組
        JavaPairRDD<String, Iterable<Integer>> csPairsRDD = cs.groupByKey();
        //根據Key排序,降序
        JavaPairRDD<String, Iterable<Integer>> sortbykey = csPairsRDD.sortByKey();

        //遍歷取出Top3
        sortbykey.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> csPair) throws Exception {
                String name = csPair._1();
                Iterator<Integer> ite = csPair._2().iterator();
                Integer[] res = new Integer[3];
                //排序,取出Top3
                while (ite.hasNext()){
                    Integer score = ite.next();
                    for (int i = 0; i < 3; i++){
                        if (res[i] == null){
                            res[i] = score;
                            break;
                        } else if (res[i] < score){
                            for (int j= 2 ; j > i; j--){
                                res[i] = res[j - 1];
                            }
                            res[i] = score;
                            break;
                        }
                    }
                }
                System.out.println(name+":");
                for (int i = 0; i < res.length; i++){
                    System.out.println(res[i] + "\t");
                }
                System.out.println();
            }


        });
        sc.close();
    }

}

執行結果:

Flink:
77	
null	
null	

Hadoop:
93	
75	
68	

Kafka:
91	
null	
null	

Spark:
100	
99	
90	

相關推薦

SparkCore程式設計-分組topN

1、檔案資料Spark,100 Hadoop,62 Flink,77 Kafka,91 Hadoop,93 Spark,78 Hadoop,69 Spark,98 Hadoop,62 Spark,99 Hadoop,61 Spark,70 Hadoop,75 Spark,88

Python爬蟲實例百度貼吧帖子中的圖片

選擇 圖片查看 負責 targe mpat wid agent html headers 程序功能說明:爬取百度貼吧帖子中的圖片,用戶輸入貼吧名稱和要爬取的起始和終止頁數即可進行爬取。 思路分析: 一、指定貼吧url的獲取 例如我們進入秦時明月吧,提取並分析其有效url如下

SparkCore基礎

數據操作 步驟 kcon 用戶訪問 引擎 cdh 管理 你會 gpo * SparkCore基礎(一) 學習Spark,首先要熟悉Scala,當然你說你會Python或者Java能不能玩Spark?能!但是不推薦,首推Scala,因為Scala非常便捷,而且Scala有非

python學習-網路程式設計

udp的接收和傳送資料程式碼: udp的傳送資料程式碼如下: import socket def main():     #建立套接字     udp_socket = socket.socket(socket.AF_I

網路程式設計:埠那些事兒

TCP和UDP協議都存在一個叫做埠的東西,但埠卻不是IP協議的一部分。 埠被設計出來主要是為了給協議棧和應用對應: 協議棧用埠號將資料分配給不同的應用層程式 應用層程式用埠號去區分不同的連線,參見之前提到過的“四元組” TCP和UDP協議都使用了埠號(Port num

Java GUI程式設計

********************第16章 GUI程式設計 ************************** 一.什麼是GUI GUI= Graphical User Interface 圖形使用者介面 圖形使用者介面 GUI是用java來編寫帶介面的應用程式 兩套技術 1. AWT(Ab

軟體工程部落格作業二 -- 結對程式設計

作業要求:https://edu.cnblogs.com/campus/ustc/InnovatingLeadersClass/homework/2231 專案原始碼:https://github.com/jackroos/golden_number 黃金點遊戲簡介 N個同學(N通常大於10),每人寫一

Java-函數語言程式設計初識篇

原文連結:https://www.javazhiyin.com/20249.html 開發者使用Java8編寫複雜的集合處理演算法,只需要簡單的程式碼就能在多喝cpu上高效執行,這就是Lambda表示式的初衷。 提示:函數語言程式設計和語言無關,它是一種思想,任何語言都可以實現

Python Socket網路程式設計初識Socket和Socket初步使用

目錄 前言 網路程式設計 實質 IP地址和埠 資料傳輸協議 協議 Socket

淺談PHP面向物件程式設計

傳統的面向過程   將要完成的工作,分作若干個步驟,或再細分為子步驟,然後後步驟從前往後一步一步完成,最初達致目標。 現代的面向物件   將要完成的工作拆分為“一個一個物件”的任務(功能),每個物件獨自完成自己的任務,任務之間的連結通過“呼叫”來實現,最終也完成了整體的工

微控制器高階裸機程式設計-- 資料驅動程式

我理解的所謂資料驅動程式的方法,簡單而言:將資料與程式分析,將程式碼邏輯的組織轉換成數字規律的統計。即將資料作為一個要處理的指令碼(當做資料庫),然後程式作為直譯器,將指令碼的內容用程式碼邏輯解釋出來,實現程式設計功能。 這就要求我們將程式碼邏輯分解出其內在資料關係,這樣我們就可以寫出與資料

Socket網路程式設計

此文使用的協議是 TCP       首先要寫入以下程式碼,不然很多函式都用不了 #include <WinSock2.h> #pragma comment(lib, "ws2_32.lib")   ●伺服器端    

Python函數語言程式設計:高階函式

首先有一個高階函式的知識。 一個函式可以接收另一個函式作為引數,這種函式就稱之為高階函式。 def add(x, y, f): return f(x) + f(y) 當我們呼叫add(-5, 6, abs)時,引數x,y和f分別接收-5,6和abs,根據函式定義,我們可以推導計算過程為:

利用CUDA進行GPU程式設計

安裝CUDA工具 進行GPU程式設計的第一步,是對程式設計環境進行搭建,小編選擇的是NVIDIA提供的CUDA toolkit, 使用該工具的硬體基礎是電腦顯示卡需要是N卡(即NVIDIA系列顯示卡),通過在電腦中工作管理員的顯示介面卡中檢視自己電腦的顯示卡資訊,也可以在魯大師等軟體中直

Python爬蟲入門實戰系列--爬網路小說並存放至txt檔案

執行平臺: Windows  Python版本: Python3.x  一、庫檔案                      

Scala函數語言程式設計

package PXL.basics import sun.security.util.Length /*** * 使用Alt + /可以自動補全 * def 後面就可以看成函式 * 函式是一等公民,可以像變數一樣被傳遞,被賦值;並且函式和變數之間可以賦值傳遞,可以把函式當成一個變

併發程式設計:執行緒基礎、執行緒之間的共享與協作

一、基礎概念 1.1 CPU核心數、執行緒數 **兩者的關係:**cpu的核心數與執行緒數是1:1的關係,例如一個8核的cpu,支援8個執行緒同時執行。但在intel引入超執行緒技術以後,cpu與執行緒數的關係就變成了1:2。此外在開發過程中並沒感覺到執行緒的限制,那是因為cpu

我愛程式設計

 1、目錄檢索 【1】 給定一個非空字串,找出不含有重複字元的最長子串的長度。【easy】 【2】用多型的思想求圓、矩形的周長和麵積。【medium】 【3】氣泡排序。【easy】 【4】快速排序。【medium】 【5】一列數的規則如下: 1、1、2、3、5、8、13

基於Linux的C程式設計

一、GCC概述 1、GCC概述 一個c/c++程式從開始編碼到生成二進位制可執行檔案至少要經過四個步驟。   (1)預處理:對原始檔的巨集進行展開。   (2)編譯:將源程式編譯成彙編檔案。   (3)彙編:將彙編檔案編譯成機器碼。   (4)連結:將目標檔案和外部符號進行連結,生成可執行檔案。

併發程式設計同步類容器和併發類容器

併發程式設計(一)同步類容器和併發類容器 一、同步類容器 同步類容器是 執行緒安全 的,如 Vector、HashTable 等容器的同步功能都是由 Collections.synchronizedMap 等工廠方法去建立實現的,底層使用 synchronized 關鍵字,每次只有一個執行緒訪問容器。這明