1. 程式人生 > 其它 >Spark Streaming程式設計指南

Spark Streaming程式設計指南

Overview

Spark Streaming屬於Spark的核心api,它支援高吞吐量、支援容錯的實時流資料處理。

它可以接受來自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的資料來源,使用簡單的api函式比如 mapreducejoinwindow等操作,還可以直接使用內建的機器學習演算法、圖演算法包來處理資料。

它的工作流程像下面的圖所示一樣,接受到實時資料後,給資料分批次,然後傳給Spark Engine處理最後生成該批次的結果。

它支援的資料流叫Dstream,直接支援Kafka、Flume的資料來源。Dstream是一種連續的RDDs,下面是一個例子幫助大家理解Dstream。

A Quick Example

// 建立StreamingContext,1秒一個批次
val ssc = new StreamingContext(sparkConf, Seconds(1));

// 獲得一個DStream負責連線 監聽埠:地址
val lines = ssc.socketTextStream(serverIP, serverPort);

// 對每一行資料執行Split操作
val words = lines.flatMap(_.split(" "));
// 統計word的數量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);

// 輸出結果
wordCounts.print();

ssc.start();             // 開始
ssc.awaitTermination();  // 計算完畢退出

具體的程式碼可以訪問這個頁面:

https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala

如果已經裝好Spark的朋友,我們可以通過下面的例子試試。

首先,啟動Netcat,這個工具在Unix-like的系統都存在,是個簡易的資料伺服器。

使用下面這句命令來啟動Netcat:

$ nc -lk 9999

接著啟動example

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

在Netcat這端輸入hello world,看Spark這邊的

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

Basics

下面這塊是如何編寫程式碼的啦,哇咔咔!

首先我們要在SBT或者Maven工程新增以下資訊:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating
//需要使用一下資料來源的,還要新增相應的依賴
Source    Artifact
Kafka     spark-streaming-kafka_2.10
Flume     spark-streaming-flume_2.10
Twitter     spark-streaming-twitter_2.10
ZeroMQ     spark-streaming-zeromq_2.10
MQTT     spark-streaming-mqtt_2.10

接著就是例項化

new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])

這是之前的例子對DStream的操作。

Input Sources

除了sockets之外,我們還可以這樣建立Dstream

streamingContext.fileStream(dataDirectory)

這裡有3個要點:

(1)dataDirectory下的檔案格式都是一樣

(2)在這個目錄下建立檔案都是通過移動或者重新命名的方式建立的

(3)一旦檔案進去之後就不能再改變

假設我們要建立一個Kafka的Dstream。

import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)

如果我們需要自定義流的receiver,可以檢視https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html

Operations

對於Dstream,我們可以進行兩種操作,transformations 和 output 

Transformations

Transformation                          Meaning
map(func)                        對每一個元素執行func方法
flatMap(func)                    類似map函式,但是可以map到0+個輸出
filter(func)                     過濾
repartition(numPartitions)       增加分割槽,提高並行度     
union(otherStream)               合併兩個流
count()                    統計元素的個數
reduce(func)                     對RDDs裡面的元素進行聚合操作,2個輸入引數,1個輸出引數
countByValue()                   針對型別統計,當一個Dstream的元素的型別是K的時候,呼叫它會返回一個新的Dstream,包含<K,Long>鍵值對,Long是每個K出現的頻率。
reduceByKey(func, [numTasks])    對於一個(K, V)型別的Dstream,為每個key,執行func函式,預設是local是2個執行緒,cluster是8個執行緒,也可以指定numTasks 
join(otherStream, [numTasks])    把(K, V)和(K, W)的Dstream連線成一個(K, (V, W))的新Dstream 
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream連線成一個(K, Seq[V], Seq[W])的新Dstream 
transform(func)                  轉換操作,把原來的RDD通過func轉換成一個新的RDD
updateStateByKey(func)           針對key使用func來更新狀態和值,可以將state該為任何值

UpdateStateByKey Operation

使用這個操作,我們是希望儲存它狀態的資訊,然後持續的更新它,使用它有兩個步驟:

(1)定義狀態,這個狀態可以是任意的資料型別

(2)定義狀態更新函式,從前一個狀態更改新的狀態

下面展示一個例子:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

