Spark Streaming狀態管理函式(三)——MapWithState的使用(scala版)
MapWithState
關於mapWithState
需要自己寫一個匿名函式func來實現自己想要的功能。如果有初始化的值得需要,可以使用initialState(RDD)來初始化key的值。 另外,還可以指定timeout函式,該函式的作用是,如果一個key超過timeout設定的時間沒有更新值,那麼這個key將會失效。這個控制需要在func中實現,必須使用state.isTimingOut()來判斷失效的key值。如果在失效時間之後,這個key又有新的值了,則會重新計算。如果沒有使用isTimingOut,則會報錯。
注意事項
下面程式是使用idea編寫的,使用的是scala語言,在程式中master(“local[2]”)設定為本地模式([]中的數指定的是執行緒數,不能少於2,否則看不到結果。主要是因為spark需要啟動一個執行緒receiver來迴圈接收資料,一個Executor來接收資料,如果少於2執行緒不夠將不能打印出結果。),在window上執行的。使用的spark版本是2.3.0,在2.x以後的版本,基本採用SparkSession來進行操作。同時,想要執行程式你的伺服器上還必須要安裝netcat這個軟體,使用yum install nc進行安裝(注意安全配置好yum源,DNS才能下載安裝),使用命令nc -lk 6666開啟服務傳送資料。最後在執行程式前還需要匯入spark、scala相應的依賴包。
示例程式碼
package spark2x import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} /** * 類名 MapWithState * 作者 彭三青 * 建立時間 2018-12-01 14:08 * 版本 1.0 * 描述: $ */ object MapWithState { // 設定本地執行模式 def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("MapWithState") .getOrCreate() // 建立一個context,批次間隔為2秒鐘, val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(3)) // 設定checkpoint目錄 ssc.checkpoint("hdfs://SC01:8020/user/tmp/cp-20181201-2") // 建立一個ReceiverInputDStream,從伺服器端的netcat接收資料。 // 伺服器主機名SC01(SC01已在Window上的hosts檔案中做了對映,沒做對映的則寫ip就OK了),監聽埠為6666 val line: ReceiverInputDStream[String] = ssc.socketTextStream("SC01", 6666) // 對接收到的資料進行處理,進行切割,分組形式為(day, 1) (word 1) val wordsStream: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1)) val wordCount: MapWithStateDStream[String, Int, Int, Any] = wordsStream.mapWithState(StateSpec.function(func).timeout(Seconds(30))) // 列印 wordCount.print() // 提交 ssc.start() // ssc.awaitTermination() } /** * 定義一個函式,該函式有三個型別word: String, option: Option[Int], state: State[Int] * 其中word代表統計的單詞,option代表的是歷史資料,state代表的是返回的狀態 */ val func = (word: String, option: Option[Int], state: State[Int]) => { if(state.isTimingOut()){ println(word + "is timeout") }else{ // 獲取歷史資料,當前值加上上一個批次的該狀態的值 val sum = option.getOrElse(0) + state.getOption().getOrElse(0) // 單詞和該單詞出現的頻率 val wordFreq = (word, sum) // 更新狀態 state.update(sum) wordFreq } } }
執行
伺服器執行nc
idea端執行編寫好的程式
伺服器傳送資料
控制檯顯示結果
結論
mapWithState它會按照時間線在每一個批次間隔返回之前的發生改變的或者新的key的狀態,不發生變化的不返回。同時mapWithState可以不用設定checkpoint,返回的資料量少,效能和效率都比mapWithState好。
第一篇:Spark Streaming狀態管理函式(一)——updateStateByKey和mapWithState
第二篇:Spark Streaming狀態管理函式(二)——updateStateByKey的使用(scala版)