1. 程式人生 > 其它 >程式設計指南_Spark Streaming程式設計指南(三)

程式設計指南_Spark Streaming程式設計指南(三)

技術標籤:程式設計指南

點選上方“藍字”關注我

基本概念

DataFrame和SQL操作

您可以輕鬆地對流資料使用DataFrames和SQL進行操作。您必須使用StreamingContext使用的SparkContext建立一個SparkSession物件。此外,必須這樣做,以便可以在驅動程式發生故障時重新啟動。這是通過建立一個延遲例項化的SparkSession單例例項來完成的。在下面的示例中顯示。它修改了前面的單詞計數示例,以使用DataFrames和SQL生成單詞計數。每個RDD都轉換為一個DataFrame,註冊為臨時表,然後使用SQL查詢。

/**DataFrameoperationsinsideyourstreamingprogram*/

valwords:DStream[String]=...

words.foreachRDD{rdd=>

//GetthesingletoninstanceofSparkSession
valspark=SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
importspark.implicits._

//ConvertRDD[String]toDataFrame
valwordsDataFrame=rdd.toDF("word")

//Createatemporaryview
wordsDataFrame.createOrReplaceTempView("words")

//DowordcountonDataFrameusingSQLandprintit
valwordCountsDataFrame=
spark.sql("selectword,count(*)astotalfromwordsgroupbyword")
wordCountsDataFrame.show()
}

檢視完整的原始碼。

您還可以在來自不同執行緒的流資料上定義的表上執行SQL查詢(即與正在執行的StreamingContext非同步)。只要確保您將StreamingContext設定為記住足夠的流資料即可執行查詢。否則,不知道任何非同步SQL查詢的StreamingContext將在查詢完成之前刪除舊的流資料。例如,如果您要查詢最後一批,但是查詢可能需要5分鐘才能執行,請呼叫streamingContext.remember(Minutes(5))

(使用Scala或其他語言的等效語言)。

請參閱DataFrames和SQL指南以瞭解有關DataFrames的更多資訊。

MLlib操作

您還可以輕鬆使用MLlib提供的機器學習演算法。首先,有流式機器學習演算法(例如,流線性迴歸,流KMeans等),它們可以同時從流資料中學習並將模型應用於流資料。除此之外,對於更大範圍的機器學習演算法,您可以離線學習學習模型(即使用歷史資料),然後線上將模型應用於流資料。有關更多詳細資訊,請參見MLlib指南。

快取/持久化

與RDD類似,DStream也允許開發者將流資料持久化在記憶體中。也就是說,在DStream上使用persist()方法將自動將該DStream的每個RDD持久化在記憶體中。如果DStream中的資料將被多次計算(例如,對同一資料進行多次操作),這將非常有用。對於基於視窗的操作(如reduceByWindow和reduceByKeyAndWindow)以及基於狀態的操作(如updateStateByKey),這是隱式進行了持久化。因此,由基於視窗的操作生成的DStream會自動儲存在記憶體中,而無需開發人員呼叫persist()方法。

對於通過網路接收資料的輸入流(例如Kafka,Flume,套接字等),預設的持久化級別設定為將資料複製到兩個節點以實現容錯。

注意,與RDD不同,DStream的預設持化級別是將資料序列化在記憶體中。效能調優部分將對此進行進一步討論。有關不同持久化級別的更多資訊,請參見《 Spark程式設計指南》。

檢查點

流式應用程式必須7*24小時執行,因此必須對與應用程式邏輯無關的故障(例如系統故障,JVM崩潰等)具有彈性。為此,Spark Streaming需要向容錯儲存系統檢查足夠的資訊,以便可以從故障中恢復。有兩種型別資料的檢查點。

  • 元資料檢查點-將定義流計算的資訊儲存到HDFS等容錯儲存系統中。這用於從執行流應用程式的驅動程式的節點的故障中恢復(稍後詳細討論)。元資料包括:
    • 配置-用於建立流應用程式的配置。
    • DStream操作-定義流應用程式的DStream操作集。
    • 不完整的批次-作業對列尚未完成的批次。
  • 資料檢查點-將生成的RDD儲存到可靠的儲存中。在某些狀態轉換中對資料進行檢查點是有必要的,這些轉換將多個批次中的資料合併在一起。在這種轉換中,生成的RDD依賴於先前批次的RDD,這導致依賴項鍊的長度隨著時間而不斷增加。為了避免恢復時間的無限增加(與依賴關係成正比)。狀態轉換的中間RDD定期檢查點到可靠的儲存系統中(例如HDFS),以切斷依賴鏈。