它可以用在包含(word, 1) 的Dstream當中,比如前面展示的example

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

它會針對裡面的每個word呼叫一下更新函式,newValues是最新的值,runningCount是之前的值。

Transform Operation

和transformWith一樣,可以對一個Dstream進行RDD->RDD操作,比如我們要對Dstream流裡的RDD和另外一個數據集進行join操作,但是Dstream的API沒有直接暴露出來,我們就可以使用transform方法來進行這個操作,下面是例子:

val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information

val cleanedDStream = inputDStream.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

另外,我們也可以在裡面使用機器學習演算法和圖演算法。

Window Operations

先舉個例子吧,比如前面的word count的例子,我們想要每隔10秒計算一下最近30秒的單詞總數。

我們可以使用以下語句:

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

這裡面提到了windows的兩個引數:

(1)window length:window的長度是30秒,最近30秒的資料

(2)slice interval:計算的時間間隔

通過這個例子,我們大概能夠視窗的意思了,定期計算滑動的資料。

下面是window的一些操作函式,還是有點兒理解不了window的概念,Meaning就不翻譯了,直接刪掉

Transformation                                                                              Meaning
window(windowLength, slideInterval)     
countByWindow(windowLength, slideInterval)     
reduceByWindow(func, windowLength, slideInterval)     
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])     
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])    
countByValueAndWindow(windowLength, slideInterval, [numTasks])     

Output Operations

Output Operation                                      Meaning
print()                                 列印到控制檯
foreachRDD(func)                        對Dstream裡面的每個RDD執行func,儲存到外部系統
saveAsObjectFiles(prefix, [suffix])     儲存流的內容為SequenceFile, 檔名 : "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix])       儲存流的內容為文字檔案, 檔名 : "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix])     儲存流的內容為hadoop檔案, 檔名 : "prefix-TIME_IN_MS[.suffix]".

Persistence

 Dstream中的RDD也可以呼叫persist()方法儲存在記憶體當中,但是基於window和state的操作,reduceByWindow,reduceByKeyAndWindow,updateStateByKey它們就是隱式的儲存了,系統已經幫它自動儲存了。

從網路接收的資料(such as, Kafka, Flume, sockets, etc.),預設是儲存在兩個節點來實現容錯性,以序列化的方式儲存在記憶體當中。

RDD Checkpointing

 狀態的操作是基於多個批次的資料的。它包括基於window的操作和updateStateByKey。因為狀態的操作要依賴於上一個批次的資料,所以它要根據時間,不斷累積元資料。為了清空資料,它支援週期性的檢查點,通過把中間結果儲存到hdfs上。因為檢查操作會導致儲存到hdfs上的開銷,所以設定這個時間間隔,要很慎重。對於小批次的資料,比如一秒的,檢查操作會大大降低吞吐量。但是檢查的間隔太長,會導致任務變大。通常來說,5-10秒的檢查間隔時間是比較合適的。

ssc.checkpoint(hdfsPath)  //設定檢查點的儲存位置
dstream.checkpoint(checkpointInterval)  //設定檢查點間隔

對於必須設定檢查點的Dstream,比如通過updateStateByKeyreduceByKeyAndWindow建立的Dstream,預設設定是至少10秒。

Performance Tuning

對於調優,可以從兩個方面考慮:

(1)利用叢集資源,減少處理每個批次的資料的時間

(2)給每個批次的資料量的設定一個合適的大小

Level of Parallelism

像一些分散式的操作,比如reduceByKey和reduceByKeyAndWindow,預設的8個併發執行緒,可以通過對應的函式提高它的值,或者通過修改引數spark.default.parallelism來提高這個預設值。

Task Launching Overheads

通過進行的任務太多也不好,比如每秒50個,傳送任務的負載就會變得很重要,很難實現壓秒級的時延了,當然可以通過壓縮來降低批次的大小。

Setting the Right Batch Size

要使流程式能在叢集上穩定的執行,要使處理資料的速度跟上資料流入的速度。最好的方式計算這個批量的大小,我們首先設定batch size為5-10秒和一個很低的資料輸入速度。確實系統能跟上資料的速度的時候,我們可以根據經驗設定它的大小,通過檢視日誌看看Total delay的多長時間。如果delay的小於batch的,那麼系統可以穩定,如果delay一直增加,說明系統的處理速度跟不上資料的輸入速度。

