1. 程式人生 > >spark基礎入門

spark基礎入門

1、spark概述

  • spark是基於記憶體的一個計算框架,計算速度非常的快。這裡面沒有涉及到任何儲存,如果想要處理外部的資料來源,比如資料在HDFS上,此時我們就需要先搭建一個hadoop叢集。

2、spark的特點

  • 1、速度快(比mapreduce在記憶體中快100倍,比在磁碟中快10倍)
    • (1)spark在處理的資料中間結果資料可以不落地,mapreduce每次中間結果都要落地。
    • (2)在mapreduce計算的時候,mapTask,reduceTask,每一個task都對應一個jvm程序。
      在spark中,它同樣會按照hadoop中切片邏輯,會有N個task,而這些task都是執行在worker節點上,worker上會有executor程序,而這些task會以執行緒的方式執行在executor上面。
  • 2、易用性

    • 可以使用多種語言來編寫spark應用程式
    • java
    • scala
    • Python
    • R
  • 3、通用性

    • 可以使用sparksql、sparkStreaming、Mlib、Graphx
  • 4、相容性

    • 可以執行在不同的資源排程平臺
    • yarn(resourceManger分配資源)
    • mesos(是apache下開源的資源排程框架)
    • standAlone(master進行資源的分配)

3、spark叢集安裝

  • 1、下載對應版本的安裝包
  • 2、上傳安裝包到伺服器上
  • 3、規劃一下安裝目錄
  • 4、解壓安裝包到指定的安裝目錄
  • 5、重新命名安裝目錄
  • 6、修改配置檔案 cd conf
    • (1) spark-env.sh.template (需要 mv spark-env.sh.template spark-env.sh)
    • 配置javahome export JAVA_HOME=/export/servers/jdk
    • 配置master的Host export SPARK_MASTER_HOST=node1
    • 配置master的Port export SPARK_MASTER_PORT=7077
    • (2)slaves.template (需要 mv slaves.template slaves)
    • 新增worker節點
      • node2
      • node3
  • 7、配置一下spark的環境變數
  • 8、通過scp命令分發到其他節點中
    • spark安裝目錄
    • /etc/profile
  • 9、所有機器都要source /etc/profile
  • 10、可以啟動spark叢集
    • $SPARK_HOME/sbin/start-all.sh
    • 可以通過web介面訪問master
  • 11、停止spark叢集
    • $SPARK_HOME/sbin/stop-all.sh

4、spark高可用叢集配置

  • 1、需要先zk叢集
  • 2、修改spark配置(spark-env.sh)
    • (1)註釋掉master的地址
    • (2) 引入zk配置
    • export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181 -Dspark.deploy.zookeeper.dir=/spark”
  • 3、啟動
    • 啟動zk叢集
    • 需要在spark叢集中任意一臺機器上啟動 start-all.sh
      • 產生master程序
      • 並且會根據 slaves,去對應的主機名上啟動worker程序
    • 在其他worker節點上單獨啟動master
      • start-master.sh

5、初識spark程式

  • 已經知道那個master是活著的master
    • –master spark://node1:7077
  • 有很多的master時候
    • –master spark://node1:7077,node2:7077,node3:7077

6、spark-shell使用

  • 1、spark-shell –master local[N] (本地單機版)
    • local[N]:表示在本地模擬N個執行緒來運行當前任務
  • 2、spark-shell –master local[*] (本地單機版)
    • 這個*表示當前機器上所有可用的資源
  • 3、spark-shell –master spark://node2:7077
  • 4、spark-shell 讀取hdfs上的資料檔案
    • sc.textFile("hdfs://node1:9000/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

7、spark整合hdfs

  • 1、修改配置檔案(spark-env.sh)

    • 新增配置引數
    • export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
      • 通過scp分發配置到其他節點
  • 2、可以sc.textFile("/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

8、scala語言程式設計spark單詞計數

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//通過scala編寫spark的單詞計數程式
object WordCount {

  def main(args: Array[String]): Unit = {
    //1、建立SparkConf物件,設定appName和master地址,local[2]表示本地使用2個執行緒來進行計算
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    //2、建立SparkContext物件,這個物件很重要,它會建立DAGScheduler和TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    //設定日誌輸出級別
    sc.setLogLevel("WARN")
    //3、讀取資料檔案
    //val data: RDD[String] = sc.textFile(args(0))
    val data: RDD[String] = sc.textFile("E:\\wordcount\\input\\words.txt")
    //4、切分每一行,並且壓平  hello、you、me
    val words: RDD[String] = data.flatMap(_.split(" "))
    //5、每個單詞記位1
    val wordAndOne: RDD[(String, Int)] = words.map((_,1))
    //6、相同單詞出現的次數進行累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //按照單詞出現的次數降序排序
    val sortResult: RDD[(String, Int)] = result.sortBy(_._2,false)
    //7、收集資料,列印輸出
    val finalResult: Array[(String, Int)] = sortResult.collect()
    //列印結果
    finalResult.foreach(x=>println(x))
    //關閉
    sc.stop()
  }
}

9、java語言程式設計spark單詞計數

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//利用java語言來實現spark的單詞計數
public class WordCount_Java {
    public static void main(String[] args) {
        //1、建立SparkConf物件,設定appName和master地址
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //2、建立javaSparkContext物件
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //3、讀取資料檔案
        JavaRDD<String> dataJavaRDD = jsc.textFile("E:\\wordcount\\input\\words.txt");

        //4、對每一行進行切分壓平
        JavaRDD<String> wordsJavaRDD = dataJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override               //line表示每一行記錄
            public Iterator<String> call(String line) throws Exception {
                //切分每一行
                String[] words = line.split(" ");

                return Arrays.asList(words).iterator();
            }
        });

        //5、每個單詞記為1
        JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        //6、把相同單詞出現的次數累加  (_+_)
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //按照單詞出現的次數降序排序
        //需要將(單詞,次數)進行位置顛倒 (次數,單詞)
        JavaPairRDD<Integer, String> sortJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        }).sortByKey(false);

        //將(次數,單詞)變為(單詞,次數)
        JavaPairRDD<String, Integer> finalSortJavaPairRDD = sortJavaPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        });

        //7、收集列印
        List<Tuple2<String, Integer>> finalResult = finalSortJavaPairRDD.collect();

        for(Tuple2<String, Integer> t:finalResult){
            System.out.println(t);
        }

        jsc.stop();
    }
}

10、上面的程式所依賴的pom.xml檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>18</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Spark執行基本流程圖

圖片1.png