總而言之,從驅動程式故障中恢復時,主要需要元資料檢查點,而如果使用有狀態轉換,則即使是基本功能,也需要資料或RDD檢查點。

何時啟用檢查點

必須為具有以下任何要求的應用程式啟用檢查點:

  • 有狀態轉換的用法 - 如果在應用程式中使用了updateStateByKey或reduceByKeyAndWindow(帶有反函式),則必須提供檢查點目錄以允許定期的RDD檢查點。
  • 從執行應用程式的驅動程式故障中恢復-元資料檢查點用於恢復的進度資訊。

請注意,沒有上述狀態轉換的簡單流應用程式可以在不啟用檢查點的情況下執行。在這種情況下,從驅動程式故障中恢復也將是部分的(某些已接收但未處理的資料可能會丟失)。這通常是可以接受的,並且許多都以這種方式執行Spark Streaming應用程式。預計將來會改善對非Hadoop環境的支援。

如何配置檢查點

可以通過在容錯,可靠的檔案系統(例如HDFS,S3等)中設定目錄來啟用檢查點,將檢查點資訊儲存到該目錄中。這是通過使用streamingContext.checkpoint(checkpointDirectory)完成的。這將允許您使用上述有狀態轉換。此外,如果要使應用程式從驅動程式故障中恢復,則應重寫流應用程式以具有以下行為。

  • 程式首次啟動時,它將建立一個新的StreamingContext,設定所有流,然後呼叫start()
  • 失敗後重新啟動程式時,它將根據檢查點目錄中的檢查點資料重新建立StreamingContext。

通過使用StreamingContext.getOrCreate,此行為變得很簡單。用法如下:

//FunctiontocreateandsetupanewStreamingContext
deffunctionToCreateContext():StreamingContext={
valssc=newStreamingContext(...)//newcontext
vallines=ssc.socketTextStream(...)//createDStreams
...
ssc.checkpoint(checkpointDirectory)//setcheckpointdirectory
ssc
}

//GetStreamingContextfromcheckpointdataorcreateanewone
valcontext=StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext_)

//Doadditionalsetuponcontextthatneedstobedone,
//irrespectiveofwhetheritisbeingstartedorrestarted
context....

//Startthecontext
context.start()
context.awaitTermination()

如果checkpointDirectory目錄存在,則將根據檢查點資料重新建立上下文。如果該目錄不存在(即首次執行),則將呼叫函式functionToCreateContext來建立新上下文並設定DStreams。請參閱Scala示例RecoverableNetworkWordCount。本示例將網路資料的字數追加到檔案中。

除了使用getOrCreate之外,還需要確保驅動程式程序在發生故障時自動重新啟動。這隻能由用於執行應用程式的部署基礎結構來完成。這將在“部署”部分中進一步討論。

注意,RDD的檢查點會導致儲存到可靠儲存系統的成本。這可能會導致RDD獲得檢查點的那些批次的處理時間增加。因此,需要謹慎設定檢查點的間隔時間。在小批次(例如間隔時間為1秒)時,每個批次的檢查點可能會大大降低操作吞吐量。相反,檢查點太少會導致血統和任務規模增加,這可能會產生不利影響。對於需要RDD檢查點的有狀態轉換,預設間隔為批處理間隔的倍數,至少應為10秒。可以使用dstream.checkpoint(checkpointInterval)進行設定。通常,DStream的5-10個滑動間隔的檢查點間隔是一個很好的嘗試設定。

累加器,廣播變數和檢查點

無法從Spark Streaming中的檢查點恢復累加器和廣播變數。如果啟用檢查點並同時使用Accumulators或Broadcast變數,則必須為Accumulators和Broadcast變數建立延遲例項化的單例例項,以便在驅動程式故障重啟後可以重新例項化它們。如下面的示例中所示:

objectWordBlacklist{

@volatileprivatevarinstance:Broadcast[Seq[String]]=null

defgetInstance(sc:SparkContext):Broadcast[Seq[String]]={
if(instance==null){
synchronized{
if(instance==null){
valwordBlacklist=Seq("a","b","c")
instance=sc.broadcast(wordBlacklist)
}
}
}
instance
}
}

objectDroppedWordsCounter{

@volatileprivatevarinstance:LongAccumulator=null

defgetInstance(sc:SparkContext):LongAccumulator={
if(instance==null){
synchronized{
if(instance==null){
instance=sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}

wordCounts.foreachRDD{(rdd:RDD[(String,Int)],time:Time)=>
//GetorregistertheblacklistBroadcast
valblacklist=WordBlacklist.getInstance(rdd.sparkContext)
//GetorregisterthedroppedWordsCounterAccumulator
valdroppedWordsCounter=DroppedWordsCounter.getInstance(rdd.sparkContext)
//UseblacklisttodropwordsandusedroppedWordsCountertocountthem
valcounts=rdd.filter{case(word,count)=>
if(blacklist.value.contains(word)){
droppedWordsCounter.add(count)
false
}else{
true
}
}.collect().mkString("[",",","]")
valoutput="Countsattime"+time+""+counts
})

檢視完整的原始碼。

部署應用程式

本章節將討論部署Spark Streaming應用程式的步驟。

要求

要執行Spark Streaming應用程式,您需要具備以下條件。

  • 使用叢集管理器管理的叢集 - 這是任何Spark應用程式的基本要求,並且在部署指南中進行了詳細討論。
  • 將應用程式打成JAR包 - 您必須將流式應用程式編譯成JAR包。如果您正在使用spark-submit來啟動應用程式,則無需提供Spark和Spark Streaming的JAR包。但是,如果您的應用程式使用高階輸入源(例如Kafka,Flume),則必須將它們新增額外的依賴及其依賴項打包在用於部署應用程式的JAR中。例如,使用KafkaUtils的應用程式必須在應用程式JAR中包含spark-streaming-kafka-0-8_2.11及其所有相關的依賴項。
  • 為執行程式配置足夠的記憶體-由於接收到的資料必須儲存在記憶體中,因此必須為執行程式配置足夠的記憶體來儲存接收到的資料。請注意,如果您要執行10分鐘的視窗操作,則系統必須在記憶體中至少保留最後10分鐘的資料。因此,應用程式的記憶體的大小要求取決於其中使用的操作。
  • 配置檢查點-如果流應用程式需要配置檢查點,則必須將Hadoop API相容的容錯儲存中的目錄(例如HDFS,S3等)配置為檢查點目錄,並且以這樣的方式編寫流應用程式:用於故障恢復。有關更多詳細資訊,請參見檢查點部分。
  • 配置應用程式驅動程式的自動重啟-要從驅動程式故障中自動恢復,用於執行流式應用程式的部署基礎結構必須監控驅動程式程序,並在驅動程式失敗時重新啟動。不同的叢集管理器具有不同的工具來實現:
    • Spark Standalone-可以提交Spark應用程式驅動程式以Spark Standalone叢集模式執行(請參閱叢集部署模式),即應用程式驅動程式本身在worker節點之一上執行。此外,可以指示獨立叢集管理器監控驅動程式,並在驅動程式由於非零退出程式碼或由於執行該驅動程式的節點故障而失敗時重新啟動它。有關更多詳細資訊,請參見Spark Standalone指南中的叢集模式和監控。
    • YARN-Yarn支援自動重啟應用程式的類似機制。請參閱YARN文件以獲取更多詳細資訊。
    • Mesos-Marathon已經用Mesos來實現這一功能。
  • 配置預寫日誌-自Spark 1.2起,我們引入了預寫日誌以實現強大的容錯保證。如果啟用,則將從接收器接收的所有資料寫入配置檢查點目錄中的預寫日誌中。這樣可以防止驅動程式恢復時丟失資料,從而確保零資料丟失(在“容錯語義”部分中進行了詳細討論 )。這可以通過設定來啟用配置引數spark.streaming.receiver.writeAheadLog.enabletrue。但是,這些更強的語義可能以單個接收器的接收吞吐量為代價。可以通過並行執行更多接收器來糾正此問題增加總吞吐量。另外,由於啟用了預寫日誌,因此建議禁用Spark中接收到的資料的複製,因為該日誌已儲存在複製的儲存系統中。可以通過將輸入流的儲存級別設定為來完成此操作StorageLevel.MEMORY_AND_DISK_SER。在使用S3(或任何不支援重新整理的檔案系統)作為預寫日誌時,請記住啟用 spark.streaming.driver.writeAheadLog.closeFileAfterWritespark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有關更多詳細資訊,請參見Spark Streaming配置。請注意,啟用I/O加密後,Spark不會加密寫入預寫日誌的資料。如果需要對預寫日誌資料進行加密,則應將其儲存在本機支援加密的檔案系統中。
  • 設定最大接收速率-如果群集資源不足以使流式應用程式以最快的速度處理資料,則可以通過設定記錄/秒的最大速率限制來限制接收器的速率。請參閱接收器和 Direct Kafka方法的配置引數。在Spark 1.5中,我們引入了一項稱為背壓的功能,該功能消除了設定此速率限制的需要,因為Spark Streaming會自動計算出速率限制,並在處理條件發生變化時動態調整它們。這個背壓可以通過設定來啟用配置引數spark.streaming.backpressure.enabledtrue

升級應用程式程式碼

如果需要使用新的應用程式程式碼升級正在執行的Spark Streaming應用程式,則有兩種可能的機制。

  • 升級後的Spark Streaming應用程式將啟動,並與現有應用程式並行執行。一旦新的(接收的資料與舊的資料相同)已經準備好並且準備好合適的時間,則可以將舊的資料降下來。請注意,對於支援將資料傳送到兩個目標的資料來源(即較早的和升級的應用程式),可以這樣做。
  • 優雅停止現有應用程式(有關優雅停止選項,請參見StreamingContext.stop(...)JavaStreamingContext.stop(...)),以確保在關閉之前已完全處理已接收的資料。然後可以啟動升級的應用程式,它將從較早應用程式停止的同一點開始處理。請注意,只有使用支援在輸入源端緩衝的輸入源(例如Kafka和Flume)才能完成此操作,因為在上一個應用程式關閉且升級後的應用程式尚未啟動時需要緩衝資料。並且無法從升級前程式碼的較早檢查點資訊重新啟動。檢查點資訊本質上包含序列化的Scala/Java/Python物件,嘗試使用經過修改的新類反序列化物件可能會導致錯誤。在這種情況下,要麼使用其他檢查點目錄啟動升級的應用程式,要麼刪除先前的檢查點目錄。

監控應用程式

除了Spark的監控功能外,Spark Streaming還具有其他特定功能。使用StreamingContext時,Spark Web UI會顯示一個附加的Streaming選項卡,其中顯示有關正在執行的接收器(接收器是否處於活動狀態,接收到的記錄數,接收器錯誤等)和已完成的批處理(批處理時間,排隊延遲等)的統計資訊 )。這可用於監視流應用程式的進度。

Web UI中的以下兩個指標特別重要:

  • 處理時間-處理每批資料的時間

  • 排程延遲-批處理在佇列中等待先前批處理完成的時間

如果批處理時間始終大於批處理時間間隔或排隊延遲持續增加,則表明系統無法像生成批處理一樣快處理批處理,並且處於落後。在這種情況下,請考慮減少批處理時間。

還可以使用StreamingListener介面監控Spark Streaming程式的進度,該介面可讓您獲取接收器狀態和處理時間。請注意,這是一個開發人員API,將來可能會得到改進(即報告了更多資訊)。

b55034a425159ddb7083e08d30bd9f95.png 378afc8c90fe4762531331612db4c39a.png 公眾號ID:ldc11235 掃碼關注最新動態,跟我一起學大資料 a8fe944af2e99f0a7da29f71d1675d13.gif