Spark Streaming優雅的關閉策略優化
前面文章介紹了不少有關Spark Streaming的offset的管理以及如何優雅的關閉Spark Streaming的流程式。
到目前為止還有幾個問題:
(1)有關spark streaming整合kafka時,如果kafka新增分割槽, 那麼spark streaming程式能不能動態識別到而不用重啟?
(2)如果需要重啟,那麼在自己管理offset時,如何才能識別到新增的分割槽?
(3)spark streaming優雅關閉的策略還有那些?
首先第一個問題,如果kafka要新增分割槽,對於正在執行的實時流程式能否動態識別到?
經過測試,是不能識別的,我推測使用createDirectStream建立流物件一旦建立就是不可變的,也就是說建立例項那一刻的分割槽數量,會一直使用直到流程式結束,就算中間kafka的分割槽數量擴充套件了,流程式也是不能識別到的。所以在擴充套件kafka分割槽前,一定要先把流程式給停掉,然後擴充套件完成後需要再次重啟流程式。
然後看第二個問題,如果是我們自己管理offset時,一定要考慮到kafka擴充套件分割槽的情況,每次啟動程式前都得檢測下目前儲存的偏移量裡面的kafka的分割槽個數是否小於kafka實際元資料裡面實際的分割槽個數,正常沒擴充套件分割槽的情況下兩個值應該是相等的,如果值不一致,就說明是kafka分割槽得到擴充套件了,所以我們的程式需要能夠相容這種情況。
核心程式碼如下:
//這個topic在zk裡面最新的分割槽數量
val lastest_partitions= ZkUtils.getPartitionsForTopics(zkClient,Seq(topic)).get(topic).get
var offsets = offsetsRangesStr.split(",")//按逗號split成陣列
.map(s => s.split(":"))//按冒號拆分每個分割槽和偏移量
.map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//加工成最終的格式
.toMap//返回一個Map
//說明有分割槽擴充套件了
if (offsets.size<lastest_partitions.size){
//得到舊的所有分割槽序號
val old_partitions=offsets.keys.map(p=>p.partition).toArray
//通過做差集得出來多的分割槽數量陣列
val add_partitions=lastest_partitions.diff(old_partitions)
if(add_partitions.size>0){
log.warn("發現kafka新增分割槽:"+add_partitions.mkString(","))
add_partitions.foreach(partitionId=>{
offsets += (TopicAndPartition(topic,partitionId)->0)
log.warn("新增分割槽id:"+partitionId+"新增完畢....")
})
}
}else{
log.warn("沒有發現新增的kafka分割槽:"+lastest_partitions.mkString(","))
}
上面的程式碼在每次啟動程式時,都會檢查當前我們自己管理的offset的分割槽數量與zk元資料裡面實際的分割槽數量,如果不一致就會把新增的分割槽id給加到TopicAndPartition裡面並放入到Map物件裡面,這樣在啟動前就會傳入到createDirectStream物件中,就能相容新增的分割槽了。
最後一個問題,前面的文章談到過我們可以有兩種方式來更加優雅的停止流程式,分別是通過http暴露服務,和通過HDFS做訊息中轉來定時掃描mark檔案是否存在來觸發關閉服務。
下面我們先來看下通過http暴露服務的核心程式碼:
/****
* 負責啟動守護的jetty服務
* @param port 對外暴露的埠號
* @param ssc Stream上下文
*/
def daemonHttpServer(port:Int,ssc: StreamingContext)={
val server=new Server(port)
val context = new ContextHandler();
context.setContextPath( "/close" );
context.setHandler( new CloseStreamHandler(ssc) )
server.setHandler(context)
server.start()
}
/*** 負責接受http請求來優雅的關閉流
* @param ssc Stream上下文
*/
class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
log.warn("開始關閉......")
ssc.stop(true,true)//優雅的關閉
response.setContentType("text/html; charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
val out = response.getWriter();
out.println("close success");
baseRequest.setHandled(true);
log.warn("關閉成功.....")
}
}
然後在來看下另一種方式掃描HDFS檔案的方式:
/***
* 通過一個訊息檔案來定時觸發是否需要關閉流程式
* @param ssc StreamingContext
*/
def stopByMarkFile(ssc:StreamingContext):Unit= {
val intervalMills = 10 * 1000 // 每隔10秒掃描一次訊息是否存在
var isStop = false
val hdfs_file_path = "/spark/streaming/stop" //判斷訊息檔案是否存在,如果存在就
while (!isStop) {
isStop = ssc.awaitTerminationOrTimeout(intervalMills)
if (!isStop && isExistsMarkFile(hdfs_file_path)) {
log.warn("2秒後開始關閉sparstreaming程式.....")
Thread.sleep(2000)
ssc.stop(true, true)
}
}
}
/***
* 判斷是否存在mark file
* @param hdfs_file_path mark檔案的路徑
* @return
*/
def isExistsMarkFile(hdfs_file_path:String):Boolean={
val conf = new Configuration()
val path=new Path(hdfs_file_path)
val fs =path.getFileSystem(conf);
fs.exists(path)
}
上面是兩種方式的核心程式碼,最後提下觸發停止流程式:
第一種需要在啟動服務的機器上,執行下面封裝的指令碼:
## tx.log是提交spark任務後的輸出log重定向的log
## &> tx.log &
#!/bin/bash
driver=`cat tx.log | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`
echo $driver
curl http://$driver:port/close/
echo "stop finish"
第二種方式,找到一個擁有HDFS客戶端機器,向HDFS上寫入指定的檔案:
#生成檔案後,10秒後程序就會自動停止
hadoop fs -touch /spark/streaming/stop
#下次啟動前,需要清空這個檔案,否則程式啟動後就會停止
hadoop fs -rm -r /spark/streaming/stop