1. 程式人生 > 實用技巧 >spark 程式設計基礎

spark 程式設計基礎

一、大資料技術涉及的技術層面

  • 資料採集,通過etl將結構化、非結構化資料抽取到中間層,進行清洗、轉換、載入到資料集市,作為資料分析、資料探勘和流計算的基礎
  • 資料儲存和管理,通過分散式檔案系統、數倉、關係型資料庫、NoSql資料庫,對資料進行儲存和管理
  • 資料處理和分析,通過分散式計算框架,進行資料探勘、資料分析
  • 資料安全

為實現上述功能,hadoop大資料架構核心功能,分散式架構(hdfs)和分散式處理(MapReduce)

hadoop生態通過MapReduce實現資料的分散式處理,spark則是代替MapReduce的更高效元件,spark只是代替mapReduce的分散式處理,spark藉助hadoop的hafs、Hbase完成資料儲存,然後由spark完成資料的計算。

延生:flink和spark都是基於記憶體計算框架進行實時計算,全都執行在hadoop Yarn上,效能上 flink > spark > hadoop(MR)。流失計算中,flink是一行一行處理,spark是基於資料片(RDD)進行小批量處理。

二、大資料常用計算模式及代表產品

1> 批處理計算,spark/mapReduce;2> 流計算,Storm/Streams;3> 圖計算,GraphX、Pregel;4>查詢分析計算,Impala/hive

企業常見業務場景:使用mapReduce實現離線批處理;使用Impala實現實時互動查詢分析;使用Storm實現流式資料實時分析;使用Spark實現迭代計算

三、hadoop生態系統

以HDFS為基礎,通過YARN來管理和排程叢集資源,最終通過MapReduce實現分散式計算。而上層的Hive、Pig、Mahout等通過更簡單的語言編譯為MapReduce語句,給使用者以更好的互動體驗以及更低的使用門檻。

YARN

  • YARN的目標就是實現“一個叢集多個框架”,即在一個叢集上部署一個統 一的資源排程管理框架YARN,在YARN之上可以部署其他各種計算框架 ;
  • 由YARN為這些計算框架提供統一的資源排程管理服務,並且能夠根據各種 計算框架的負載需求,調整各自佔用的資源,實現叢集資源共享和資源彈性 收縮;

四、Spark的優勢

hadoop計算框架存在如下缺點:表達能力有效、磁碟IO開銷大;延遲高

Spark在借鑑Hadoop MapReduce優點的同時,很好地解決了 MapReduce所面臨的問題 相比於Hadoop MapReduce,Spark主要具有如下優點 :

  • Spark的計算模式也屬於MapReduce,但不侷限於Map和Reduce操作 ,還提供了多種資料集操作型別,程式設計模型比Hadoop MapReduce更 靈活
  • Spark提供了記憶體計算,可將中間結果放到記憶體中,對於迭代運算 效率更高
  • Spark基於DAG的任務排程執行機制,要優於Hadoop MapReduce的 迭代執行機制

Spark將資料載入記憶體後,之後的迭代計算都可以直接使用記憶體中的中間結果作運算,避免了從磁碟中頻繁讀取資料。

五、Spark生態系統

RDD:是Resillient Distributed Dataset(彈性分散式資料集)的簡稱,是分散式記憶體的一個抽象概念,提供了一種高度受限的共享記憶體模型。彈性--》資料集可大可小,分佈的數量可變

DAG:是Directed Acyclic Graph(有向無環圖)的簡稱,反映RDD之間的依賴關係

Executor:是執行在工作節點(WorkerNode)的一個程序,負責執行Task

應用(Application):使用者編寫的Spark應用程式

任務( Task ):執行在Executor上的工作單元

作業( Job ):一個作業包含多個RDD及作用於相應RDD上的各種操作

