1. 程式人生 > >Spark實戰(5)_Spark Core核心程式設計

Spark實戰(5)_Spark Core核心程式設計

Spark版本

cdh5.9.0整合的spark的版本1.6.0,整合的hadoop版本2.6.0。檢視的網址:

http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.9.0/

如果用cdh5.9.0 parcels離線安裝自帶的spark(on yarn),啟動時提示缺少包,需要修改spark-env.sh的配置SPARK_DIST_CLASSPATH,裡面預設的配置為線上用rpm方式安裝的配置,修改為/opt/clouderra/parcels/CDH/lib。

Spark執行模式

Spark 的執行模式有 local、Yarn、Standalone、Mesos四類。

開發和測試用 local 模式,其實就是用多執行緒模似分散式執行。

如果業務部門較少且不需要對部門或組之間的資源做劃分和優先順序排程的話,可以使用 Standalone 模式來部署。

當如果有多個部門或組,且希望每個組織可以限制固定執行的最大資源,另外組或者任務需要有優先順序執行的話,可以選擇 Yarn 或 Mesos。

Standalone模式,即獨立模式,master/slave(worker),自帶完整的服務,可單獨部署到一個叢集中,無需依賴任何其他資源管理系統。需要啟動Master和Worker守護程序,即服務端程序,就好比Mapreduce的JobTracker和TaskTracker。

Spark on Yarn: 把Spark作業的排程和資源分配交給Yarn,Yarn相當於Spark叢集的Master,Spark無自己的守護程序,僅僅作為客戶端存在。

MR(Hive)、Storm、Tez、Spark,期望這些作業有統一的排程和資源分配的角色,Yarn(MR2)。

Spark元件

Cluster Manager:在Standalone模式中即為Master(主節點),控制整個叢集,監控Worker。在YARN模式中為資源管理器。

Worker:從節點,負責控制計算節點,啟動Executor或Driver。在YARN模式中為NodeManager,負責計算節點的控制。

Driver:執行Application的main()函式並建立SparkContext。

Executor:執行器,在worker node上執行任務的元件、用於啟動執行緒池執行任務。

SparkContext:整個應用的上下文,控制應用的生命週期,提交作業的入口。

RDD:Spark的基本計算單元,一組RDD可形成執行的有向無環圖RDD Graph。

DAG Scheduler:實現將Spark作業分解成一到多個Stage,每個Stage根據RDD的Partition個數決定Task的個數,然後生成相應的Task set放到TaskScheduler(NodeManager)中。

TaskScheduler:將任務(Task)分發給Executor執行。

Stage:一個Spark作業一般包含一到多個Stage。

Task:一個Stage包含一到多個Task,通過多個Task實現並行執行的功能。

Spark叢集部署

配置免金鑰登入

CDH中用CM進行叢集管理,叢集直接互聯是通過ssh協議,但我們不需要配置ssh免密匙訪問,因為CM中配置了通過相同帳戶密碼訪問。用Apache Spark的話,必須配置ssh免密匙訪問。

安裝Scala和Spark

修改環境變數

1vi /etc/profile
2export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
3export SPARK_HOME=/opt/soft/spark2.0/spark-2.2.0-bin-hadoop2.6
4export SCALA_HOME=/opt/soft/spark2.0/scala-2.11.8
5export JAVA_HOME=/opt/soft/jdk1.8.0_131
6export HADOOP_CONF_DIR=/etc/hadoop/conf

修改$SPARK_HOME/conf
mv slaves.template slaves,slaves裡配置工作節點主機名列表。

mv spark-env.sh.template spark-env.sh,spark-env.sh配置一些環境變數,由於我們用Yarn模式,這裡面不用配置。如果是standalone模式呢?

分散式模式執行測試,

1spark-submit --class org.apache.spark.examples.SparkPi \
2--master yarn \
3--num-executors 1 \
4--driver-memory 1g \
5--executor-memory 1g \
6--executor-cores 1 \
7--conf "spark.app.name=SparkPi" \
8/export/servers/spark-2.0.2-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.0.2.jar

