1. 程式人生 > >基於HDFS,Spark Stream的實時統計

基於HDFS,Spark Stream的實時統計

       最近在搞一個小功能,具體要求是:資料到了hdfs,然後統計。需求很簡,程式實現也挺簡單的,但是目錄有點複雜,如base目錄下面有/業務/省/yyyyMMdd/h/aa.txt檔案

如果是按照之前的約定的方式的話,是可以實現的,但是這個資料夾太複雜了,所以按照約定的方式來弄好像難度也挺複雜的,所以這種方法我放棄了。還有一種方案就是把檔案目錄放到kafka中,然後訂閱kafka的內容,取得了之後將引數傳程序序執行,但是這個方案的問題卡在瞭如何實時訂閱kafka的內容,而且把引數傳到spark程式中執行,而且肯定是使用shell來訂閱,然後獲取引數再提交給spark應用,沒人用過,暫時擱置了;還有一種方案就是使用spark stream 實時的從hdfs中監控,但是貌似這東西只能從監控hdfs的某個目錄下面的檔案,而監控不了某個目錄下面的子目錄。程式碼如下。但是不應該啊,為什麼這點功能都做不到,難道設計者都沒想到這個問題嗎?很簡單的需求啊。

public class HDFSWordCount {

    private static String BASE="hdfs://hadoop0:9000/data/xx/yy/zz/";
    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("HDFSWordCount");  
//        sc.textFile("hdfs://n1:8020/user/hdfs/input");
//        sc.textFile("hdfs://hadoop0:9000/spark/");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        
        // 首先,使用JavaStreamingContext的textFileStream()方法,針對HDFS目錄建立輸入資料流
        JavaDStream<String> callLines = jssc.textFileStream(BASE+"oidd_call/");
//        JavaDStream<String> smsLines = jssc.textFileStream(BASE+"oidd_sms/*/*/*/");
//        JavaDStream<String> locationLines = jssc.textFileStream(BASE+"oidd_location/*/*/*/");

        callLines.print();
//        smsLines.print();
//        locationLines.print();
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }