1. 程式人生 > >Spark 定製版:015~Spark Streaming原始碼解讀之No Receivers徹底思考

Spark 定製版:015~Spark Streaming原始碼解讀之No Receivers徹底思考

本講內容:

a. Direct Acess
b. Kafka

注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。

上節回顧

上一講中,我們講Spark Streaming中一個非常重要的內容:State狀態管理

a. 為了說明state狀態管理,拿兩個非常具體非常有價值的方法updateStateByKey和mapWithState這兩個方法來說明sparkstreaming是如何實現對state狀態管理的。Sparkstreaming是按照batchduration劃分job的,但是有時我們想算過去一個小時或者過去一天的資料,在大於batchduration的時候對資料進行符合業務邏輯的操作,這時候不可避免的要進行狀態維護。Sparkstreaming每個batchduration都會產生一個job,job裡面都是RDD,所以現在面臨的一個問題就是,他每個batchduration產生RDD,怎麼對他的狀態進行維護的問題(像updateStateByKey)。例如計算一天的商品的點選量,這時候就需要類似於updateStateByKey或者mapWithState這樣的方法幫助完成核心的步驟

b. Spark 的狀態管理其實有很多函式,比較典型的有類似的UpdateStateByKey、MapWithState方法來完成核心的步驟

updateStateByKey和mapWithState這兩個方法在DStream中並不能找到。因為updateStateByKey和mapWithState這兩個方法都是針對key-value型別的資料進行操作,也就是pair型別的,和前邊講RDD是一樣的,RDD這個類本事並不會對key-value型別的資料進行操作,所以這時候就需要藉助scala的語法隱式轉換。隱式轉換一般放在類的伴生物件中,將DStream轉換成PairDStreamFunctions。這是從地獄中召喚出來的功能,使用後又回到地獄。執行機制就是找不到DStream的updateStateByKey和mapWithState,他們是PairDStreamFunctions的方法,就找隱式轉換,隱式轉換中發現toPairDStreamFunctions這個功能,就使用了implicit功能

最後我們附上程式碼執行流程圖:

這裡寫圖片描述

開講

上一講中主要是使用ReceiverInputDStream,是針對Receiver方式開展的剖析

本講我們之所以用一節課來講No Receivers,是因為企業級Spark Streaming應用程式開發中在越來越多的採用No Receivers的方式。No Receiver方式有自己的優勢,比如更大的控制的自由度、語義一致性等等。所以對No Receivers方式和Receiver方式都需要進一步研究、思考

Spark Streaming現在支援兩個方式,一種是Receivers的方式來接收資料的輸入或者對資料的控制,另一種是No Receivers的方式,也就是directAPI。其實No Receivers的方式是更符合讀取資料和操作資料的思路的,因為Spark是一個計算框架,作為一個計算框架底層會有資料來源,如果用No Receivers的方式直接操作資料來源中的資料,這是更自然的一種方式。如果要操作資料來源肯定要有一個封裝器,封裝器肯定是RDD型別的,所以SparkStreaming為了封裝資料推出了一個自定義的RDD叫KafkaRDD

從Kafka中消費資料的一種實現,首先要確定開始和結束的offset來保證exactly-once。

kafkaParams 中最關鍵的是metadata.broker.list,這個broker是kafka中的概念。就是SparkStreaming直接去操作kafka叢集,offsetRanges指的是哪一片資料是這個RDD的。Kafka傳資料的時候會進行編碼所以需要Decoder。直接從kafka中讀取資料需要自定義一個RDD,如果想從Hbase中直接讀資料也需要自定義RDD

所以Spark Streaming就產生了自定義RDD –> KafkaRDD

原始碼分析:

走進KafkaRDD開始探祕之旅

這裡寫圖片描述

KafkaRDD在繼承RDD的時候也繼承了HasOffsetOranges,這個是必須的,因為RDD天然的是a list of partitions,基於kafka直接訪問RDD時必須是HasOffsetRange型別,代表了來自kafka topicAndParttion,其實力被HasOffsetRange Create建立,從fromOffset到untilOffset ,

分散式傳輸Offset資料時必須序列化

這裡寫圖片描述

Offset是訊息偏移量,假設untilOffset是10萬,fromOffset是5萬,第10萬條訊息和5萬條訊息,一般處理資料規模大小是以資料條數為單位

建立一個offSetrange例項時可以確定從kafka叢集partition中讀取哪些topic,從foreachrdd中可以獲得當前rdd訪問的所有分割槽資料。Batch Duration中產生的rdd的分割槽資料,這個是對元資料的控制

再看getPartitions方法,offsetRanges指定了每個offsetrange從什麼位置開始到什麼位置結束

這裡寫圖片描述

看KafkaRDDPartition類,會從傳入的topic和partition及offset中獲取kafka資料

這裡寫圖片描述