報錯資訊,設定HADOOP_CONF_DIR環境變數即可。

1Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
2        at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:256)
3        at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:233)
4        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
5        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
6        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

如果記憶體不足,報錯的話,在cm裡進行yarn的配置,如下2個設定為2g:
yarn.scheduler.maximum-allocation-mbyarn.nodemanager.resource.memory-mb
儲存後,部署客戶端配置,把cm介面裡修改過的引數同步到每個節點的xml配置檔案裡,重啟Yarn服務。

RDD

Spark 對資料的核心抽象——彈性分散式資料集(Resilient Distributed Dataset,簡稱RDD)。在Spark 中,對資料的所有操作不外乎建立RDD和操作RDD 。

對資料(Seq)的操作,比如List:轉換(Transform),map、filter,返回List型別,資料轉換/加工過程。Action:head、tail、count ,返回不同型別,即我們需要的結果。

RDD就是類似集合類(Iterable),具有和集合類幾乎完全相同的操作(Transform和Action)。而在這一切背後,Spark 會自動將RDD 中的資料分發到叢集上,並將操作並行化執行。

Spark中的RDD就是一個不可變的分散式物件集合。每個RDD 都被分為多個分割槽,這些分割槽執行在叢集中的不同節點上。RDD 可以包含Python、Java、Scala 中任意型別的物件,甚至可以包含使用者自定義的物件。

RDD是類似Iterable的資料結構。

idea中進行spark core開發,需要在工程中新建一個lib目錄,把spark的包複製進去,在Project Structure中的Libraries中把包加進來。

建立RDD

使用者可以使用兩種方法建立RDD:

  • 用SparkContext的parallelize(Seq)把Seq轉為RDD。該方式常用於學習和實驗。
  • 讀外部資料,通常是讀HDFS、訊息佇列等。
1def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) : RDD[T]


numSlices是並行度,具有初始值所以呼叫時可以只給一個引數。比如可以parallelize(seq),可以parallelize(seq,10),並行度為10意味著Spark把資料分割為10份,放在叢集上執行。defaultParallelism是機器CPU個數。

1# 檢視CPU的個數
2cat /proc/cpuinfo| grep "processor"| wc -l

RDD的操作

RDD有兩種型別的操作:Transform操作和Action操作。注:就是Iterable類中的函式,Transform返回Iterable本身型別,Action返回新型別。

Iterable: Seq、Map

對應到

RDD:單元素RDD、PairRDD

Transform操作會由一個RDD生成一個新的RDD,這個過程中不進行實質計算,只有當第一次Action操作時才會真正計算。稱作Lazy計算,惰性計算。

Action操作會對RDD計算出一個結果,可以把結果返回,或把結果儲存到外部儲存系統(如HDFS)中。

RDD是類似Iterable的資料結構,也具有Iterable類的Map()、filter()、flatMap()等高階函式。

Action操作

collect():把資料返回驅動器程式中最簡單、最常見的操作, 通常在單元測試中使用,資料量不能太大,因為放在記憶體中,資料量大會記憶體溢位。

reduce():類似sum(),如:val sum = rdd.reduce((x, y) => x + y),結果同sum。

fold():和reduce()類似,多一個“初始值”,當初始值為零時效果同reduce()。fold(0) = reduce()

take(n):返回RDD中的n個元素,並且嘗試只訪問儘量少的分割槽。

top(n):從RDD中獲取前幾個元素。

count():用來返回元素的個數。

countByValue():返回一個從各值到值對應的計數的對映表。

sum():返回彙總。

fold(n)的執行原理:每個分割槽裡進行這樣的計算:初始值+sum(元素),最後進行:初始值+sum(分割槽sum值),初始值累加次數為分割槽數+1次。

1// 叢集模式執行的話,通常用資料之前需要調rdd.collect()
2rdd.collect().foreach(println)
3// 叢集模式用它可能拿不全
4rdd.foreach(println)

持久化函式persist()

Spark提供rdd的persist()函式來解決這個重複計算的問題,persist()把需要重複使用的rdd存起來,這樣僅第一個Action操作才會計算,其他Action操作不需要再計算。

