基於HDFS,Spark Stream的實時統計
阿新 • • 發佈:2019-02-15
最近在搞一個小功能,具體要求是:資料到了hdfs,然後統計。需求很簡,程式實現也挺簡單的,但是目錄有點複雜,如base目錄下面有/業務/省/yyyyMMdd/h/aa.txt檔案
如果是按照之前的約定的方式的話,是可以實現的,但是這個資料夾太複雜了,所以按照約定的方式來弄好像難度也挺複雜的,所以這種方法我放棄了。還有一種方案就是把檔案目錄放到kafka中,然後訂閱kafka的內容,取得了之後將引數傳程序序執行,但是這個方案的問題卡在瞭如何實時訂閱kafka的內容,而且把引數傳到spark程式中執行,而且肯定是使用shell來訂閱,然後獲取引數再提交給spark應用,沒人用過,暫時擱置了;還有一種方案就是使用spark stream 實時的從hdfs中監控,但是貌似這東西只能從監控hdfs的某個目錄下面的檔案,而監控不了某個目錄下面的子目錄。程式碼如下。但是不應該啊,為什麼這點功能都做不到,難道設計者都沒想到這個問題嗎?很簡單的需求啊。
public class HDFSWordCount {
private static String BASE="hdfs://hadoop0:9000/data/xx/yy/zz/";public static void main(String[] args) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("HDFSWordCount");
// sc.textFile("hdfs://n1:8020/user/hdfs/input");
// sc.textFile("hdfs://hadoop0:9000/spark/");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// 首先,使用JavaStreamingContext的textFileStream()方法,針對HDFS目錄建立輸入資料流
JavaDStream<String> callLines = jssc.textFileStream(BASE+"oidd_call/");
// JavaDStream<String> smsLines = jssc.textFileStream(BASE+"oidd_sms/*/*/*/");
// JavaDStream<String> locationLines = jssc.textFileStream(BASE+"oidd_location/*/*/*/");
callLines.print();
// smsLines.print();
// locationLines.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
}