1. 程式人生 > >Spark基礎知識點兒彙總

Spark基礎知識點兒彙總



*spark的理解
spark是一個快速的、統一的大規模資料處理引擎
它是基於記憶體計算的
它的特點是:快速、易用、適用於各種資料處理場景(批處理、流處理、互動式處理)、它可以執行在多種分散式計算框架中,如yarn和mesos等

*spark的架構
Master
  spark計算叢集的主節點,負責接收客戶端提交來的spark job,並且負責work節點的資源申請和資源調配,在程式執行時,對各個子節點的狀態監控和執行程式的執行情況的檢視。
Worker
  根據master的資源申請,執行起executor和driver程式,在executor和driver上執行具體的執行程式

spark程式的叢集執行方式:
1.把master的url設定成叢集的主節點服務的url
  val conf = new SparkConf().setAppName("wordcount").setMaster("spark://centos1:7077")
  把本地的intellj裡的程式碼打成jar包
  然後再sc上新增:
  sc.addJar("E:\\bigdata\\sparkfirst20\\out\\artifacts\\sparkfirst20_jar\\sparkfirst20.jar")
 
  然後再intellj上右鍵執行就能把程式傳送到spark叢集上來執行
2.把專案打包,然後傳送到linux,使用spark的submit指令把程式釋出到spark叢集上執行
  spark-submit
  --class com.zhiyou.bd20.WordCount
  --master spark://centos1:7077
  --executor-memory 1G
  sparkfirst20.jar

standalone
client   cluster

*spark的程式設計模型
1.建立sparkconfig物件,用於配置spark執行環境
2.使用config來構建sparkcontext物件,sparkcontext物件主要用於和spark叢集服務連線併發布應用程式,同時用於載入資料來源和建立rdd、累加器、廣播變數
3.使用sparkcontext載入資料來源,獲取rdd
4.呼叫rdd的各種transformation方法
5.呼叫rdd的action方法觸發計算job的執行。在action方法中對計算結果進行讀值/取值。
6.sparkcontext.stop()


我們開發的spark程式碼可以分成兩塊:
1:Driver,application的入口執行程式,一個application的資料處理過程全部都是在driver中來進行定義和配置的。
2:Executor,一個application對資料的處理過程是分散式進行的,分散式處理中的每一個任務執行節點裡面會執行起一個executor,executor來接收driver傳送過來的任務(task)

driver在執行時,會負責監控每一個executor的執行進度,每一個executor在被分配給driver之後直接向driver負責,接收driver釋出進來的指令和任務,在這個過程中,具體task的執行進度狀況不再向master彙報。

application,app:一個完整的資料處理過程被稱為一個app
                一個application程式中包含一個driver和多個executor

Cluster Manager:分散式計算框架的主節點。spark叢集來說是:master,對yarn來說是:resource manager
Worker Node:分散式計算框架的子節點。對spark叢集來說就是:worker,對yarn來說是:node manager

job:一個application中會有多個job,job是由action方法來觸發的,一個action方法就會有一個job。每一個job都會有一個dag圖和一個排程,job的執行是由排程按照dag來調配work node子節點上執行其executor來接收並執行task的
Stage:一個job在被解析編譯時會劃分成多個stage,stage是以shuffle來作為邊界劃分的。每個stage裡有多個task任務。
task:每個job中都會有多個task,task是由driver分配指派給executor來進行執行的


*spark核心api
SparkContext型別

例項化sparkcontext物件:SparkContext.getOrCreate(conf)
建立rdd:
  makeRDD:使用driver節點記憶體中的集合物件建立rdd
  parallelize:使用driver節點記憶體中的集合物件建立rdd
  range:獲取一個序列rdd
 
  textFile:載入文字格式資料檔案獲取rdd
  sequenceFile:載入sequence個是的資料檔案獲取rdd
  newAPIHadoopFile:用來載入任意格式的mr可以載入的資料檔案
  newAPIHadoopRDD:用來載入hbase中的資料
 

建立累加器:
  collectionAccumulator
  doubleAccumulator
  longAccumulator
  register
建立廣播變數:
  broadcast

setCheckpointDir 設定檢查點的目錄

累加器和廣播變數可以優化spark程式