當我們執行rdd的persist()時,計算出RDD的節點會分別儲存它們所求出的分割槽資料。如果一個有持久化資料的節點發生故障,Spark 會在需要用到快取的資料時重算丟失的資料分割槽。

rdd的persist()有5種持久化級別,分別是:來自org.apache.spark.storage.StorageLevel的定義。

級別使用的空間CPU時間是否在記憶體中是否在磁碟上備註
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分如果資料在記憶體中放不下,則溢寫到磁碟上
MEMORY_AND_DISK_SER部分部分如果資料在記憶體中放不下,則溢寫到磁碟上,在記憶體中存放序列化後的資料
DISK_ONLY
1val rdd1 = rdd.map(x => x+1)
2rdd1.persist(StorageLevel.DISK_ONLY)
3println(rdd1.first())
4println(rdd1.count())
5println(rdd1.sum())
6println(rdd1.collect().mkString(","))
7rdd1.unpersist()  //釋放快取,必須手工釋放

如果覺得資料過於重要,怕存一份有風險,則可以存2份:

1rdd1.persist(StorageLevel.MEMORY_ONLY_2)

注意

如果要快取的資料太多,記憶體中放不下,Spark會自動利用最近最少使用(LRU)的快取策略把最老的分割槽從記憶體中移除。但是對於僅把資料存放在記憶體中的快取級別,下一次要用到已經被移除的分割槽時,這些分割槽就需要重新計算。不必擔心你的作業因為快取了太多資料而被打斷。

如果MEMORY_ONLY記憶體不足的時候,Spark會自動用硬碟來承載。

WordCount案例

 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object WordCount {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setMaster("local")
 7      .setAppName("WordCount")
 8      .set("spark.testing.memory", "2147480000")
 9    val sc = new SparkContext(conf)
10    //    val lines = sc.textFile("hdfs://spark01:9000/user/spark/spark.txt", minPartitions = 1)
11    //    val lines = sc.textFile("/home/hadoop/spark.txt", minPartitions = 1)
12    //    val lines = sc.textFile("C:/Users/Administrator/Desktop/spark.txt", minPartitions = 1)
13    val lines = sc.textFile("file:///D:/Java/idea/IdeaProjects/spark-study/spark-core/resources/spark.txt", minPartitions = 1)
14    val words = lines.flatMap(_.split(" "))
15    val pairs = words.map((_, 1))
16    val wordCount = pairs.reduceByKey(_ + _)
17    wordCount.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))
18    println(lines.count())
19    println(lines.first())
20  }
21}

List方式建立RDD,

從本地檔案/HDFS檔案讀取檔案建立RDD,注意本地檔案路徑和HDFS檔案路徑的寫法。

1val rdd = sc.textFile("file:///c:/spark.txt")
2val rdd = sc.textFile("hdfs://ip:8020/...")

PairRDD

通常多列RDD會轉為PairRDD進行操作,這樣就可以用PairRDD的Transform和Action操作。

PairRDD的建立

  • 可以通過sc.parallelize建立
  • 程式中其他RDD轉的

pairRDD的元素不是Map,而是Tuple2。

PairRDD的操作

reduceByKey(func),合併具有相同鍵的值。

groupByKey(),對具有相同鍵的值進行分組。

mapValues(func),對pariRDD中的每個值應用一個函式而不改變鍵。mapValues(func)函式,功能類似於map{case (x, y): (x,func(y))}

