Spark Streaming優雅的停止服務
暴力停掉sparkstreaming是有可能出現問題的,比如你的資料來源是kafka,已經載入了一批資料到sparkstreaming中正在處理,如果中途停掉,這個批次的資料很有可能沒有處理完,就被強制stop了,下次啟動時候會重複消費或者部分資料丟失。
如何解決呢?
一、如果只是本地並且一次性的測試,只需要在SparkConf裡面設定下面的引數即可
val conf = new SparkConf()conf.setAppName("ts-bigdata-v2")
.set("spark.streaming.stopGracefullyOnShutdown","true")
.setMaster("local[*]")
二、如果是部署到叢集上的程式可以通過以下方式去解決:
主要思路:使用HDFS系統做訊息通知,
在驅動程式中,加一段程式碼,這段程式碼的作用每隔一段時間可以是10秒也可以是3秒,掃描HDFS上某一個檔案,如果發現這個檔案存在,就呼叫StreamContext物件stop方法,自己優雅的終止自己,其實這裡HDFS可以換成redis,zk,hbase,db都可以,這裡唯一的問題就是依賴了外部的一個儲存系統來達到訊息通知的目的,如果使用了這種方式後。停止流程式就比較簡單了,登入上有hdfs客戶端的機器,然後touch一個空檔案到指定目錄,然後等到間隔的掃描時間到之後,發現有檔案存在,就知道需要關閉程式了。
具體實現如下:
def main(args:Array[String]){
val conf = new SparkConf()
conf.setAppName("ts-bigdata-v2")
.setMaster("local[*])
val sc=new SparkContext(conf)
val ssc=new StreamingContext(sc,Seconds(5))
........
ssc.start()
//增加的程式,每5秒檢測指定hdfs檔案有無檔案,如果存在則呼叫stop介面去停止程序
stopByMarkFile(ssc)
ssc.awaitTermination()
ssc.stop()
}
/**
* 每隔一段時間檢查是否存在stop程序的檔案,如果存在則結束該程序。
* 前提:
* 會在所有訊息處理完成後結束程序
* @param ssc
*/
def stopByMarkFile(ssc:StreamingContext):Unit= {
val intervalMills = 5 * 1000 // 每隔30秒掃描一次訊息是否存在
var isStop = false
val master = "hdfs://IP:埠"
val path = "/user/new_user/stop" //判斷訊息檔案是否存在,如果存在就
while (!isStop) {
isStop = ssc.awaitTerminationOrTimeout(intervalMills)
if (!isStop && isExistsMarkFile(master+path)) {
println("5秒後開始關閉sparstreaming程式.....")
Thread.sleep(30000)
ssc.stop(true, true)
delHdfsFile(master,path)
}else{
println("***********未檢測到有停止訊號*****************")
}
}
}
/**
* 判斷檔案是否存在
* @param hdfs_file_path
* @return
*/
def isExistsMarkFile(hdfs_file_path:String):Boolean={
val conf = new Configuration()
val path=new Path(hdfs_file_path)
val fs =path.getFileSystem(conf)
fs.exists(path)
}
如何測試:
可以寫一個簡單的介面程式,去生成“"hdfs://IP:埠/user/new_user/stop”檔案。
檔案生成後,可以觀察上面程式的程序。
問題解決