*RDD的api
rdd:彈性分散式資料集。
不可變的、元素被分割槽的,可被並行操作的資料集
1.rdd是一組分割槽
2.對rdd呼叫api方法進行計算時,計算函式是作用於每一個分割槽的(分散式計算)
3.rdd之間是有依賴關係的,每個rdd都有一個依賴列表,或稱為血統關係
4.對於kv的rdd可以指定partitioner對其進行分割槽
5.rdd在進行載入和計算時會盡可能考慮本地載入和本地計算

窄依賴:父rdd的一個分割槽的資料進行計算只流向子rdd的一個分割槽,這種依賴是窄依賴
寬依賴:父rdd的一個分割槽的資料計算時會流向子rdd的多個分割槽,這種依賴就叫做寬依賴

有些api方法是窄依賴如:map,flatmap,filter等
有些api方法是寬依賴,如:groupby,reducebykey等

總的來說rdd的方法分為action(返回值型別不是rdd的方法)和transformation(返回值型別是rdd的方法)

rdd本身的api包括:普通rdd(每個元素都是一個物件)和kvrdd(每個元素是一個kv)
  kv的rdd除了具有普通rdd的全部功能之外還有額外的功能

資料對映:對資料的轉換,解析,一對一計算等功能
map :一個輸入元素對應一個輸出元素。每一個元素都會經過map的運算元被計算
mapPartitions:每一個partition作為一個整體經過mappartition運算元被計算(效率更高)
flatMap:一個輸入對應多個輸出元素
filter:過濾rdd的元素,留下符合條件的資料。每個元素,經過運算元輸出是true就會被留下,如果是false的話就會被排掉
distinct:排重去重
keyBy:把一個普通的rdd轉換成kv rdd的一種方式,它的運算元就是確定key值

action:
foreach
foreachPartition

資料聚合的
action:
reduce:不需要初值,運算元的輸入和輸出型別要保持一致
fold:需要初值,運算元的輸入和輸出型別要保持一致
aggregate:需要初值,運算元的輸入和輸出型別可以不一致
max:
min:
count:
countByValue


transformation:
groupBy:對rdd中的資料按照key值進行分組,同一個key在輸出rdd裡面只存在一次,同一個key下的原rdd的多個value會被合併成一個集合物件Iterable,與key形成一個keyvalue從而形成一個kvrdd。groupby只對資料做一個分組操作,沒有進行實際的聚合計算,所以如果需求是做分組並聚合的話儘量避免使用groupby,使用kv rdd的其他更高效的方法來完成分組聚合計算。


排序
action:
first 取出rdd的第一條記錄
take 取出rdd中的前N條資料,它是按照rdd的原順序來取值

takeOrdered 取出rdd按照從小到大的順序排序後的前N條資料
top  取出rdd按照從大到校的順序排序後的前N條資料

transformation:
sortBy 是每個分割槽各自排序

快取
spark程式的優化手段
當一個rdd會被後續的很多計算進行使用的時候,把這個rdd放在快取中會提高程式的執行效率
cache:把資料快取在記憶體中
persist:接受StorageLevel級別來決定把資料快取在記憶體中或者快取在磁碟上。如果不傳引數就是快取在記憶體中

unpersist:把rdd從快取中踢出去

持久化
saveAsTextFile  儲存成文字檔案
saveAsObjectFile 儲存成物件檔案

重分割槽
資料在經過聚合或者過濾等計算之後,原rdd的各個分割槽中的資料有可能產生傾斜的情況,為了解決資料傾斜的情況,可以對計算後的rdd進行重分割槽。
coalesce:當原rdd的分割槽數小於分割槽後的rdd的分數的時候,可以使用coalesce從而避免分割槽過程中產生shuffle
repartition:不管重分割槽的數量怎麼發生變化,該方法都是會經過shuffle


集合計算
transformation:
union  並集
++
intersection 交集
subtract     減集
cartesian    笛卡爾乘積

zip   拉鍊操作
拉鍊操作要保持對應分割槽的元素數量要保持一致,不然會報錯。

檢查點:
checkpoint:
localCheckpoint:
getCheckpointFile
isCheckpointed


collect()

*PairRdd  kvrdd
pairrdd具有普通rdd的全部方法和功能。