24/7 Operation

Spark預設不會忘記元資料,比如生成的RDD,處理的stages,但是Spark Streaming是一個24/7的程式,它需要週期性的清理元資料,通過spark.cleaner.ttl來設定。比如我設定它為600,當超過10分鐘的時候,Spark就會清楚所有元資料,然後持久化RDDs。但是這個屬性要在SparkContext 建立之前設定。

但是這個值是和任何的window操作繫結。Spark會要求輸入資料在過期之後必須持久化到記憶體當中,所以必須設定delay的值至少和最大的window操作一致,如果設定小了,就會報錯。

Monitoring

除了Spark內建的監控能力,還可以StreamingListener這個介面來獲取批處理的時間, 查詢時延, 全部的端到端的試驗。

Memory Tuning

Spark Stream預設的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY

預設的,所有持久化的RDD都會通過被Spark的LRU演算法剔除出記憶體,如果設定了spark.cleaner.ttl,就會週期性的清理,但是這個引數設定要很謹慎。一個更好的方法是設定spark.streaming.unpersist為true,這就讓Spark來計算哪些RDD需要持久化,這樣有利於提高GC的表現。

推薦使用concurrent mark-and-sweep GC,雖然這樣會降低系統的吞吐量,但是這樣有助於更穩定的進行批處理。

Fault-tolerance Properties

Failure of a Worker Node

下面有兩種失效的方式:

1.使用hdfs上的檔案,因為hdfs是可靠的檔案系統,所以不會有任何的資料失效。

2.如果資料來源是網路,比如Kafka和Flume,為了防止失效,預設是資料會儲存到2個節點上,但是有一種可能性是接受資料的節點掛了,那麼資料可能會丟失,因為它還沒來得及把資料複製到另外一個節點。

Failure of the Driver Node

為了支援24/7不間斷的處理,Spark支援驅動節點失效後,重新恢復計算。Spark Streaming會週期性的寫資料到hdfs系統,就是前面的檢查點的那個目錄。驅動節點失效之後,StreamingContext可以被恢復的。

為了讓一個Spark Streaming程式能夠被回覆,它需要做以下操作:

(1)第一次啟動的時候,建立 StreamingContext,建立所有的streams,然後呼叫start()方法。

(2)恢復後重啟的,必須通過檢查點的資料重新建立StreamingContext。

下面是一個實際的例子:

通過StreamingContext.getOrCreate來構造StreamingContext,可以實現上面所說的。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

在stand-alone的部署模式下面,驅動節點失效了,也可以自動恢復,讓別的驅動節點替代它。這個可以在本地進行測試,在提交的時候採用supervise模式,當提交了程式之後,使用jps檢視程序,看到類似DriverWrapper就殺死它,如果是使用YARN模式的話就得使用其它方式來重新啟動了。

這裡順便提一下向客戶端提交程式吧,之前總結的時候把這塊給落下了。

./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] 
   <cluster-url> <application-jar-url> <main-class> 
   [application-options]

cluster-url: master的地址.
application-jar-url: jar包的地址,最好是hdfs上的,帶上hdfs://...否則要所有的節點的目錄下都有這個jar的 
main-class: 要釋出的程式的main函式所在類. 
Client Options: 
--memory <count> (驅動程式的記憶體,單位是MB) 
--cores <count> (為你的驅動程式分配多少個核心) 
--supervise (節點失效的時候,是否重新啟動應用) 
--verbose (列印增量的日誌輸出)

在未來的版本,會支援所有的資料來源的可恢復性。

為了更好的理解基於HDFS的驅動節點失效恢復,下面用一個簡單的例子來說明:

Time     Number of lines in input file     Output without driver failure     Output with driver failure
1      10                     10                    10
2      20                     20                    20
3      30                     30                    30
4      40                     40                    [DRIVER FAILS] no output
5      50                     50                    no output
6      60                     60                    no output
7      70                     70                    [DRIVER RECOVERS] 40, 50, 60, 70
8      80                     80                    80
9      90                     90                    90
10     100                     100                   100

在4的時候出現了錯誤,40,50,60都沒有輸出,到70的時候恢復了,恢復之後把之前沒輸出的一下子全部輸出。