階段( Stage ):是作業的基本排程單位,一個作業會分為多組任務,每組任務被稱為階段,或者也被稱為任務集合,代表了一組關聯的、相互之間 沒有Shuffle依賴關係的任務組成的任務集

在Spark中,一個應用(Application)由一個任務控制節點(Driver)和若干個作業(Job)構成,一個作業由多個階段(Stage)構成,一個階段由多個任務(Task)組成。當執行一個應用時,任務控制節點會向叢集管理器(Cluster Manager)申請資源,啟動Executor,並向Executor傳送應用程式程式碼和檔案,然後在Executor上執行任務,執行結束後,執行結果會返回給任務控制節點,或者寫到HDFS或者其他資料庫中。

spark執行流程:

  • 當一個Spark應用被提交時,Driver建立一個SparkContext,由SparkContext負責和資源管理器(Cluster Manager)的通訊以及進行資源的申請、任務的分配和監控等。SparkContext會向資源管理器註冊並申請執行Executor的資源 ;
  • 資源管理器為Executor分配資源,並啟動Executor程序,Executor傳送心跳到資源管理器上;
  • SparkContext根據RDD的依賴關係構建DAG圖,DAG圖提交給DAG排程器(DAGScheduler)進行解析,將DAG圖分解成多個“階段”,並且計算出各個階段之間的依賴關係,然後把一個個“任務集”提交給底層的任務排程器(TaskScheduler)進行處理;Executor向SparkContext申請任務,任務排程器將任務分發給Executor執行,同時,SparkContext將應用程式程式碼發放給Executor;
  • 任務在Executor上執行,把執行結果反饋給任務排程器,然後反饋給DAG排程器,執行完畢後寫入資料並釋放所有資源 。

該過程的特點:

  • 資料本地化,計算向資料靠攏;
  • 多執行緒方式,executor執行task的時候採用多執行緒方式,減少了多程序任務頻繁的啟動開銷;
  • BlockManager儲存模組,儲存中間結果。

六、彈性分佈資料集(RDD)

RDD就是一個分散式物件集合,本質上是一個只讀的分割槽記錄集合,每個RDD可以分成多個分割槽,每個分割槽就是一個數據集片段,並且一個RDD的不同分割槽可以被儲存到叢集中不同的節點上,從而可以在叢集中的不同節點上進行平行計算。

RDD是隻讀的記錄分割槽的集合,不能修改,可以再轉換過程中進行修改。RDD提供了豐富的資料運算,轉換類(map/filter),動作類(count/collect)。RDD執行原理 fork/join機制。

RDD特性:

  • 只讀(不能修改原始的RDD,可以再新生成RDD過程中進行修改)
  • 記憶體操作,中間結果寫入到記憶體,不落地到磁碟

Spark 根據DAG 圖中的RDD 依賴關係,把一個作業分成多個階段。階段劃分的依據是窄依賴和寬依賴。

RDD在Spark架構中的執行過程:

  • 建立RDD物件;
  • SparkContext負責計算RDD之間的依賴關係,構建DAG;
  • DAGScheduler負責把DAG圖分解成多個Stage,每個Stage中包含了多個 Task,每個Task會被TaskScheduler分發給各個WorkerNode上的Executor去執行。

七、spark SQL 基礎

Hive庫是一種基於Hadoop開發的資料倉庫,相當於是SQL on Hadoop。其內部將SQL轉譯為MapReduce作業。

Spark SQL中新增了DataFrame(包含模式的RDD)。在Spark SQL中執行SQL語句,資料既可以來自RDD,也可以是Hive、 HDFS、Cassandra等外部資料來源。

SparkSession支援從不同的資料來源載入資料,並把資料轉換成 DataFrame,並且支援把DataFrame轉換成SQLContext自身中的表, 然後使用SQL語句來操作資料。SparkSession亦提供了HiveQL以及其 他依賴於Hive的功能的支援。

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