Host port指定讀取資料來源的kfakf機器

看kafka rdd的compute計算每個資料分片,和rdd理念是一樣的,每次迭代操作獲取計算的rdd一部分

操作KafkaRDDIterator和操作rdd分片是一樣的,需要迭代資料分片

這裡寫圖片描述

關鍵的地方kafkaCluster物件時在kafkaUtils中直接建立了directStream,看下之前操作kafka程式碼發現傳入的引數是上下文、 broker.List.topic.list引數

構建時傳入topics為Set,當然可以直接指定ranges,他從kafka叢集直接建立了kafkaCluster和叢集進行互動,從fromOffset獲取資料具體的偏移量

這裡寫圖片描述

這裡寫圖片描述

如果不知道fromOffsets的話直接從配置中獲取fromOffsets,建立kafka DirectKafkaInputDStream的時候會從kafka叢集進行互動獲得partition、offset資訊,通過DirectKafkaInputDStream無論什麼情況最後都會建立DirectKafkaInputDStream

這裡寫圖片描述

DirectKafkaInputDStream會產生kafkaRDD,不同的topic partitions生成對應的的kafkarddpartitions,控制消費讀取速度。操作資料的時候是compute直接構建出kafka rdd,讀取kafka 上的資料。確定獲取讀取資料的期間就知道需要讀取多少條資料,然後構建kafkardd例項。Kafkardd的例項和DirectKafkaInputDStream是一一對應的,每次compute會產生一個kafkardd,其會包含很多partitions,有多少partition就是對應多少kafkapartition

看下KafkaRDDPartition就是一個簡單的資料結構

這裡寫圖片描述

總結:

而且KafkaRDDPartition只能屬於一個topic,不能讓partition跨多個topic,直接消費一個kafkatopic,topic不斷進來、資料不斷偏移,Offset代表kafka資料偏移量指標。

資料不斷流進kafka,batchDuration假如每十秒都會從配置的topic中消費資料,每次會消費一部分直到消費完,下一個batchDuration會再流進來的資料,又可以從頭開始讀或上一個資料的基礎上讀取資料。

Direct的方式相比Receivers的方式的優勢:

a. Direct的方式沒有快取,也就不用擔心出現記憶體溢位的問題。如果是Receivers的方式就存在快取

b. 如果是Receivers的方式,Receivers是和具體的worker繫結,Receivers的方式不方便做分散式,當然配置一下是可以做分散式的。Direct的方式預設資料就會在多個executor上

c. 資料消費的問題,我們在實際操作的時候,如果是Receivers的方式假如資料來不及處理,資料操作delazy之後,delazy多次的話,sparkstreaming程式就會崩潰。如果是Direct的方式就不存在這種情況

d. Direct的方式完全的語義一致性。不會重複消費,並且確保資料一定被消費。Direct的方式是和kafka進行互動,只有資料被真正的執行成功才會被記錄下來

e. Direct的方式比Receivers的方式速度快。對Direct的方式資料流進的速度的配置是對每個partition進行配置

f. Sparkstreaming還有一個配置是backpressure,這個引數可以試探一下資料流進來的速度和處理能力是否一致,如果處理不一致可以動態的進行調整,也就是資源的動態調整

生產環境下強烈建議採用direct方式讀取kafka資料

Kafka Direct方式的程式碼執行流程的原始碼和重要程式碼流程圖

這裡寫圖片描述

相關推薦

Spark 製版015~Spark Streaming原始碼解讀No Receivers徹底思考

本講內容: a. Direct Acess b. Kafka 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,我們講Spark Streaming中一個非常重要的內容:State狀態管理

第15課Spark Streaming原始碼解讀No Receivers徹底思考

背景:      目前No Receivers在企業中使用的越來越多。No Receivers具有更強的控制度,語義一致性。No Receivers是我們操作資料來源自然方式,操作資料來源使用一個封裝器,且是RDD型別的。所以Spark Streaming就產生了自定義R

Spark Streaming原始碼解讀No Receivers詳解

背景: 目前No Receivers在企業中使用的越來越多。No Receivers具有更強的控制度,語義一致性。No Receivers是我們操作資料來源自然方式,操作資料來源使用一個封裝器,且是RDD型別的。所以Spark Streaming就產生了自定義

Spark 製版013~Spark Streaming原始碼解讀Driver容錯安全性

本講內容: a. ReceiverBlockTracker容錯安全性 b. DStreamGraph和JobGenerator容錯安全性 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,

Spark 製版010~Spark Streaming原始碼解讀流資料不斷接收全生命週期徹底研究和思考

本講內容: a. 資料接收架構設計模式 b. 資料接收原始碼徹底研究 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,我們給大傢俱體分析了Receiver啟動的方式及其啟動設計帶來的多個

Spark 製版004~Spark Streaming事務處理徹底掌握