1// pariRDD使用map和filter,應結合case
2pairRdd.map{
3  case (k, v) => (k, v + 1)
4}
5// key保持不變,value加1
6pairRdd.mapValues(v => v + 1)
7// key進行分組,value兩兩相加
8pairRdd.reduceByKey(_ + _)
 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object PairRddTest {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setAppName("PairRddTest")
 7      .setMaster("local")
 8      .set("spark.testing.memory", "471859200")
 9    val sc = new SparkContext(conf) //Driver類
10    val rdd = sc.parallelize(List((2, 3), (4, 5), (3, 2), (2, 1)))
11    val rdd2 = sc.parallelize(List(2, 3, 5)).map(i => (i, i + 2)) // 建立pairRdd主要方式
12    rdd2.map {
13      case (k, v) => (k, v + 1)
14    } // i=>i+1 返回單元素,rdd2執行map函式需要返回和rdd2相同(PairRDD)型別
15    rdd2.filter {
16      case (k, v) => v % 2 == 0
17    }
18    // (2,3),(4,5),(3,2),(2,1)  -》(2,4),(4,6),(3,3),(2,2)
19    rdd.mapValues(i => i + 1).foreach(println _)
20    rdd.reduceByKey(_ + _).foreach(println _) // (2,3),(4,5),(3,2),(2,1) ->(2,4),(4,5),(3,2)
21    rdd.groupByKey().foreach(println)
22    println(rdd.groupByKey().getClass)
23  }
24}

