Spark Streaming程式設計指南
Overview
Spark Streaming屬於Spark的核心api,它支援高吞吐量、支援容錯的實時流資料處理。
它可以接受來自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的資料來源,使用簡單的api函式比如 map
, reduce
, join
, window等操作,還可以直接使用內建的機器學習演算法、圖演算法包來處理資料。
它的工作流程像下面的圖所示一樣,接受到實時資料後,給資料分批次,然後傳給Spark Engine處理最後生成該批次的結果。
它支援的資料流叫Dstream,直接支援Kafka、Flume的資料來源。Dstream是一種連續的RDDs,下面是一個例子幫助大家理解Dstream。
A Quick Example
// 建立StreamingContext,1秒一個批次 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 獲得一個DStream負責連線 監聽埠:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 對每一行資料執行Split操作 val words = lines.flatMap(_.split(" ")); // 統計word的數量 val pairs = words.map(word => (word, 1)); val wordCounts = pairs.reduceByKey(_ + _); // 輸出結果 wordCounts.print(); ssc.start(); // 開始 ssc.awaitTermination(); // 計算完畢退出
具體的程式碼可以訪問這個頁面:
https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
如果已經裝好Spark的朋友,我們可以通過下面的例子試試。
首先,啟動Netcat,這個工具在Unix-like的系統都存在,是個簡易的資料伺服器。
使用下面這句命令來啟動Netcat:
$ nc -lk 9999
接著啟動example
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
在Netcat這端輸入hello world,看Spark這邊的
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
Basics
下面這塊是如何編寫程式碼的啦,哇咔咔!
首先我們要在SBT或者Maven工程新增以下資訊:
groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating
//需要使用一下資料來源的,還要新增相應的依賴
Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10
接著就是例項化
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
這是之前的例子對DStream的操作。
Input Sources
除了sockets之外,我們還可以這樣建立Dstream
streamingContext.fileStream(dataDirectory)
這裡有3個要點:
(1)dataDirectory下的檔案格式都是一樣
(2)在這個目錄下建立檔案都是通過移動或者重新命名的方式建立的
(3)一旦檔案進去之後就不能再改變
假設我們要建立一個Kafka的Dstream。
import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)
如果我們需要自定義流的receiver,可以檢視https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html
Operations
對於Dstream,我們可以進行兩種操作,transformations 和 output
Transformations
Transformation Meaning
map(func) 對每一個元素執行func方法
flatMap(func) 類似map函式,但是可以map到0+個輸出
filter(func) 過濾
repartition(numPartitions) 增加分割槽,提高並行度
union(otherStream) 合併兩個流
count() 統計元素的個數
reduce(func) 對RDDs裡面的元素進行聚合操作,2個輸入引數,1個輸出引數
countByValue() 針對型別統計,當一個Dstream的元素的型別是K的時候,呼叫它會返回一個新的Dstream,包含<K,Long>鍵值對,Long是每個K出現的頻率。
reduceByKey(func, [numTasks]) 對於一個(K, V)型別的Dstream,為每個key,執行func函式,預設是local是2個執行緒,cluster是8個執行緒,也可以指定numTasks
join(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream連線成一個(K, (V, W))的新Dstream
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream連線成一個(K, Seq[V], Seq[W])的新Dstream
transform(func) 轉換操作,把原來的RDD通過func轉換成一個新的RDD
updateStateByKey(func) 針對key使用func來更新狀態和值,可以將state該為任何值
UpdateStateByKey Operation
使用這個操作,我們是希望儲存它狀態的資訊,然後持續的更新它,使用它有兩個步驟:
(1)定義狀態,這個狀態可以是任意的資料型別
(2)定義狀態更新函式,從前一個狀態更改新的狀態
下面展示一個例子:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
它可以用在包含(word, 1) 的Dstream當中,比如前面展示的example
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
它會針對裡面的每個word呼叫一下更新函式,newValues是最新的值,runningCount是之前的值。
Transform Operation
和transformWith一樣,可以對一個Dstream進行RDD->RDD操作,比如我們要對Dstream流裡的RDD和另外一個數據集進行join操作,但是Dstream的API沒有直接暴露出來,我們就可以使用transform方法來進行這個操作,下面是例子:
val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
val cleanedDStream = inputDStream.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
另外,我們也可以在裡面使用機器學習演算法和圖演算法。
Window Operations
、
先舉個例子吧,比如前面的word count的例子,我們想要每隔10秒計算一下最近30秒的單詞總數。
我們可以使用以下語句:
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
這裡面提到了windows的兩個引數:
(1)window length:window的長度是30秒,最近30秒的資料
(2)slice interval:計算的時間間隔
通過這個例子,我們大概能夠視窗的意思了,定期計算滑動的資料。
下面是window的一些操作函式,還是有點兒理解不了window的概念,Meaning就不翻譯了,直接刪掉
Transformation Meaning
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])
Output Operations
Output Operation Meaning
print() 列印到控制檯
foreachRDD(func) 對Dstream裡面的每個RDD執行func,儲存到外部系統
saveAsObjectFiles(prefix, [suffix]) 儲存流的內容為SequenceFile, 檔名 : "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix]) 儲存流的內容為文字檔案, 檔名 : "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix]) 儲存流的內容為hadoop檔案, 檔名 : "prefix-TIME_IN_MS[.suffix]".
Persistence
Dstream中的RDD也可以呼叫persist()方法儲存在記憶體當中,但是基於window和state的操作,reduceByWindow,reduceByKeyAndWindow,
updateStateByKey它們就是隱式的儲存了,系統已經幫它自動儲存了。
從網路接收的資料(such as, Kafka, Flume, sockets, etc.),預設是儲存在兩個節點來實現容錯性,以序列化的方式儲存在記憶體當中。
RDD Checkpointing
狀態的操作是基於多個批次的資料的。它包括基於window的操作和updateStateByKey。因為狀態的操作要依賴於上一個批次的資料,所以它要根據時間,不斷累積元資料。為了清空資料,它支援週期性的檢查點,通過把中間結果儲存到hdfs上。因為檢查操作會導致儲存到hdfs上的開銷,所以設定這個時間間隔,要很慎重。對於小批次的資料,比如一秒的,檢查操作會大大降低吞吐量。但是檢查的間隔太長,會導致任務變大。通常來說,5-10秒的檢查間隔時間是比較合適的。
ssc.checkpoint(hdfsPath) //設定檢查點的儲存位置
dstream.checkpoint(checkpointInterval) //設定檢查點間隔
對於必須設定檢查點的Dstream,比如通過updateStateByKey
和reduceByKeyAndWindow建立的Dstream,預設設定是至少10秒。
Performance Tuning
對於調優,可以從兩個方面考慮:
(1)利用叢集資源,減少處理每個批次的資料的時間
(2)給每個批次的資料量的設定一個合適的大小
Level of Parallelism
像一些分散式的操作,比如reduceByKey和reduceByKeyAndWindow,預設的8個併發執行緒,可以通過對應的函式提高它的值,或者通過修改引數spark.default.parallelism來提高這個預設值。
Task Launching Overheads
通過進行的任務太多也不好,比如每秒50個,傳送任務的負載就會變得很重要,很難實現壓秒級的時延了,當然可以通過壓縮來降低批次的大小。
Setting the Right Batch Size
要使流程式能在叢集上穩定的執行,要使處理資料的速度跟上資料流入的速度。最好的方式計算這個批量的大小,我們首先設定batch size為5-10秒和一個很低的資料輸入速度。確實系統能跟上資料的速度的時候,我們可以根據經驗設定它的大小,通過檢視日誌看看Total delay的多長時間。如果delay的小於batch的,那麼系統可以穩定,如果delay一直增加,說明系統的處理速度跟不上資料的輸入速度。
24/7 Operation
Spark預設不會忘記元資料,比如生成的RDD,處理的stages,但是Spark Streaming是一個24/7的程式,它需要週期性的清理元資料,通過spark.cleaner.ttl來設定。比如我設定它為600,當超過10分鐘的時候,Spark就會清楚所有元資料,然後持久化RDDs。但是這個屬性要在SparkContext 建立之前設定。
但是這個值是和任何的window操作繫結。Spark會要求輸入資料在過期之後必須持久化到記憶體當中,所以必須設定delay的值至少和最大的window操作一致,如果設定小了,就會報錯。
Monitoring
除了Spark內建的監控能力,還可以StreamingListener這個介面來獲取批處理的時間, 查詢時延, 全部的端到端的試驗。
Memory Tuning
Spark Stream預設的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY。
預設的,所有持久化的RDD都會通過被Spark的LRU演算法剔除出記憶體,如果設定了spark.cleaner.ttl,就會週期性的清理,但是這個引數設定要很謹慎。一個更好的方法是設定spark.streaming.unpersist為true,這就讓Spark來計算哪些RDD需要持久化,這樣有利於提高GC的表現。
推薦使用concurrent mark-and-sweep GC,雖然這樣會降低系統的吞吐量,但是這樣有助於更穩定的進行批處理。
Fault-tolerance Properties
Failure of a Worker Node
下面有兩種失效的方式:
1.使用hdfs上的檔案,因為hdfs是可靠的檔案系統,所以不會有任何的資料失效。
2.如果資料來源是網路,比如Kafka和Flume,為了防止失效,預設是資料會儲存到2個節點上,但是有一種可能性是接受資料的節點掛了,那麼資料可能會丟失,因為它還沒來得及把資料複製到另外一個節點。
Failure of the Driver Node
為了支援24/7不間斷的處理,Spark支援驅動節點失效後,重新恢復計算。Spark Streaming會週期性的寫資料到hdfs系統,就是前面的檢查點的那個目錄。驅動節點失效之後,StreamingContext可以被恢復的。
為了讓一個Spark Streaming程式能夠被回覆,它需要做以下操作:
(1)第一次啟動的時候,建立 StreamingContext,建立所有的streams,然後呼叫start()方法。
(2)恢復後重啟的,必須通過檢查點的資料重新建立StreamingContext。
下面是一個實際的例子:
通過StreamingContext.getOrCreate來構造StreamingContext,可以實現上面所說的。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
在stand-alone的部署模式下面,驅動節點失效了,也可以自動恢復,讓別的驅動節點替代它。這個可以在本地進行測試,在提交的時候採用supervise模式,當提交了程式之後,使用jps檢視程序,看到類似DriverWrapper就殺死它,如果是使用YARN模式的話就得使用其它方式來重新啟動了。
這裡順便提一下向客戶端提交程式吧,之前總結的時候把這塊給落下了。
./bin/spark-class org.apache.spark.deploy.Client launch
[client-options]
<cluster-url> <application-jar-url> <main-class>
[application-options]
cluster-url: master的地址.
application-jar-url: jar包的地址,最好是hdfs上的,帶上hdfs://...否則要所有的節點的目錄下都有這個jar的
main-class: 要釋出的程式的main函式所在類.
Client Options:
--memory <count> (驅動程式的記憶體,單位是MB)
--cores <count> (為你的驅動程式分配多少個核心)
--supervise (節點失效的時候,是否重新啟動應用)
--verbose (列印增量的日誌輸出)
在未來的版本,會支援所有的資料來源的可恢復性。
為了更好的理解基於HDFS的驅動節點失效恢復,下面用一個簡單的例子來說明:
Time Number of lines in input file Output without driver failure Output with driver failure
1 10 10 10
2 20 20 20
3 30 30 30
4 40 40 [DRIVER FAILS] no output
5 50 50 no output
6 60 60 no output
7 70 70 [DRIVER RECOVERS] 40, 50, 60, 70
8 80 80 80
9 90 90 90
10 100 100 100
在4的時候出現了錯誤,40,50,60都沒有輸出,到70的時候恢復了,恢復之後把之前沒輸出的一下子全部輸出。