本講內容: a. Exactly Once b. 輸出不重複 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧: 上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原

Spark定製班第9課Spark Streaming原始碼解讀Receiver在Driver的精妙實現全生命週期徹底研究和思考

本期內容: 1. Receiver啟動的方式設想 2. Receiver啟動原始碼徹底分析 1. Receiver啟動的方式設想   Spark Streaming是個執行在Spark Core上的應用程式。這個應用程式既要接收資料,還要處理資料,這些都是在分散式的

Spark Streaming原始碼解讀Receiver在Driver的精妙實現全生命週期徹底研究和思考

在Spark Streaming中對於ReceiverInputDStream來說,都是現實一個Receiver,用來接收資料。而Receiver可以有很多個,並且執行在不同的worker節點上。這些Receiver都是由ReceiverTracker來管理的。

Spark Streaming原始碼解讀資料清理內幕徹底解密

本篇部落格的主要目的是: 1. 理清楚Spark Streaming中資料清理的流程 組織思路如下: a) 背景 b) 如何研究Spark Streaming資料清理? c) 原始碼解析

Spark Streaming原始碼解讀Driver中的ReceiverTracker詳解

本篇博文的目標是: Driver的ReceiverTracker接收到資料之後,下一步對資料是如何進行管理 一:ReceiverTracker的架構設計 1. Driver在Executor啟動Receiver方式,每個Receiver都封裝成一個Tas

Spark Streaming原始碼解讀State管理updateStateByKey和mapWithState解密

源地址:http://blog.csdn.net/snail_gesture/article/details/5151058 背景:  整個Spark Streaming是按照Batch Duractions劃分Job的。但是很多時候我們需要算過去的一天甚

Spark製版2通過案例對SparkStreaming透徹理解三板斧

本節課主要從以下二個方面來解密SparkStreaming: 一、解密SparkStreaming執行機制 二、解密SparkStreaming架構 SparkStreaming執行時更像SparkCore上的應用程式,SparkStreaming程式啟動後會啟動很

Spark——Streaming原始碼解析容錯

此文是從思維導圖中匯出稍作調整後生成的,思維腦圖對程式碼瀏覽支援不是很好,為了更好閱讀體驗,文中涉及到的原始碼都是刪除掉不必要的程式碼後的虛擬碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀: 此博文共分為四個部分: DAG定義 Job動態生成 資料的產生與匯入 容錯 ​ 策略 優點 缺點 (1) 熱備

Spark——Streaming原始碼解析資料的產生與匯入

此文是從思維導圖中匯出稍作調整後生成的,思維腦圖對程式碼瀏覽支援不是很好,為了更好閱讀體驗,文中涉及到的原始碼都是刪除掉不必要的程式碼後的虛擬碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀: 此博文共分為四個部分: DAG定義 Job動態生成 資料的產生與匯入 容錯 資料的產生與匯入主要分為以下五個部分

Spark——Streaming原始碼解析DAG定義

此文是從思維導圖中匯出稍作調整後生成的,思維腦圖對程式碼瀏覽支援不是很好,為了更好閱讀體驗,文中涉及到的原始碼都是刪除掉不必要的程式碼後的虛擬碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀: 此博文共分為四個部分: DAG定義 Job動態生成 資料的產生與匯入 容錯 1. DStream 1.1. RD

Spark運算元[10]foldByKey、fold 原始碼例項詳解

foldByKey與aggregateByKey,fold與aggregate用法相近,作用相似! foldByKey是aggregateByKey的簡化,fold是aggregate的簡化。

<spark> error啟動spark後查看進程,進程中master和worker進程沖突

告訴 若有 master 沖突 存在 查看進程 spark 但是 文件 啟動hadoop再啟動spark後jps,發現master進程和worker進程同時存在,調試了半天配置文件。 測試發現,當我關閉hadoop後 worker進程還是存在, 但是,當我再關閉spar

Spark On Yarn提交Spark應用程式到Yarn

Spark On Yarn模式配置非常簡單,只需要下載編譯好的Spark安裝包,在一臺帶有Hadoop Yarn客戶端的機器上解壓,簡單配置之後即可使用。 要把Spark應用程式提交到Yarn執行,首先需要配置HADOOP_CONF_DIR或者YARN_C

Spark學習筆記初識Spark

=。= // 將users中的vertex屬性新增到graph中,生成graph2 // 使用joinVertices操作,用user中的屬性替換圖中對應Id的屬性 // 先將圖中的頂點屬

Spark MLlib原始碼解讀樸素貝葉斯分類器,NaiveBayes

Spark MLlib 樸素貝葉斯NaiveBayes 原始碼分析 基本原理介紹 首先是基本的條件概率求解的公式。 P(A|B)=P(AB)P(B) 在現實生活中,我們經常會碰到已知一個條件概率,求得兩個時間交換後的概率的問題。也就是在已知P(A