1. 程式人生 > >Spark Streaming——Spark第一代實時計算引擎

Spark Streaming——Spark第一代實時計算引擎

雖然SparkStreaming已經停止更新,Spark的重點也放到了 Structured Streaming ,但由於Spark版本過低或者其他技術選型問題,可能還是會選擇SparkStreaming。 SparkStreaming對於時間視窗,事件時間雖然支撐較少,但還是可以滿足部分的實時計算場景的,SparkStreaming資料較多,這裡也做一個簡單介紹。 # **一.** **什麼是Spark Streaming** ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805160512369-109959799.png) Spark Streaming在當時是為了與當時的Apache Storm競爭,也讓Spark可以用於流式資料的處理。根據其官方文件介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支援的資料輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。資料輸入後可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能儲存在很多地方,如HDFS,資料庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。 當然Storm目前已經漸漸淡出,Flink開始大放異彩。 ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805160534384-2133653011.png) ### **Spark與Storm的對比** ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805160621549-278237844.png) ## 二、SparkStreaming入門 Spark Streaming 是 Spark Core API 的擴充套件,它支援彈性的,高吞吐的,容錯的實時資料流的處理。資料可以通過多種資料來源獲取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也可以通過例如 `map`,`reduce`,`join`,`window` 等的高階函式組成的複雜演算法處理。最終,處理後的資料可以輸出到檔案系統,資料庫以及實時儀表盤中。事實上,你還可以在 data streams(資料流)上使用 [機器學習] 以及 [圖計算] 演算法。 在內部,它工作原理如下,Spark Streaming 接收實時輸入資料流並將資料切分成多個 batch(批)資料,然後由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結果)。 ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805160903163-1300240571.jpg) Spark Streaming 提供了一個名為 *discretized stream* 或 *DStream* 的高階抽象,它代表一個連續的資料流。DStream 可以從資料來源的輸入資料流建立,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上進行高層次的操作以建立。在內部,一個 DStream 是通過一系列的 [RDDs] 來表示。 本指南告訴你如何使用 DStream 來編寫一個 Spark Streaming 程式。你可以使用 Scala,Java 或者 Python(Spark 1.2 版本後引進)來編寫 Spark Streaming 程式。 在idea中新建maven專案 引入依賴 ``` ``` Project Structure —— Global Libraries —— 把scala 新增到 add module 新建Scala Class ``` import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object Demo { //遮蔽日誌 Logger.getLogger("org.apache")setLevel(Level.WARN) def main(args: Array[String]): Unit = { //local會有問題 最少兩個執行緒 一個拿資料 一個計算 //val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local") val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]") //時間間隔 val ssc = new StreamingContext(conf,Seconds(1)) //接收資料 處理 //socket demo val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words: DStream[String] = value.flatMap(_.split(" ")) val wordsTuple: DStream[(String, Int)] = words.map((_, 1)) val wordcount: DStream[(String, Int)] = wordsTuple.reduceByKey(_ + _) //觸發action wordcount.print() ssc.start() //保持流的執行 等待程式被終止 ssc.awaitTermination() } } ``` 測試 下載一個win10 用的netcat https://eternallybored.org/misc/netcat/ 下載[netcat 1.12](https://eternallybored.org/misc/netcat/netcat-win32-1.12.zip) 解壓 在目錄下啟動cmd 輸入 ``` nc -L -p 9999 ``` 開始輸入單詞 在idea中驗證接收 ## 原理 ##### 初始化StreamingContext 為了初始化一個 Spark Streaming 程式,一個 **StreamingContext** 物件必須要被創建出來,它是所有的 Spark Streaming 功能的主入口點。 ``` import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) ``` `appName` 引數是展示在叢集 UI 介面上的應用程式的名稱 `master` 是local 或者spark叢集的url(mesos yarn) 本地測試可以用**local[\*]** 注意要多於兩個執行緒 Second(1)定義的是batch interval 批處理間隔 就是間隔多久去拿一次資料 在定義一個 context 之後,您必須執行以下操作。 1. 通過建立輸入 DStreams 來定義輸入源。 2. 通過應用轉換和輸出操作 DStreams 定義流計算(streaming computations)。 3. 開始接收輸入並且使用 `streamingContext.start()` 來處理資料。 4. 使用 `streamingContext.awaitTermination()` 等待處理被終止(手動或者由於任何錯誤)。 5. 使用 `streamingContext.stop()` 來手動的停止處理。 需要記住的幾點: - 一旦一個 context 已經啟動,將不會有新的資料流的計算可以被建立或者新增到它。 - 一旦一個 context 已經停止,它不會被重新啟動。 - 同一時間內在 JVM 中只有一個 StreamingContext 可以被啟用。 - 在 StreamingContext 上的 stop() 同樣也停止了 SparkContext。為了只停止 StreamingContext,設定 `stop()` 的可選引數,名叫 `stopSparkContext` 為 false。 - 一個 SparkContext 就可以被重用以建立多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被建立之前停止(不停止 SparkContext)。 ##### **Discretized Stream** or **DStream** **Discretized Stream** or **DStream** 是 Spark Streaming 提供的基本抽象。它代表了一個連續的資料流。可能是資料來源接收的流,也可能是轉換後的流。 DStream就是多個和時間相關的一系列連續RDD的集合,比如本例就是間隔一秒的一堆RDD的集合 ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805161950426-906145656.jpg) DStream也是有依賴關係的 flatMap 操作也是直接作用在DStream上的,就和作用於RDD一樣 這樣很好理解 ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805161937182-1675969293.jpg) 我們先來看資料來源接收的流 這種叫做Input DStreams 他會通過Receivers接收器去不同的資料來源接收資料。 Spark Streaming內建了兩種資料來源: - 基礎的資料來源:比如剛才用的socket接收 還有file systems - 高階的資料來源:比如kafka 還有flume kinesis等等 注意本地執行時,不要用local或者local[1],一個執行緒不夠。放到叢集上時分配給SparkStreaming的核數必須大於接收器的數量,留一個核去處理資料。 我們也可以自定義資料來源,那我們就需要自己開發一個接收器。 ##### Transformations 在我們接收到Dstreams之後可以進行轉換操作,常見轉換如下: | Transformation(轉換) | Meaning(含義) | | ---------------------------------------- | ------------------------------------------------------------ | | **map**(*func*) | 利用函式 *func* 處理原 DStream 的每個元素,返回一個新的 DStream。 | | **flatMap**(*func*) | 與 map 相似,但是每個輸入項可用被對映為 0 個或者多個輸出項。。 | | **filter**(*func*) | 返回一個新的 DStream,它僅僅包含原 DStream 中函式 *func* 返回值為 true 的項。 | | **repartition**(*numPartitions*) | 通過建立更多或者更少的 partition 以改變這個 DStream 的並行級別(level of parallelism)。 | | **union**(*otherStream*) | 返回一個新的 DStream,它包含源 DStream 和 *otherDStream* 的所有元素。 | | **count**() | 通過 count 源 DStream 中每個 RDD 的元素數量,返回一個包含單元素(single-element)RDDs 的新 DStream。 | | **reduce**(*func*) | 利用函式 *func* 聚集源 DStream 中每個 RDD 的元素,返回一個包含單元素(single-element)RDDs 的新 DStream。函式應該是相關聯的,以使計算可以並行化。 | | **countByValue**() | 在元素型別為 K 的 DStream上,返回一個(K,long)pair 的新的 DStream,每個 key 的值是在原 DStream 的每個 RDD 中的次數。 | | **reduceByKey**(*func*, [_numTasks_]) | 當在一個由 (K,V) pairs 組成的 DStream 上呼叫這個運算元時,返回一個新的,由 (K,V) pairs 組成的 DStream,每一個 key 的值均由給定的 reduce 函式聚合起來。**注意**:在預設情況下,這個運算元利用了 Spark 預設的併發任務數去分組。你可以用 numTasks 引數設定不同的任務數。 | | **join**(*otherStream*, [_numTasks_]) | 當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, (V, W)) 對的新 DStream。 | | **cogroup**(*otherStream*, [_numTasks_]) | 當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, Seq[V], Seq[W]) 的 tuples(元組)。 | | **transform**(*func*) | 通過對源 DStream 的每個 RDD 應用 RDD-to-RDD 函式,建立一個新的 DStream。這個可以在 DStream 中的任何 RDD 操作中使用。 | | **updateStateByKey**(*func*) | 返回一個新的 "狀態" 的 DStream,其中每個 key 的狀態通過在 key 的先前狀態應用給定的函式和 key 的新 valyes 來更新。這可以用於維護每個 key 的任意狀態資料。 | 這裡我們特別介紹一下updateStateByKey 我們如果需要對歷史資料進行統計,可能需要去kafka裡拿一下之前留存的資料,也可以用updateStateByKey這個方法。 ``` //儲存狀態 聚合相同的單詞 val wordcount = wordsTuple.updateStateByKey[Int]( //updateFunction _ (newValues: Seq[Int], runningCount: Option[Int])=> { val newCount = Some(newValues.sum + runningCount.getOrElse(0)) newCount } ) ``` 比如剛才的單詞計數,我們只能統計每一次發過來的訊息,但是如果希望統計多次訊息就需要用到這個,我們要指定一個checkpoint,就是從哪開始算。 ``` //增加成員變數 val checkpointDir = "./ckp" //在方法中加入checkpoint ssc.checkpoint(checkpointDir) val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) value.checkpoint(Seconds(4))//官方建議批次時間的1-5倍 ``` 這時候我們建立StreamingContext的方法就要改變了 我們把剛才的建立過程提取成方法。 ``` def creatingFunc():StreamingContext = { val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(1)) ssc.checkpoint(checkpointDir) val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) value.checkpoint(Seconds(4))//官方建議批次時間的1-5倍 val words: DStream[String] = value.flatMap(_.split(" ")) val wordsTuple: DStream[(String, Int)] = words.map((_, 1)) //儲存狀態 聚合相同的單詞 val wordcount = wordsTuple.updateStateByKey[Int]( //updateFunction _ (newValues: Seq[Int], runningCount: Option[Int])=> { val newCount = Some(newValues.sum + runningCount.getOrElse(0)) newCount } ) //觸發action wordcount.print() ssc } ``` 在mian函式中修改為: ``` def main(args: Array[String]): Unit = { val ssc = StreamingContext.getOrCreate(checkpointDir,creatingFunc _) ssc.start() //保持流的執行 等待程式被終止 ssc.awaitTermination() } ``` 這樣就是,如果有checkpoint,程式會在checkpoint中把程式載入回來(程式被儲存為二進位制),沒有checkpoint的話才會建立。 將目錄下的checkpoint刪除,就可以將狀態刪除。 生產中updateStateByKey由於會將資料備份要慎重使用,可以考慮用hbase,redis等做替代。或者藉助kafka做聚合處理。 ``` //如果不用updatestateByKey 可以考慮redis wordsTuple.foreachRDD(rdd => { rdd.foreachPartition(i => { //redis } ) }) ``` ##### 視窗操作 Spark Streaming 也支援 _windowed computations(視窗計算),它允許你在資料的一個滑動視窗上應用 transformation(轉換)。 ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805162139288-1841949559.jpg) 如上圖顯示,視窗在源 DStream 上 _slides(滑動),任何一個視窗操作都需要指定兩個引數: - *window length(視窗長度)* - 視窗的持續時間。 - *sliding interval(滑動間隔)* - 執行視窗操作的間隔。 比如計算過去30秒的詞頻: ``` val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) ``` 一些常用的視窗操作如下所示,這些操作都需要用到上文提到的兩個引數 - *windowLength(視窗長度)* 和 _slideInterval(滑動的時間間隔)_。 | Transformation(轉換) | Meaning(含義) | | ------------------------------------------------------------ | ------------------------------------------------------------ | | **window**(*windowLength*, *slideInterval*) | 返回一個新的 DStream,它是基於 source DStream 的視窗 batch 進行計算的。 | | **countByWindow**(*windowLength*, *slideInterval*) | 返回 stream(流)中滑動視窗元素的數 | | **reduceByWindow**(*func*, *windowLength*, *slideInterval*) | 返回一個新的單元素 stream(流),它通過在一個滑動間隔的 stream 中使用 *func* 來聚合以建立。該函式應該是 associative(關聯的)且 commutative(可交換的),以便它可以平行計算 | | **reduceByKeyAndWindow**(*func*, *windowLength*, *slideInterval*, [_numTasks_]) | 在一個 (K, V) pairs 的 DStream 上呼叫時,返回一個新的 (K, V) pairs 的 Stream,其中的每個 key 的 values 是在滑動視窗上的 batch 使用給定的函式 *func* 來聚合產生的。**Note(注意):** 預設情況下,該操作使用 Spark 的預設並行任務數量(local model 是 2,在 cluster mode 中的數量通過 `spark.default.parallelism` 來確定)來做 grouping。您可以通過一個可選的 `numTasks` 引數來設定一個不同的 tasks(任務)數量。 | | **reduceByKeyAndWindow**(*func*, *invFunc*, *windowLength*, *slideInterval*, [_numTasks_]) | 上述 `reduceByKeyAndWindow()` 的更有效的一個版本,其中使用前一視窗的 reduce 值逐漸計算每個視窗的 reduce值。這是通過減少進入滑動視窗的新資料,以及 “inverse reducing(逆減)” 離開視窗的舊資料來完成的。一個例子是當視窗滑動時”新增” 和 “減” keys 的數量。然而,它僅適用於 “invertible reduce functions(可逆減少函式)”,即具有相應 “inverse reduce(反向減少)” 函式的 reduce 函式(作為引數 *invFunc )。像在 reduceByKeyAndWindow 中的那樣,reduce 任務的數量可以通過可選引數進行配置。請注意,針對該操作的使用必須啟用 checkpointing.* | | **countByValueAndWindow**(*windowLength*, *slideInterval*, [_numTasks_]) | 在一個 (K, V) pairs 的 DStream 上呼叫時,返回一個新的 (K, Long) pairs 的 DStream,其中每個 key 的 value 是它在一個滑動視窗之內的頻次。像 code>reduceByKeyAndWindow 中的那樣,reduce 任務的數量可以通過可選引數進行配置。 | ##### Join操作 在 Spark Streaming 中可以執行不同型別的 join ``` val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) //也可以用視窗 val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) ``` ##### DStreams輸出操作 輸出操作允許將 DStream 的資料推送到外部系統,如資料庫或檔案系統。 會觸發所有變換的執行,類似RDD的action操作。有如下操作: | Output Operation | Meaning | | ------------------------------------------------ | ------------------------------------------------------------ | | **print**() | 在執行流應用程式的 driver 節點上的DStream中列印每批資料的前十個元素。這對於開發和除錯很有用。 | | Python API 這在 Python API 中稱為 **pprint()**。 | | | **saveAsTextFiles**(*prefix*, [_suffix_]) | 將此 DStream 的內容另存為文字檔案。每個批處理間隔的檔名是根據 *字首* 和 *字尾_:*"prefix-TIME_IN_MS[.suffix]"_ 生成的。 | | **saveAsObjectFiles**(*prefix*, [_suffix_]) | 將此 DStream 的內容另存為序列化 Java 物件的 `SequenceFiles`。每個批處理間隔的檔名是根據 *字首* 和 *字尾_:*"prefix-TIME_IN_MS[.suffix]"_ 生成的。 | | Python API 這在Python API中是不可用的。 | | | **saveAsHadoopFiles**(*prefix*, [_suffix_]) | 將此 DStream 的內容另存為 Hadoop 檔案。每個批處理間隔的檔名是根據 *字首* 和 *字尾_:*"prefix-TIME_IN_MS[.suffix]"_ 生成的。 | | Python API 這在Python API中是不可用的。 | | | **foreachRDD**(*func*) | 對從流中生成的每個 RDD 應用函式 *func* 的最通用的輸出運算子。此功能應將每個 RDD 中的資料推送到外部系統,例如將 RDD 儲存到檔案,或將其通過網路寫入資料庫。請注意,函式 *func* 在執行流應用程式的 driver 程序中執行,通常會在其中具有 RDD 動作,這將強制流式傳輸 RDD 的計算。 | ##### foreachRDD設計模式使用 dstream.foreachRDD允許將資料傳送到外部系統。 但我們不要每次都建立一個連線,解決方案如下: 減少開銷,分割槽分攤開銷 ``` dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } } ``` 更好的做法是用靜態資源池: ``` dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } ``` ## 連線Kafka Apache Kafka是一個高效能的訊息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源訊息系統專案。 Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該專案的目標是為處理實時資料提供一個統一、高通量、低等待(低延時)的平臺。 更多kafka相關請檢視[Kafka入門寶典(詳細截圖版)](https://mp.weixin.qq.com/s/oFEv5c5zO7NAMA3YYB3CrQ) Spark Streaming 2.4.4相容 kafka 0.10.0 或者更高的版本 Spark Streaming在2.3.0版本之前是提供了對kafka 0.8 和 0.10的支援的 ,不過在2.3.0以後對0.8的支援取消了。 **Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.** | | [spark-streaming-kafka-0-8](http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html) | [spark-streaming-kafka-0-10](http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) | | :------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | | Broker Version | 0.8.2.1 or higher | 0.10.0 or higher | | API Maturity | Deprecated | Stable | | Language Support | Scala, Java, Python | Scala, Java | | Receiver DStream | Yes | No | | Direct DStream | Yes | Yes | | SSL / TLS Support | No | Yes | | Offset Commit API | No | Yes | | Dynamic Topic Subscription | No | Yes | ##### Receiver 這裡簡單介紹一下對kafka0.8的一種支援方式:基於Receiver 依賴: ``` groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.12 version = 2.4.4 ``` ``` import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) ``` 這種情況 程式停掉資料會丟失,為了不丟失自己又寫了一份,這種是很多餘的。 由於採用了kafka高階api,偏移量offset不可控。 ##### Direct Kafka 0.10.0版本以後,採用了更好的一種Direct方式,這種我們需要自己維護偏移量offset。 ![](https://img2020.cnblogs.com/blog/1089984/202008/1089984-20200805161732402-907962801.png) 直連方式 並行度會更高 生產環境用的最多,0.8版本需要在zk或者redis等地方自己維護偏移量。我們使用0.10以上版本支援自己設定偏移量,我們只需要自己將偏移量寫回kafka就可以。 依賴 ``` groupId = org.apache.spark artifactId = spark-streaming-kafka-0-10_2.12 version = 2.4.4 ``` kafka 0.10以後 可以將offset寫回kafka 我們不需要自己維護offset了,具體程式碼如下: ``` val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]") val ssc = new StreamingContext(conf,Seconds(2)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092,anotherhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", //latest none earliest "auto.offset.reset" -> "earliest", //自動提交偏移量 false "enable.auto.commit" -> (false: java.lang.Boolean) ) //val topics = Array("topicA", "topicB") val topics = Array("test_topic") val stream = KafkaUtils.createDirectStream[String, String]( ssc, // 與kafka broker不在一個節點上 用不同策略 //在一個節點用 PreferBrokers策略 很少見 LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) stream.foreachRDD(rdd => { //普通的RDD不能強轉HasOffsetRanges 但kafkaRDD有 with這個特性 可以強轉 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //處理資料 計算邏輯 rdd.foreachPartition { iter => //一次處理一個分割槽的資料 獲取這個分割槽的偏移量 //計算完以後修改偏移量 要開啟事務 類似資料庫 connection -> conn.setAutoCommit(false) 各種操作 conn.commit(); conn.rollback() //獲取偏移量 如果要自己記錄的話這個 //val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") //處理資料 iter.foreach(println) } //kafka 0.10新特性 處理完資料後 將偏移量寫回kafka // some time later, after outputs have completed //kafka有一個特殊的topic 儲存偏移量 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }) ``` 更多Flink,Kafka,Spark等相關技術博文,科技資訊,歡迎關注實時流式計算 公眾號後臺回覆 “電子書” 下載300頁Flink實戰電子書 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200511083216576-14373893