1. 程式人生 > >java8實現spark streaming的wordcount

java8實現spark streaming的wordcount

概念這裡就不說了,從案例開始,慣例,hellowrod,哦不,wordcount。 
要計算從一個監聽 TCP socket 的資料伺服器接收到的文字資料(text data)中的字數。 
主體程式碼部分跟spark相差不大,畢竟DStream是RDD產生的模板(或者說類)。

1.匯入了 Spark Streaming 類

 <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>

2.程式碼示例

//注意本地除錯,master必須為local[n],n>1,表示一個執行緒接收資料,n-1個執行緒處理資料
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//設定日誌執行級別
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
//建立一個將要連線到hostname:port 的離散流
JavaReceiverInputDStream<String> lines = 
ssc.socketTextStream("master1", 9999);
JavaPairDStream<String, Integer> counts = 
        lines.flatMap(x->Arrays.asList(x.split(" ")).iterator())
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((x, y) -> x + y);

// 在控制檯打印出在這個離散流(DStream)中生成的每個 RDD 的前十個元素
counts.print();
// 啟動計算
ssc.start();
ssc.awaitTermination();

3.建立服務端 

找臺linux伺服器,執行netcat小工具: 
nc -lk 9999 
也就是上面程式碼裡socketTextStream的引數.

4.執行測試 
本地啟動java程式碼後,控制檯會迴圈列印時間戳。 
在nc那邊隨意輸入,本地即可實時看到統計結果。 
這裡寫圖片描述