kvrdd可以由普通的rdd對映而來,對映生成的rdd中的每一個元素是一個元組,這個元組只有兩個元素,這樣一個rdd就是一個pairrdd
val rdd = sc.parallize(List(1,2,3,4,5,6))
val kvrdd = rdd.map(x=>(x%2,x))

kv的rdd可以由sc載入資料檔案而來


對映方法:
mapValues    只對kv中的value進行對映,key保持不變。運算元的輸入是value,輸出是結果的value
flatMapValues

分組聚合函式:
transformation:
下面三個聚合函式的聚合都有兩個過程,1:partition內部聚合,2:partition之間的聚合
reduceByKey:根據key值對pairrdd的value進行分組聚合,不需要聚合的初始值,但是要求聚合的結果值必須要和kvrdd的value的型別保持一致。
reducebykey只接受一個運算元,這個運算元要同時被用於partition內部聚合和partition之間的聚合,所以,這個要求這個運算元必須滿足兩個輸入引數的地位要是對等關係。reducebykey值適合進行累加之類的計算

foldByKey:foldbykey除了比reducebykey多了一個初值之外,reducebykey的限制,它一樣具有

aggregateByKey:對分組聚合沒有太多的限制幾乎可以滿足所有的聚合
該方法接受三個引數,一個初始值和兩個運算元,
第一個運算元是用於分割槽內的聚合過程(會用到初始值)
第二個運算元是用於分割槽間的聚合(不會用到初始值)

combineByKey:對分組聚合沒有太多的限制幾乎可以滿足所有的聚合
該方法接受三個引數,三個引數都是運算元
第一個運算元用於初始值的計算過程.這個運算元是會作用於每一個分割槽的第一個元素。
第二個運算元用於分割槽內的聚合過程(要使用到第一個運算元執行結果)
第三個運算元是用於分割槽間的聚合

groupByKey:對資料按照key值進行合併,把一個rdd下某個key值相同的所有的value合併成一個集合物件,並和原key形成一個新的kv


countByKey:統計出相同的key的個數,和原key形成一個新的kv

cogroup:多個kvrdd之間的操作如
kvrddA.cogroup(kvrddB)
先對kvrddA和kvrddB進行groupbykey
keyA:Iterator
keyB:Iterator
join
key,(Iteratora,Iteratorb)
然後對兩個groupbykey後的結果進行join(fullOutterJoin)操作,這裡的連線是使用的全外連線。

cogroup=groupbykey + fulloutterjoin
groupWith:和cogroup功能用法完全一模一樣

資料集操作方法:
subtractByKey  減集

join              內連線   :兩個rdd根據key值相互過濾,只有能關聯上的資料才會在結果集中
leftOuterJoin     左外連線 :左邊的rdd是主rdd右邊的是輔rdd,用左邊rdd的key去過濾右邊rdd的資料,關聯上的在結果結果集裡面就是Some(輔表的資料),關聯不上的就是None

fullOuterJoin     全外連結
rightOuterJoin    右外連線

lookup:查詢某個key下的所有value

持久化操作方法:
saveAsNewAPIHadoopDataset:儲存資料到hbase的
saveAsNewAPIHadoopFile:以hadoopfile的輸出格式儲存檔案

spark完全支援hadoop的讀寫
val rdd1 = sc.parallelize(List((1,"張三"),(2,"李四"),(3,"王二")))
    rdd1.map(x=>(new IntWritable(x._1),new Text(x._2)))
      .saveAsNewAPIHadoopFile("file:///e:/usersparkoutput",classOf[IntWritable],classOf[Text]
        ,classOf[SequenceFileOutputFormat[IntWritable,Text]])


統計每個使用者,login次數,logout次數,view_user次數,new_tweet次數,每個ip上平均行為次數
   jim           44        33           22             33                   22

每個ip上平均行為次數 = 使用者行為次數/該使用者使用過的ip數量

維度
指標(KPI)

*共享變數
廣播變數:廣播變數是在driver上宣告,在executor上使用的一些變數資料
          廣播變數對executor來說是隻讀的
  使用步驟:1.在driver上宣告廣播變數
            2.在executor執行的運算元中使用這個廣播變數
  工作過程中,經常在兩張表關聯的時候使用廣播變數。
  spark上在對兩個rdd進行關聯的時候,如果一個rdd的資料量很小,一個rdd的資料量很大,可以使用廣播變數來把關聯過程變成map端關聯。把小表的資料變成廣播變數,廣播給每一個載入大表rdd的executor。
