1. 程式人生 > >Spark的那些事(一)一文了解spark

Spark的那些事(一)一文了解spark

Spark是一個快速的叢集化的實時計算系統。支援Java, Scala, Python 和R語言的高階API。

一 Spark生態:
1111.png
支援Spark Sql用於sql和結構化資料查詢處理;支援MLlib用於機器學習;支援GraphX用於圖形處理;支援Spark Streaming和Structured Sql(spark2.1.1版本釋出)用於實時計算。(其中,我們使用的Spark功能主要是Spark Sql和Structured Sql。其中Spark sql用於查詢模組,可以聯合多個數據源進行查詢。Structured Sql用於流式資料處理。)

部署方式有:
1、本地執行模式:new SparkConf().setAppName(“sparkName”)
                 .setMaster(config.getString(“local[*]”))) 

2、Stanalone模式:
              1)由master/slaves服務組成的
      2)各個節點上的資源被抽象成粗粒度的slot,有多少slot就能同時執行多少task。 
              3)部署時通過spark-env.sh和slave配置檔案進行配置,使用start-all.sh可以一鍵啟動。


3、EC2模式:
      部署於雲端。


4、Spark on Mesos模式:
 支援粗粒度模式和細粒度模式。
1)粗粒度模式:應用程式的各個任務正式執行之前,需要將執行環境中的資源全部申請好,且執行過    程中要一直佔用這些資源,即使不用,最後程式執行結束後,回收這些資源。比如你提交應用程式時,指定使用5個executor執行你的應用程式,每個executor佔用5GB記憶體和5個CPU,每個executor內部設定了5個slot,則Mesos需要先為executor分配資源並啟動它們,之後開始排程任務。另外,在程式執行過程中,mesos的master和slave並不知道executor內部各個task的執行情況,executor直接將任務狀態通過內部的通訊機制彙報給Driver,從一定程度上可以認為,每個應用程式利用mesos搭建了一個虛擬叢集自己使用。 
2)細粒度模式:鑑於粗粒度模式會造成大量資源浪費,Spark On Mesos還提供了另外一種排程模式:細粒度模式,這種模式類似於現在的雲端計算,思想是按需分配。與粗粒度模式一樣,應用程式啟動時,先會啟動executor,但每個executor佔用資源僅僅是自己執行所需的資源,不需要考慮將來要執行的任務,之後,mesos會為每個executor動態分配資源,每分配一些,便可以執行一個新任務,單個Task執行完之後可以馬上釋放對應的資源。每個Task會彙報狀態給Mesos slave和Mesos Master,便於更加細粒度管理和容錯,這種排程模式類似於MapReduce排程模式,每個Task完全獨立,優點是便於資源控制和隔離,但缺點也很明顯,短作業執行延遲大。

5、Spark on yarn模式:
支援粗粒度模式,只要用yarn的resource manage進行排程管理。(目前選擇的是該模式)
(細粒度模式尚未實現 https://issues.apache.org/jira/browse/YARN-1197)

整合性:
Spark可以很好的整合HDFS,HBase,Elatatic Search,kudu等儲存系統,mysql等關係性資料庫和json csv等靜態檔案處理。

二、Spark基本架構:
image.png

1)Cluster Manager:在standalone模式中即為Master主節點,控制整個叢集,監控worker。在YARN模式中為資源管理器
2)Worker節點:從節點,負責控制計算節點,啟動Executor或者Driver。
3)Driver: 執行Application 的main()函式
4)Executor:執行器,是為某個Application執行在worker node上的一個程序

三、執行流程
1)建立Spark context
2)Spark context向Cluster manager申請執行Executor資源,並啟動StandaloneExecutorbackend
3)Executor向SparkContext申請Task
4)SparkContext將應用程式分發給Executor
5)SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset傳送給Task Scheduler,最後由Task Scheduler將Task傳送給Executor執行
6)Task在Executor上執行,執行完釋放所有資源

四、Cluster模式和client模式

yarn-cluster模式下,driver執行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的執行狀況。當用戶提交了作業之後,就可以關掉Client,作業會繼續在YARN上執行。yarn-cluster模式不適合執行互動型別的作業。
image2.png

Yarn-client模式下,Application Master僅僅向YARN請求executor,client會和請求的container通訊來排程他們工作。

image3.png

五、Spark sql

Spark sql應用於查詢模組。

以CSV檔案為例,前端查詢為select * from CSV.test
1)通過CSV.test查詢資料庫獲取對應的csv檔案儲存路徑path。
2)spark讀取path對應的hdfs檔案生成dataset
3)dataset.createTempView()生成臨時表testTable
4)spark執行sql,select * from testTable並返回結果

六、Structured Streaming
Spark2.0中提出一個概念,continuous applications(連續應用程式)。
Spark Streaming等流式處理引擎致力於流式資料的運算,比如通過map執行一個方法來改變流中的每一條記錄,通過reduce可以基於時間做資料聚合。但是,事實上很少有隻在流式資料上做運算的需求,相對的,流式處理往往是一個大型應用的一部分。continuous applications提出後,實時運算作為一部分,不同系統間的互動等也可以由Structured Streaming來處理。如下圖,左側為Spark Streaming類的流式引擎,互動是由使用者來處理;右側為Strctured Streaming類的連續應用,互動由應用來處理。(https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html

image4.png

Structured Streaming是一個建立在Spark sql引擎上的可擴充套件高容錯的流式處理引擎。它使得可以像對靜態資料進行批量處理一樣來處理流式資料。Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Structured Streaming抽象了一個DataSet中無邊界的表。structured streaming將流資料看作是一張沒有邊界的表,流資料不斷的向表尾增加資料
image5.png

在每一個週期(預設1s),新的內容將會增加到表尾,查詢的結果將會更新到結果表中。一旦結果表被更新,就需要將改變後的表內容輸出到外部的sink中。

如Kafka—etl—es的過程,spark每秒鐘從source—kafka讀取一批資料,寫入無邊界表中,通過dataset的spark sql操作進行ETL轉換,更新result表,隨著result表更新,變化的result行將被寫入外部sink—es。

source型別:File source,Kafka source Socket source
sink型別:File sink,Foreach sink,Console sink,Memory sink,其中es sink是由Elastatic search擴充套件的。
輸出模式:
Complete mode: 不刪除任何資料,在 Result Table 中保留所有資料,每次觸發操作輸出所有視窗資料;
Append mode: 當確定不會更新視窗時,將會輸出該視窗的資料並刪除,保證每個視窗的資料只會輸出一次;
Updated mode: 刪除不再更新的時間視窗,每次觸發聚合操作時,輸出更新的視窗。

聚合:輸出模式必須是Append或Updated。sink為es時只能是Append。

Event time:時間發生時間,來源於source資料中的時間列。
Watermark:資料過期時間。