1. 程式人生 > >spark streaming容錯實現

spark streaming容錯實現

如果Executor故障,所有未被處理的資料會丟失,解決辦法可以通過wal(hbase,hdfs/WAL)方式,將資料預先寫到hdfs或者s3

如果Driver故障,driver程式就會停止,所有executor都會失去丟失,停止計算過程,解決辦法需要配置和程式設計
1.配置diver程式自動重啟,使用特定的clustermanager實現,
2。重啟時從宕機的地方重啟。通過檢查點機制,就可以實現該功能。
//可以使用本地目錄或者hdfs
jsc.checkpoint(“d:…”)
不要使用new的方式來建立sparkstreamingcontext上下文物件,而是通過工廠的方式JavaStreamingContext().getOrCreate()方法來建立上下文物件,在建立上下文物件的時候,首先會檢查檢查點目錄,看是否job允許,沒有job允許就直接new新的。
程式碼如下:

public class JavaSparkStreamingWCApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("wc");
        conf.setMaster("local[4]");
        //建立spark應用上下文,間隔批次是2
        /**
         * 這裡為了做到容錯,不使用new的方式來建立streamingContext物件,
         * 而是通過工廠的方式JavaStreamingContext.getOrCreate()方式來建立,
         *這種方式建立的好處是:在啟動程式的時候回首先去檢查點目錄檢查
         * 如果有資料,就直接從資料中回覆,如果沒有資料,就直接new一個新的。
         * 實驗:一下程式碼實現了當driver正常執行的時候,宕機,
         * 重啟會自動從以前的checkpoint點拿取資料,重新計算。
         */
        Function0<JavaStreamingContext> contextFunction = new Function0<JavaStreamingContext>() {
            //首次建立context物件呼叫該方法
            public JavaStreamingContext call() throws Exception {
                JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(2));
                JavaReceiverInputDStream<String> socket = jsc.socketTextStream("mini4", 9999);
                /*變化的程式碼放到處這裡是一個視窗函式,視窗的長度是24小時,滑塊的間隔是2秒*/
                JavaDStream<Long> dsCount = socket.countByWindow(new Duration(24 * 60 * 60 * 1000), new Duration(2000));
                dsCount.print();
                //切記,這裡一定要寫檢查點
                jsc.checkpoint("g:/log/data/spark/ck/java_spark_streaming_wc");
                return jsc;
            }
        };
        //失敗重新連線的時候會經過檢查點。
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate("g:/log/data/spark/ck/java_spark_streaming_wc",contextFunction);

        jsc.start();
        try {
            jsc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

當Driver宕機後,重啟後會從checkpoint端自動獲取未讀取的資料資料。
這裡是驗證上面的程式碼,mini4上使用
加粗樣式
當輸入到8的時候將程式停止,然後mini4上不停的輸入資料,再次重啟還會從9以後開始讀取