累加器:在driver上宣告,在executor進行累加奇數,
        累加器對executor來說是隻寫
  使用步驟:1.driver上宣告累加器
            2.在executor上對累加器進行累加資料處理
            3.在driver上讀取並得到累加結果

  當在工作過程中需要一些全域性計數型別的聚合的時候,可以考慮使用累加器來替代rddapi方法完成聚合。這種方式可以減少job的數量提高spark的執行效率。

*spark的部署模式
local模式:本地執行模式,一般開發除錯時使用該模式。
  driver和executor都在本地執行,它是通過多執行緒來模擬實現分散式的

standalon模式:把spark程式釋出到spark的叢集上去執行。
client模式:driver執行在客戶端,executor執行在叢集work節點上
cluster模式:driver和executor都執行在叢集的work節點上
cluster模式需要把spark程式的jar包放到hdfs上
spark-submit \
--class com.zhiyou.bd20.WordCount \
--master spark://centos1:7077 \
--deploy-mode cluster \
hdfs://master:9000/path/to/sparkjar


yarn模式:把spark程式釋出到yarn叢集上去執行。
clinet模式:driver執行在提交執行程式的客戶端,executor執行在叢集的nodemanager上
spark-submit \
--class com.zhiyou.bd20.WordCount \
--master yarn \
--deploy-mode cluster \
/path/to/jar

cluster模式:driver和executor都執行在叢集的nodemanager節點上


*spark程式讀寫資料庫(sql和nosql)
mysql:
讀:new JdbcRDD
寫:foreachPartition mapPartition
hbase:
讀:sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
寫:兩種方式:
    1.oHBaseRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)。這裡要保證rdd的value必須是Put型別或者是Delete型別
    2.和mysql一樣是用foreachPartition的方式,在運算元裡面用hbase的api把資料通過hbase服務儲存到hbase中

*java開發spark
1.建立專案引入依賴
2.構建sparkcontext物件
3.使用sparkcontext物件載入RDD
4.呼叫rdd的api方法,傳入運算元,完成對資料的處理邏輯過程
5.呼叫rdd的action方法觸發計算並讀取計算結果

hive
driver

hiveserver2 driver
beeline  squrrile


impala cloudera
presto teradata

hive--->hive run on spark
hive可以通過設定讓底層執行引擎直接由mapreduce換成spark
shark(sparksql 1.0)
更改了hive的執行引擎為spark,hive核心中的sql優化在自己專案裡面以spark為執行引擎的基礎上重構

sparksql2.x


*sparksql的開發步驟
1.構建SparkSession
引入sparksession的隱式轉換:
import spark.sql
import spark.implicits._
2.用sparksession載入資料來源Dataset DataFrame
把dataset或dataframe建立成檢視
3.對dataset使用sql來完成資料處理過程
4.對處理後的資料進行讀值觸發計算過程
5.關閉SparkSession

spark的配置引數:spark.sql.warehouse.dir用來指定給一個目錄來作為spark的warehouse目錄

*Dataset和DataFrame
DataFrame是Dataset的一個特例
DataFrame = Dataset[Row]

dataframe的row型別可以讓我們自定義每一個欄位的名稱
dataset的欄位的名稱是自動轉換而來

如果每條記錄的型別是一個case class的話,那轉換成dataset之後,模式欄位的名稱和型別會和case class的屬性的名稱和型別保持一致。

資料集每個元素是元組的--------》轉成dataframe
資料集每個元素是case class----------》轉成dataset

*sparksql資料載入和儲存
使用sparksession載入資料
使用dataset的方法儲存資料
讀:SparkSession.read 使用DataFrameReader的方法載入各種資料來源處的資料,可以很方便的載入各種格式的資料檔案,和資料庫。
寫:Dataset.write 使用DataFrameWriter的方法可以方便把資料儲存成各種格式和資料儲存位置。

select
where
找出personDataSet年齡大於5的資料
personDataSet.createOrReplaceTempView("person")
sql("select * from person where age>5")
-----------------------------
linq寫法
personDataSet.select("name","age").where("age>5")

