1. 程式人生 > >Spark Streaming優雅的關閉策略優化

Spark Streaming優雅的關閉策略優化

前面文章介紹了不少有關Spark Streaming的offset的管理以及如何優雅的關閉Spark Streaming的流程式。

到目前為止還有幾個問題:

(1)有關spark streaming整合kafka時,如果kafka新增分割槽, 那麼spark streaming程式能不能動態識別到而不用重啟?

(2)如果需要重啟,那麼在自己管理offset時,如何才能識別到新增的分割槽?

(3)spark streaming優雅關閉的策略還有那些?

首先第一個問題,如果kafka要新增分割槽,對於正在執行的實時流程式能否動態識別到?

經過測試,是不能識別的,我推測使用createDirectStream建立流物件一旦建立就是不可變的,也就是說建立例項那一刻的分割槽數量,會一直使用直到流程式結束,就算中間kafka的分割槽數量擴充套件了,流程式也是不能識別到的。所以在擴充套件kafka分割槽前,一定要先把流程式給停掉,然後擴充套件完成後需要再次重啟流程式。

然後看第二個問題,如果是我們自己管理offset時,一定要考慮到kafka擴充套件分割槽的情況,每次啟動程式前都得檢測下目前儲存的偏移量裡面的kafka的分割槽個數是否小於kafka實際元資料裡面實際的分割槽個數,正常沒擴充套件分割槽的情況下兩個值應該是相等的,如果值不一致,就說明是kafka分割槽得到擴充套件了,所以我們的程式需要能夠相容這種情況。

核心程式碼如下:

//這個topic在zk裡面最新的分割槽數量
        val  lastest_partitions= ZkUtils.getPartitionsForTopics(zkClient,Seq(topic)).get(topic).get
var offsets = offsetsRangesStr.split(",")//按逗號split成陣列 .map(s => s.split(":"))//按冒號拆分每個分割槽和偏移量 .map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//加工成最終的格式 .toMap//返回一個Map //說明有分割槽擴充套件了 if
(offsets.size<lastest_partitions.size){ //得到舊的所有分割槽序號 val old_partitions=offsets.keys.map(p=>p.partition).toArray //通過做差集得出來多的分割槽數量陣列 val add_partitions=lastest_partitions.diff(old_partitions) if(add_partitions.size>0){ log.warn("發現kafka新增分割槽:"+add_partitions.mkString(",")) add_partitions.foreach(partitionId=>{ offsets += (TopicAndPartition(topic,partitionId)->0) log.warn("新增分割槽id:"+partitionId+"新增完畢....") }) } }else{ log.warn("沒有發現新增的kafka分割槽:"+lastest_partitions.mkString(",")) }

上面的程式碼在每次啟動程式時,都會檢查當前我們自己管理的offset的分割槽數量與zk元資料裡面實際的分割槽數量,如果不一致就會把新增的分割槽id給加到TopicAndPartition裡面並放入到Map物件裡面,這樣在啟動前就會傳入到createDirectStream物件中,就能相容新增的分割槽了。

最後一個問題,前面的文章談到過我們可以有兩種方式來更加優雅的停止流程式,分別是通過http暴露服務,和通過HDFS做訊息中轉來定時掃描mark檔案是否存在來觸發關閉服務。

下面我們先來看下通過http暴露服務的核心程式碼:

/****
    * 負責啟動守護的jetty服務
    * @param port 對外暴露的埠號
    * @param ssc Stream上下文
    */
  def daemonHttpServer(port:Int,ssc: StreamingContext)={
    val server=new Server(port)
    val context = new ContextHandler();
    context.setContextPath( "/close" );
    context.setHandler( new CloseStreamHandler(ssc) )
    server.setHandler(context)
    server.start()
  }

  /*** 負責接受http請求來優雅的關閉流
    * @param ssc  Stream上下文
    */
  class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
      log.warn("開始關閉......")
      ssc.stop(true,true)//優雅的關閉
      response.setContentType("text/html; charset=utf-8");
      response.setStatus(HttpServletResponse.SC_OK);
      val out = response.getWriter();
      out.println("close success");
      baseRequest.setHandled(true);
      log.warn("關閉成功.....")
    }
  }

然後在來看下另一種方式掃描HDFS檔案的方式:

/***
    * 通過一個訊息檔案來定時觸發是否需要關閉流程式
    * @param ssc StreamingContext
    */
  def stopByMarkFile(ssc:StreamingContext):Unit= {
    val intervalMills = 10 * 1000 // 每隔10秒掃描一次訊息是否存在
    var isStop = false
    val hdfs_file_path = "/spark/streaming/stop" //判斷訊息檔案是否存在,如果存在就
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExistsMarkFile(hdfs_file_path)) {
        log.warn("2秒後開始關閉sparstreaming程式.....")
        Thread.sleep(2000)
        ssc.stop(true, true)
      }

    }
  }

    /***
      * 判斷是否存在mark file
      * @param hdfs_file_path  mark檔案的路徑
      * @return
      */
    def isExistsMarkFile(hdfs_file_path:String):Boolean={
      val conf = new Configuration()
      val path=new Path(hdfs_file_path)
      val fs =path.getFileSystem(conf);
      fs.exists(path)
    }

上面是兩種方式的核心程式碼,最後提下觸發停止流程式:

第一種需要在啟動服務的機器上,執行下面封裝的指令碼:

## tx.log是提交spark任務後的輸出log重定向的log 
## &> tx.log &  

 #!/bin/bash
    driver=`cat tx.log | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`

    echo $driver

    curl http://$driver:port/close/

  echo "stop finish"

第二種方式,找到一個擁有HDFS客戶端機器,向HDFS上寫入指定的檔案:

#生成檔案後,10秒後程序就會自動停止
hadoop fs -touch /spark/streaming/stop

#下次啟動前,需要清空這個檔案,否則程式啟動後就會停止
hadoop fs -rm -r /spark/streaming/stop