Spark實時流計算Java案例
阿新 • • 發佈:2019-01-10
現在,網上基於spark的程式碼基本上都是Scala,很多書上也都是基於Scala,沒辦法,誰叫spark是Scala寫出來的了,但是我現在還沒系統的學習Scala,所以只能用java寫spark程式了,spark支援java,而且Scala也基於JVM,不說了,直接上程式碼
這是官網上給出的例子,大資料學習中經典案例單詞計數
在linux下一個終端 輸入 $ nc -lk 9999
然後執行下面的程式碼
package com.tg.spark.stream;
import java.util.Arrays;
import org.apache.spark.*;
import org.apache .spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
/**
*
* @author 湯高
*
*/
public class SparkStream {
public static void main(String[] args) {
// Create a local StreamingContext with two working thread and batch
// interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount").set("spark.testing.memory",
"2147480000");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
System.out.println (jssc);
// Create a DStream that will connect to hostname:port, like
// localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
//JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream");
// Split each line into words
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
System.out.println(Arrays.asList(x.split(" ")).get(0));
return Arrays.asList(x.split(" "));
}
});
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
System.out.println(pairs);
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to
// the console
wordCounts.print();
//wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark", new Text(), new IntWritable(), JavaPairDStream<Text,IntWritable>());
wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark");
//wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark",Text,IntWritable);
//System.out.println(wordCounts.count());
jssc.start();
//System.out.println(wordCounts.count());// Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
}
}
然後再剛剛的終端輸入 hello world
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
就可以通過控制檯看到
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
並且hdfs上也可以看到通過計算生成的實時檔案
第二個案例是,不是通過socketTextStream套接字,而是直接通過hdfs上的某個檔案目錄來作為輸入資料來源
package com.tg.spark.stream;
import java.util.Arrays;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
/**
*
* @author 湯高
*
*/
public class SparkStream2 {
public static void main(String[] args) {
// Create a local StreamingContext with two working thread and batch
// interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount").set("spark.testing.memory",
"2147480000");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
System.out.println(jssc);
// Create a DStream that will connect to hostname:port, like
// localhost:9999
//JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream");
// Split each line into words
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
System.out.println(Arrays.asList(x.split(" ")).get(0));
return Arrays.asList(x.split(" "));
}
});
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
System.out.println(pairs);
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to
// the console
wordCounts.print();
//wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark", new Text(), new IntWritable(), JavaPairDStream<Text,IntWritable>());
wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark");
//wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark",Text,IntWritable);
//System.out.println(wordCounts.count());
jssc.start();
//System.out.println(wordCounts.count());// Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
}
}
這樣就存在埠一直在監控你的那個目錄,只要它有檔案生成,就會馬上讀取到它裡面的內容,你可以先執行程式,然後手動新增一個檔案到剛剛的目錄,就可以看到輸出結果了