*spark使用hive的metastore和hive進行資料互通
1.hive-site.xml引入專案
2.新增依賴spark-hive
3.在sparksession建立過程中呼叫builder的enableHiveSupport
如果報元資料版本不一致,把hive-site.xml的引數
hive.metastore.schema.verification=false


*流處理
流處理和批處理的區別
1.流處理對應實時資料處理業務場景,批處理對應歷史報表業務場景
2.處理速度上,流處理速度快,批處理速度慢;處理資料量上,流處理處理的資料量小,批處理處理的資料量大
  其實對流處理的框架來說處理的資料量的大小,而是使用吞吐量來衡量流處理框架的處理能力
3.批處理程式是使用定時排程來進行執行和終止的。
  流處理7X24執行,一旦執行起來,除非更新程式或意外情況,流處理不會被終止。
4.批處理啟動後只執行一次計算
  流處理啟動後可能會執行無數次計算

在資料處理上
流處理有時間的概念
批處理只有時間點的概念

*spark streaming的開發流程
1.構建StreamingContext,這個型別封裝了SparkContext
2.從資料來源處載入流資料,獲取DStream,它封裝了rdd
3.呼叫DStream的各種api方法完成流處理過程
4.啟動流計算

*spark streaming開發重要內容
1.獲取資料來源(kafka flume socket mq)
2.DStream的api方法


*時間間隔長度
 Duration、Minutes、Seconds


需求:
實時展現商品銷量的top5

1.流資料來源(nc -lk)
  訂單id 產品id 訂單產品數量 訂單金額
1       2     3             344
1       1     1             111
1       1     1             111
1       2     3             123
2.統計出每個產品的總銷量和每個產品的總訂單金額
  transformation:
  map運算元
  reducebykey運算元
  action:
  foreachpartition運算元
3.取出銷量的top5

mysql資料庫設計
create table order_stastic(
  product_id integer,
  account_num integer,    -- 累計銷量
  amount_num integer,     -- 累計總金額
  primary key(product_id)
)

資料每個微批次都更新到mysql的這張表
使用:

實時檢視銷量top5:
select * from order_stastic order by account_num desc limit 5
實時檢視銷售金額top5:
select * from order_stastic order by amount_num desc limit 5

作業
計算累計的wordcount
計算結果儲存在hbase中
 

*dstream的api
對映:
transformation:
flatMap
map
mapPartitions
filter

action:
foreachRDD

print 列印前10條資料

聚合函式:
reduce
count
countByValue


-------------------------------------------------
快取:
cache
persist
重分割槽:
repartition
持久化:
saveAsTextFiles


*kvdstream(pairdstream)的api
一個dstream只要符合如下條件就自動具有paridstream的所有方法:
dstream中每個元素是一個元組,每個元組中有兩個元素

pairdstream具有dstream的所有方法和功能
對映
mapValues
flatMapValues

聚合:
分組聚合
reduceByKey
combineByKey
groupByKey
cogroup,先groupby 再join

持久化:
saveAsNewAPIHadoopFiles

資料集合資料集的關聯操作:
join
leftOuterJoin
rightOuterJoin
fullOuterJoin


*dstream附加運算元

統計近10分鐘之內接受到的資料的wordcount累計值

視窗計算運算元
1.視窗的寬度和微批次時間長度一樣是個時間長度的概念,一個視窗包括若干個微批次時間長度
2.視窗寬度必須是微批次時間長度的整數倍
3.當計算的時間間隔需要超過微批次的時間間隔的時候,可以使用視窗滑動引數來進行設定計算的時間間隔。視窗滑動的寬度也必須是微批次時間長度的整數倍。(當視窗計算沒有滑動引數的時候,計算的時間間隔是微批次的時間,當視窗計算有滑動引數的時候,計算的時間間隔就是滑動引數的時間長度)
groupByKeyAndWindow
reduceByKeyAndWindow


countByValueAndWindow
countByWindow
reduceByWindow

統計近1分鐘個之內的累計wordcount

window:開窗運算元

累計計算運算元:
這種累計計算的運算元只有在pairdstream裡面才有,普通的dstream不具有累積計算的功能。
mapWithState

updateStateByKey