案例:計算每個鍵對應的平均值

 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object AvgTest {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setAppName("PairRddTest")
 7      .setMaster("local")
 8      .set("spark.testing.memory", "471859200")
 9    val sc = new SparkContext(conf) //Driver類
10    val rdd = sc.parallelize(List((3, 4), (2, 4), (3, 2), (2, 6)))
11    rdd.mapValues(i => (i, 1)) // (3, (4, 1)

            
           

相關推薦

Spark實戰5_Spark Core核心程式設計

Spark版本cdh5.9.0整合的spark的版本1.6.0,整合的hadoop版本2.6.0。檢視的網址:http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.9.0/如果用cdh5.9.0 parcels離線安裝自

Deeplearning4j 實戰5:基於多層感知機的Mnist壓縮以及在Spark實現

在上一篇部落格中,我們用基於RBM的的Deep AutoEncoder對Mnist資料集進行壓縮,應該說取得了不錯的效果。這裡,我們將神經網路這塊替換成傳統的全連線的前饋神經網路對Mnist資料集進行壓縮,看看兩者的效果有什麼異同。整個程式碼依然是利用Deeplearning4j進行實現,並且為了方

深度學習Deeplearning4j 入門實戰5:基於多層感知機的Mnist壓縮以及在Spark實現

在上一篇部落格中,我們用基於RBM的的Deep AutoEncoder對Mnist資料集進行壓縮,應該說取得了不錯的效果。這裡,我們將神經網路這塊替換成傳統的全連線的前饋神經網路對Mnist資料集進行壓縮,看看兩者的效果有什麼異同。整個程式碼依然是利用Deeplearnin

ceph分布式存儲實戰5——ceph存儲配置RBD鏡像日常管理

llb btrfs 分布 chan create sun 副本 狀態 鏡像 一、在線調整Ceph RBD的容量大小 1、支持調整ceph RBD的容量大小的底層文件系統 自由的增加或者減少RBD的容量,需要底層文件系統的支持,支持的文件系統有 1、XFS

Spark實戰SparkStreaming集成Kafka

round 形式 寫入 some base cal 接下來 會話 支持 Spark Streaming + Kafka集成指南 Kafka項目在版本0.8和0.10之間引入了一個新的消費者API,因此有兩個獨立的相應Spark Streaming包可用。請選擇正確的包,

吳裕雄 資料探勘與分析案例實戰5——python資料視覺化

# 餅圖的繪製# 匯入第三方模組import matplotlibimport matplotlib.pyplot as plt plt.rcParams['font.sans-serif']=['Simhei']plt.rcParams['axes.unicode_minus']=Falseziti =

小白的資料結構程式碼實戰5----佇列順序結構

//Author:張佳琪 #include <stdio.h> #include <stdlib.h> typedef int QElemType; typedef struct QNode { QElemType data; stru

Spring實戰5-Spel

SpEL簡介 Spring 動態語言,簡稱SpEL。是一個支援執行時查詢和操作物件圖的強大動態語言。 Spring 開發中經常涉及及呼叫各種資源的情況,包含普通檔案、網址、配置檔案、系統環境變數。我們可以使用SpEL實現資源的注入。SpEL已經整合到Spring框架的Be

Spark 實戰

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this w

Spark實戰:Kafka-SparkStreaming-Elasticsearch

本文介紹saprk實時部分----spark-streaming。spark-streaming可以實現實時批處理功能,實際上還是相當於小的批處理,但是是7*24工作,可以近實時但需要維護成本。本文裡的用java寫的demo,實現功能是將kafka作為spar

在ccs7下進行DM6467的開發5:Linux核心編譯

首先在ccs下建立一個Makefile工程:並將路徑指向核心程式碼所在的路徑:建立完成後是這個樣子的:接著配置交叉編譯的環境,開啟專案屬性:這兩個引數將傳遞給make命令。再把交叉編譯器的路徑新增到PATH列表中。最後修改一下生成的目標:經過這些簡單的配置,就可以很方便地在C

從官方例子入手 詳解資料清洗——kettle實戰5

注:本篇文章參考《Pentaho Kettle解決方案:使用PDI構建開源ETL解決方案》 資料清洗步驟 Kettle 裡沒有單一的資料清洗步驟,但又很多的步驟組合起來可以完成資料清洗的功能。(也可以使用”表輸入”步驟裡的可自定義的sql語句來做一

spark記錄5Spark運行流程及在不同集群中的運行過程

park 通知 dag 抽取 存在 的區別 kill 滿足 blog 摘自:https://www.cnblogs.com/qingyunzong/p/8945933.html 一、Spark中的基本概念 (1)Application:表示你的應用程序 (2)Driv

資料競賽實戰5——方圓之外

前言 1,背景介紹   這裡給出六千張影象做為訓練集。每個影象中只要一個圖形,要麼是圓形,要是是正方形。你的任務是根據這六千張圖片訓練出一個二元分類模型,並用它在測試集判斷每個影象中的形狀是圓還是方;測試集中有些影象既不是圓也不是方,也請將其甄別出來。 2,任務型別   二元分類,異常檢測,影象識別 3,資料

Java併發程式設計實戰5- 執行緒生命週期

在這篇文章中,我們來聊一下執行緒的生命週期。 [toc] # 概述 執行緒是作業系統中的一個概念,在Java中,它是實現併發程式的主要手段。 Java中的執行緒,本質上就是作業系統中的執行緒。 作業系統中的執行緒有“生老病死”,專業說法就是生命週期,雖然不同的開發語言對於作業系統的執行緒做了不同的封裝

Spark SQL 筆記(15)——實戰網站日誌分析5資料視覺化

1 常見的視覺化框架 echarts highcharts d3.js HUE Zeppelin 2 建立 Web 專案 下載Echarts的檔案放到此目錄 http://echarts.bai

J2SE核心開發實戰——字符串與包裝類

刪除 i++ cnblogs amp 分支語句 核心 最大 用途 else 字符串與包裝類 一、實驗簡單介紹 在本章。我們將學習一些用於處理字符串的API以及包裝類的相關知識。 本章知識點 字符串API 包裝類及其應用 二、認識字符

R語言數據挖掘實戰系列5

離群點檢測 關聯規則 時序模式 聚類分析 分類與預測 R語言數據挖掘實戰系列(5)——挖掘建模一、分類與預測分類和預測是預測問題的兩種主要類型,分類主要是預測分類標號(離散屬性),而預測主要是建立連續值函數模型,預測給定自變量對應的因變量的值。1.實現過程(1)分類分類是構造一個分類模型,

.net core 2.0學習筆記:Remoting核心類庫RealProxy遷移

ride dispatch 包含 void reflect 既然 splay creat (六) 在學習.net core的過程中,我們已經明確被告知,Remoting將不會被支持。官方的解釋是,.net framework 類型包含了太多的Runtime的內容,是

《selenium2 python 自動化測試實戰5——鍵盤事件

display 技術 添加 lan data- vbs nbsp .cn images 鍵盤事件,就是鍵盤上的一些操作,比如Ctrl +C,Ctrl+V,Ctrl+X等。 對鍵盤的操作需要導入另一個鍵盤的庫: from selenium.webdriver.commo