1. 程式人生 > >Java接入Spark之建立RDD的兩種方式和操作RDD

Java接入Spark之建立RDD的兩種方式和操作RDD

首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本
這裡寫圖片描述

spark是什麼?
Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spark 部署在大量廉價硬體之上,形成叢集。

下載和安裝
可以看我之前發表的部落格
Spark安裝

安裝成功後執行示例程式

在spark安裝目錄下examples/src/main目錄中。 執行的一個Java或Scala示例程式,使用bin/run-example <class> [params]

./bin/run-example SparkPi 10

啟動spark-shell時的引數
./bin/spark-shell –master local[2]
引數master 表名主機master在分散式叢集中的URL
local【2】 表示在本地通過開啟2個執行緒執行

執行模式
四種:
1.Mesos
2.Hadoop YARN
3.spark
4.local

一般我們用的是local和spark模式

首先建立maven工程加入整個專案所用到的包的maven依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>sparkday01</groupId> <artifactId>sparkday01</artifactId> <version>0.0.1-SNAPSHOT</version
>
<packaging>jar</packaging> <name>sparkday01</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.4</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.1</version> </dependency> </dependencies> </project>

下面開始初始化spark

spark程式需要做的第一件事情,就是建立一個SparkContext物件,它將告訴spark如何訪問一個叢集,而要建立一個SparkContext物件,你首先要建立一個SparkConf物件,該物件訪問了你的應用程式的資訊

比如下面的程式碼是執行在spark模式下

public class sparkTestCon {

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
        System.out.println(sc);
    }

}

下面是執行在本機,把上面的第6行程式碼改為如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

快速入門

可以參看我的部落格,轉載的一篇文章
Spark快速入門

Spark程式設計

每一個spark應用程式都包含一個驅動程式(driver program ),他會執行使用者的main函式,並在叢集上執行各種並行操作(parallel operations)

spark提供的最主要的抽象概念有兩種:
彈性分散式資料集(resilient distributed dataset)簡稱RDD ,他是一個元素集合,被分割槽地分佈到叢集的不同節點上,可以被並行操作,RDDS可以從hdfs(或者任意其他的支援Hadoop的檔案系統)上的一個檔案開始建立,或者通過轉換驅動程式中已經存在的Scala集合得到,使用者也可以讓spark將一個RDD持久化到記憶體中,使其能再並行操作中被有效地重複使用,最後RDD能自動從節點故障中恢復

spark的第二個抽象概念是共享變數(shared variables),它可以在並行操作中使用,在預設情況下,當spark將一個函式以任務集的形式在不同的節點上並行執行時,會將該函式所使用的每個變數拷貝傳遞給每一個任務中,有時候,一個變數需要在任務之間,或者驅動程式之間進行共享,spark支援兩種共享變數:
廣播變數(broadcast variables),它可以在所有節點的記憶體中快取一個值。
累加器(accumulators):只能用於做加法的變數,例如計算器或求和器

RDD的建立有兩種方式
1.引用外部檔案系統的資料集(HDFS)
2.並行化一個已經存在於驅動程式中的集合(並行集合,是通過對於驅動程式中的集合呼叫JavaSparkContext.parallelize來構建的RDD)

第一種方式建立
下面通過程式碼來理解RDD和怎麼操作RDD

package com.tg.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部檔案系統的資料集(HDFS)建立RDD
 *  匿名內部類定義函式傳給spark
 * @author 湯高
 *
 */
public class RDDOps {
    //完成對所有行的長度求和
    public static void main(String[] args) {

        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
        System.out.println(sc);

        //通過hdfs上的檔案定義一個RDD 這個資料暫時還沒有載入到記憶體,也沒有在上面執行動作,lines僅僅指向這個檔案
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");

        //定義lineLengths作為Map轉換的結果 由於惰性,不會立即計算lineLengths
        //第一個引數為傳入的內容,第二個引數為函式操作完後返回的結果型別
        JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
          public Integer call(String s) { 
              System.out.println("每行長度"+s.length());
              return s.length(); }
        });
        //執行reduce  這是一個動作action  這時候,spark才將計算拆分成不同的task,
        //並執行在獨立的機器上,每臺機器執行他自己的map部分和本地的reducation,並返回結果集給去驅動程式
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
        });

        System.out.println(totalLength);
        //為了以後複用  持久化到記憶體...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());


    }
}

