一篇文章學會spark-streaming
1.什麼是spark-streaming?
實際生產中會有許多應用到實時處理的場景,比如:實時監測頁面點選,實時監測系統異常,實時監測來自於外部的攻擊。針對這些場景,twitter研發了實時資料處理工具storm,並在後來開源。spark針對這些場景設計了spark-streaming實時計算模型,它允許使用者使用一系列批處理的API去處理實時資料,能做到程式碼邏輯的重複使用。
和spark中的rdd非常相似,spark-streaming中使用離散化流(discretized stream)作為抽象的表示,叫做DStream。它是隨時間推移而收集資料的序列,每個時間段收集到的資料在DStream內部以一個RDD的形式存在。DStream支援從kafka,flume,hdfs,s3等獲取輸入。DStream也支援兩種操作,即轉化操作和輸出操作(區別於RDD中的行動操作)。轉化操作又分為無狀態的轉化操作和有狀態的轉化操作,無狀態的轉化操作有map,filter,flatmap,repartition等,是針對單個時間區間內的操作。而有狀態的轉化操作可以針對不同的時間區間,後面詳述。
2.兩個簡單的例子
2.1 監聽socket獲取資料,程式碼如下:
這裡使用nc -lk 9999 在ip為10.121.33.44的機器上傳送訊息
scala 17行
object SocketStream { def main(args: Array[String]): Unit = { //本地測試,設定4核 val conf = new SparkConf().setMaster("local[4]").setAppName("streaming") //以10秒為一個批次 val ssc = new StreamingContext(conf,Seconds(10)) //接收訊息 val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER) //監測關鍵字error,出現則print dstream.filter(_.contains("error")).foreachRDD(rdd=>{ rdd.foreach(println(_)) }) ssc.start() ssc.awaitTermination() } }
2.2 從kafka讀取資料,比較常用
scala 31行
object KafkaStream { def main(args: Array[String]): Unit = { //本地測試,設定4核 val conf = new SparkConf().setMaster("local[4]").setAppName("streaming") //以10秒為一個批次 val ssc = new StreamingContext(conf,Seconds(10)) val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster" val group_id = "realtime_data" //kafka相關引數 val kafka_param = Map[String,String]( "zookeeper.connect" ->zkQuorum, "group.id" -> group_id, "zookeeper.connection.timeout.ms" -> "10000", "fetch.message.max.bytes" -> "10485760" ) val topic = Map[String,Int]("test_topic" -> 16) //接收訊息 val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2) //監測關鍵字error,出現則print dstream.filter(_.contains("error")).foreachRDD(rdd=>{ rdd.foreach(println(_)) }) ssc.start() ssc.awaitTermination() } }
3.再來談架構
通過上面兩個例子,你可能對spark-streaming有了初步的瞭解,我們再來看一下它的架構。
Spark-streaming使用"微批次"的架構,把流式計算當做一系列微型的批處理操作來對待,每個時間段都產生一個RDD。如圖:
作用於一個DStream上的無狀態轉化操作會對它其中的每個RDD生效,如針對一個輸入為語句的DStream做flatMap操作的示意圖如下:
4.轉化操作
4.1 無狀態的轉化操作。
無狀態轉化操作就是簡單的將轉化作用於DStream的每個RDD上面。下面列舉了一些常見的轉化操作,其中最後一個transform表示可以試用自定義的轉化函式,儘管它前面已經提供了很多現成的API。
4.2有狀態的轉化操作。
有狀態的轉化操作是跨時間段的資料操作,一些先前的批次也被用來在新的批次中做計算。主要有滑動視窗和updateStateByKey。前者以一個時間段為滑動視窗進行操作,後者則用來跟蹤每個鍵的狀態變化。有狀態的轉化操作需要開啟檢查點機制來保證容錯性。即:給ssc.checkpoint()設定一個檢查點目錄。
(1)基於視窗的轉化操作會在一個比ssc設定的更長的時間段內,通過整合多個批次的,計算出整個大的時間視窗的結果。基於視窗的操作需要兩個引數,一個是視窗時長,一個是滑動步長。這兩個引數是ssc設定的時長的整數倍。下面的圖表示了一個時間視窗為3,滑動步長為2的視窗轉化操作。
前面提到的監測關鍵字error的例子,現在需要每隔20s就對前面30s有error的日誌記錄做計數,程式碼如下:
scala 34行
object KafkaStream {
def main(args: Array[String]): Unit = {
//本地測試,設定4核
val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
//以10秒為一個批次
val ssc = new StreamingContext(conf,Seconds(10))
val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
val group_id = "realtime_data"
//kafka相關引數
val kafka_param = Map[String,String](
"zookeeper.connect" ->zkQuorum,
"group.id" -> group_id,
"zookeeper.connection.timeout.ms" -> "10000",
"fetch.message.max.bytes" -> "10485760"
)
val topic = Map[String,Int]("test_topic" -> 16)
//接收訊息
val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER)
.map(_._2)
//每隔20s對前30s出現error的日誌做計數
val errors = dstream.window(Seconds(30),Seconds(20))
.filter(_.contains("error"))
.count()
errors.foreachRDD(rdd=>{
rdd.foreach(println(_))
})
ssc.start()
ssc.awaitTermination()
}
}
(2)updateStateByKey
updateStateByKey能對鍵值對的資料進行不同批次間的資料計算,使用updateStateByKey,需要傳入一個update函式,這個函式接收某個key最新批次對應的values,以及該key之前對應的value,按照自定義的邏輯返回一個新的value。如需要計算一個實時日誌中http響應碼的計數,程式碼如下:
scala 39行
object KafkaStream {
def main(args: Array[String]): Unit = {
//輸出目錄
val output = args(0)
//本地測試,設定4核
val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
//以10秒為一個批次
val ssc = new StreamingContext(conf,Seconds(10))
val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
val group_id = "realtime_data"
//kafka相關引數
val kafka_param = Map[String,String](
"zookeeper.connect" ->zkQuorum,
"group.id" -> group_id,
"zookeeper.connection.timeout.ms" -> "10000",
"fetch.message.max.bytes" -> "10485760"
)
val topic = Map[String,Int]("test_topic" -> 16)
//接收訊息
val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val rdd = dstream.map(_.split("\001"))
.map(x=>(x(0),x(1).toLong))
.updateStateByKey(update)
//輸出
rdd.foreachRDD(_.saveAsTextFile(output))
ssc.start()
ssc.awaitTermination()
}
//update函式
def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={
val current_num = new_values.size
val result_num = current_num + old_value.getOrElse(0L)
Some(result_num)
}
}
(3)所有有狀態轉化操作
5.輸出操作
輸出操作比較簡單,有以下幾種:
6.作業穩定性
spark-streaming作業一般都要全天候不間斷執行,那麼作業的穩定性如何保證?主要有以下幾點:
6.1 檢查點機制。
其原理就是階段性的將作業執行的資料存放到儲存系統,如hdfs,s3等。當作業執行出現異常時可以從上述資料中恢復。
6.2 驅動器容錯。
在建立實時計算作業的上下文時使用getOrCreate函式。程式碼如下:
scala 7行
val ssc = StreamingContext.getOrCreate(cp_dir,createContext )
def createContext(): StreamingContext ={
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint(cp_dir)
}
更多文章請關注微信公眾號:bigdataer