建立DataFrame時,可以使用spark.read操作
#讀取文字檔案people.txt建立DataFrame
spark.read.text("people.txt")
#讀取people.json檔案建立DataFrame。在讀取本地檔案或HDFS檔案時,要注意給出正確的檔案路徑
。
spark.read.json("people.json")
#讀取people.parquet檔案建立DataFrame
spark.read.parquet("people.parquet")

#讀取文字檔案people.json建立DataFrame
spark.read.format("text").load("people.txt")
#讀取JSON檔案people.json建立DataFrame
spark.read.format("json").load("people.json")
儲存DataFrame
df.write.text("people.txt")
df.write.json("people.json“)
或者
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")

spark sql 讀寫mysql ,可以通過jdbc連線資料庫,讀取資料

要向spark.student表中插入兩條記錄 。則該過程可分為4步進行:

  1. 建立表頭
  2. 生成Row物件
  3. 將表頭與Row物件對應起來
  4. 寫入資料庫
建立表頭
#!/usr/bin/env python3
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
#生成一個指揮官(SparkSession)
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面設定模式資訊(表頭),每個欄位指定為一個StructField
schema = StructType([StructField("id", IntegerType(), True),
                     StructField("name", StringType(), True),
                     StructField("gender", StringType(), True),
                     StructField("age", IntegerType(), True)])
生成Row物件
#下面設定兩條資料,表示兩個學生的資訊,parallelize得到包含兩個元素的RDD,".map"不改變元素個數
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda x:x.split(" "))
#下面建立Row物件,每個Row物件都是rowRDD中的一行,通過Row物件轉化為DataFrame 
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(),p[2].strip(), int(p[3].strip())))

利用資料和表頭生成DataFrame:
#建立起Row物件和模式之間的對應關係,也就是把資料(Row物件)和模式(表頭)對應起來
studentDF = spark.createDataFrame(rowRDD, schema)

將DataFrame存入資料庫中:
#寫入資料庫 prop = {} prop['user'] = 'root' #使用者名稱 prop['password'] = '123456' #密碼 prop['driver'] = "com.mysql.jdbc.Driver" #驅動程式名稱 #jdbc中的4個引數分別代表"資料庫","表名","追加","相關資訊全部寫入" studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

八、Spark Streaming

Spark Streaming可整合多種輸入資料來源,如Kafka、 Flume、HDFS,甚至是普通的TCP套接字。經處理後的 資料可儲存至檔案系統、資料庫,或顯示在儀表盤裡。

Spark Streaming並不是真正意義上的流計算 。其基本原理是將實時輸入資料流以時間片(最小單位為秒)為單位進行拆分,然後經Spark引擎以微小批處理的方式處理每個時間片資料。

Spark Streaming最主要的抽象是DStream(Discretized Stream,離散化資料流),表示連續不斷的資料流。在內部實現上,Spark Streaming的輸入資料按照時間片(如1秒)分成一段一段,每一段資料轉換為Spark中的RDD,這些分段就是Dstream,並且對DStream的操作都最終轉變為對相應的RDD的操作 。因此,Spark Streaming是建立在SparkCore之上的。其邏輯本質很簡單,就是一系列的RDD。

每一種Spark工作方式都有一種資料抽象,回顧一下:

    • Spark Core:資料抽象為 RDD;
    • Spark SQL:資料抽象為 DataFrame;
    • Spark Streaming:資料抽象為 DStream。

Spark Streaming與Storm的對比

  • Spark Streaming和Storm最大的區別在於,Spark Streaming無法實現毫秒級的流計算,而Storm可以實現毫秒級響應;
  • Spark Streaming構建在Spark上,一方面是因為Spark的低延遲執行引擎(100ms+)可以用於實時計算,另一方面,相比於Storm,RDD資料集更容易做高效的容錯處理;
  • Spark Streaming可以同時相容批量和實時資料處理的邏輯和演算法。因此,方便了一些需要歷史資料和實時資料聯合分析的特定應用場合。