如果覺得剛剛那種寫法難以理解,可以看看第二種寫法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部檔案系統的資料集(HDFS)建立RDD 
 *  外部類定義函式傳給spark
 * @author 湯高
 *
 */
public class RDDOps2 {
    // 完成對所有行的長度求和
    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);


        //通過hdfs上的檔案定義一個RDD 這個資料暫時還沒有載入到記憶體,也沒有在上面執行動作,lines僅僅指向這個檔案
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
        //定義lineLengths作為Map轉換的結果 由於惰性,不會立即計算lineLengths
        JavaRDD<Integer> lineLengths = lines.map(new GetLength());


        //執行reduce  這是一個動作action  這時候,spark才將計算拆分成不同的task,
                //並執行在獨立的機器上,每臺機器執行他自己的map部分和本地的reducation,並返回結果集給去驅動程式
        int totalLength = lineLengths.reduce(new Sum());

        System.out.println("總長度"+totalLength);
        // 為了以後複用 持久化到記憶體...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());

    }
    //定義map函式
    //第一個引數為傳入的內容,第二個引數為函式操作完後返回的結果型別
    static class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }
    //定義reduce函式 
    //第一個引數為內容,第三個引數為函式操作完後返回的結果型別
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

第二種方式建立RDD

package com.tg.spark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;

import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
 * 並行化一個已經存在於驅動程式中的集合建立RDD
 * @author 湯高
 *
 */
public class RDDOps3 {
    // 完成對所有數求和
    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);

        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //並行集合,是通過對於驅動程式中的集合呼叫JavaSparkContext.parallelize來構建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);

        JavaRDD<Integer> lineLengths = distData.map(new GetLength());

        // 執行reduce 這是一個動作action 這時候,spark才將計算拆分成不同的task,
        // 並執行在獨立的機器上,每臺機器執行他自己的map部分和本地的reducation,並返回結果集給去驅動程式
        int totalLength = lineLengths.reduce(new Sum());

        System.out.println("總和" + totalLength);
        // 為了以後複用 持久化到記憶體...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());

    }

    // 定義map函式
    static class GetLength implements Function<Integer, Integer> {

        @Override
        public Integer call(Integer a) throws Exception {

            return a;
        }
    }

    // 定義reduce函式
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

注意:上面的寫法是基於jdk1.7或者更低版本
基於jdk1.8有更簡單的寫法
下面是官方文件的說明


Note: In this guide, we’ll often use the concise Java 8 lambda syntax to specify Java functions, but in older versions of Java you can implement the interfaces in the org.apache.spark.api.java.function package. We describe passing functions to Spark in more detail below.


Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the org.apache.spark.api.java.function package. There are two ways to create such functions:

Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark.
In Java 8, use lambda expressions to concisely define an implementation.

所以如果要完成上面第一種建立方式,在jdk1.8中可以簡單的這麼寫

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

要完成第二種方式的建立,簡單的這麼寫

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我們要自己寫一個函式傳到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中寫lambda表示式

相關推薦

Java接入Spark建立RDD方式操作RDD

首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本 spark是什麼? Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spar

Java Spark建立RDD方式操作RDD

首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本  spark是什麼?  Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spark 部署在大量廉

java執行緒建立方式,六狀態匿名內部類建立子類或實現類物件

一.匿名內部類建立子類或實現類物件 new Test(){} 相當於建立了Test類的子類物件 並且沒有類名 建立介面實現類 new 介面名() {};介面實現類的物件 注意 : new 後邊是類或者介面名 大括號內是類或者介面中的方法 public

執行緒 建立方法)啟動

