java8實現spark streaming的wordcount
阿新 • • 發佈:2018-10-31
概念這裡就不說了,從案例開始,慣例,hellowrod,哦不,wordcount。
要計算從一個監聽 TCP socket 的資料伺服器接收到的文字資料(text data)中的字數。
主體程式碼部分跟spark相差不大,畢竟DStream是RDD產生的模板(或者說類)。
1.匯入了 Spark Streaming 類
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.0</version> </dependency>
2.程式碼示例
//注意本地除錯,master必須為local[n],n>1,表示一個執行緒接收資料,n-1個執行緒處理資料 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count"); JavaSparkContext sc = new JavaSparkContext(conf); //設定日誌執行級別 sc.setLogLevel("WARN"); JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1)); //建立一個將要連線到hostname:port 的離散流 JavaReceiverInputDStream<String> lines = ssc.socketTextStream("master1", 9999); JavaPairDStream<String, Integer> counts = lines.flatMap(x->Arrays.asList(x.split(" ")).iterator()) .mapToPair(x -> new Tuple2<String, Integer>(x, 1)) .reduceByKey((x, y) -> x + y); // 在控制檯打印出在這個離散流(DStream)中生成的每個 RDD 的前十個元素 counts.print(); // 啟動計算 ssc.start(); ssc.awaitTermination();
3.建立服務端
找臺linux伺服器,執行netcat小工具: nc -lk 9999
也就是上面程式碼裡socketTextStream的引數.
4.執行測試
本地啟動java程式碼後,控制檯會迴圈列印時間戳。
在nc那邊隨意輸入,本地即可實時看到統計結果。