java使用Thread類代表執行緒,所有的執行緒物件都必須是Thread 類或者子類的例項。 每個執行緒的任務是完成一定的任務,就是執行一段程式流。 目前有兩種建立方式(第三種後續補充) 一種是繼承Thread類 一種是實現Runnable介面` 一、繼承Thread類 1.定

Java執行緒建立方式

package test; /**  * 建立執行緒  *  */ public class Demo1 {      public static void main(String arg[]){

建立Java多執行緒的方式執行緒異常

一.使用多執行緒的兩種方法  使用多執行緒的兩種方法有:繼承Thread類和實現runable介面。 二.繼承Thread類 來看一下thread類的原始碼:  class Thread implements Runnable { 首先可以看出thread類也是實現Runable介面的run方法如下:

執行緒 建立方法)啟動

java使用Thread類代表執行緒,所有的執行緒物件都必須是Thread 類或者子類的例項。 每個執行緒的任務是完成一定的任務,就是執行一段程式流。 目前有兩種建立方式(第三種後續補充) 一種是繼承Thread類 一種是實現Runnable介面` 一、繼承Th

java--(多執行緒建立方式Thread類Runnable介面)

(一)繼承Thread類建立多執行緒----單執行緒下面的程式碼是一個死迴圈,但是不會執行main裡面的迴圈語句,而是run()裡面的語句,這是因為該程式是一個單執行緒程式,當呼叫MyThread類的run()方法時,遇到死迴圈,迴圈一直進行。因此,MyThread類的列印

java實現多線程的方式

vat nts lock name div print 創建對象 pre thread Java需要並發控制三個原因: 多線程環境 存在共享資源 多個線程操作(修改)共享資源 下面分別用繼承Thread類和實現Runnable接口倆種方式實現並發控制, 繼承Thread

Java中創建String的方式差異

ima 分享 引用 永遠 bubuko 技術分享 方式 println print 我們知道創建一個String類型的變量一般有以下兩種方法: String str1 = "abcd"; String str2 = new String("abcd"); 那麽

java實現二分查詢演算法,方式實現,非遞迴遞迴

java實現二分查詢演算法 1、概念 2、前提 3、思想 4、過程 4、複雜度 5、實現方式 1. 非遞迴方式 2. 遞迴方式

GitHub建立分支方式

一:需求背景          1.1 開發新功能和修改bug一般新建分支,如果覺得可行,可以合併到master分支上. 二:建立方式      &nbs

Java多執行緒建立的三方式與對比

一、繼承Thread類建立執行緒類 1、定義Thread類的子類,並重寫該類的run()方法,該run()方法的方法體代表了執行緒需要完成的任務,即執行緒的執行體。 2、建立Thread子類的例項,即建立執行緒物件。 3、呼叫執行緒物件的start()方法來啟動該執行緒

物件建立的三方式閉包的常用場景--js

物件建立的三種方式 ①通過new關鍵字建立物件 var obj = new Object(); obj.name = 'daxue'; obj.age = 28; obj.fun = function(){ } alert(obj.age); ②

java實現多執行緒的方式

Java需要併發控制三個原因: 多執行緒環境 存在共享資源 多個執行緒操作(修改)共享資源 下面分別用繼承Thread類和實現Runnable介面倆種方式實現併發控制, 繼承Thread類 繼承Thread類方式,最後建立物件是因為會是三個不同的執行緒物件,所以

SpringBoot學習筆記(二) SpringBoot專案建立方式

叄念 springboot 專案建立方式其實有多種,這裡我們主要介紹兩種方式: 當然這裡建議大家用方式一來建立,方式二用於理解 方式

Java專案中使用groovy的方式

可能還有更多的使用方式,在此只記錄自己使用的兩種方式: 方式一 String type = "List<String>"; String json

springboot通過java bean整合通用mapper的方式

前言:公司開發的框架基於springboot深度封裝,只能使用java bean的方式進行專案配置。第一種:1.引入POM座標,需要同時引入通用mapper和jpa<dependency> <groupId>tk.mybatis</gro

java呼叫CXF WebService介面的方式

通過http://localhost:7002/card/services/HelloWorld?wsdl訪問到xml如下,說明介面寫對了。 2.靜態呼叫         // 建立WebService客戶端代理工廠         JaxWsProxyFact

spark連線hive的方式

在pom中新增依賴 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2