SparkStreaming(二)入門案例
阿新 • • 發佈:2019-01-04
2、入門案例
2.1、計算單詞的數量
Java版本jdk.1.8以下:
public class WordCountOnline { public static void main(String[] args) throws InterruptedException { SparkConf conf = new SparkConf(); /* * 1、配置應用名稱以及配置兩個執行緒(注意要大於等於兩個執行緒) */ conf.setAppName("WordCountOnline").setMaster("local[2]"); /* *2、 建立SparkStreamingContext * 可以基於SparkConf引數,也可以基於持久化的SparkStreamingContext進行狀態恢復。 * 典型的場景是Driver崩潰後由於SparkStreaming具有連續不斷的24小時不間斷的執行,所以需要再Driver * 重現啟動後從上次執行的狀態恢復過來,此時的狀態需要基於曾經的CheckPoint。 */ JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1)); /* * 3、建立SparkStreaming輸入資料來源 * a.資料輸入來源可以基於File,HDFS,Flume,Kafka,Socket等。 * b.在這裡我們指定資料來源於網路Socket埠,SparkStreaming連線上該埠,並在執行的時候一直監聽該埠的資料, * 並且後續根據業務需要不斷的有資料產生。 * c.如果經常在每隔5秒沒有資料就不斷啟動空的job其實是對資源的浪費,因為沒有接受到資料,仍然提交了job。 * 實際的做法是提交job會判斷是否有資料,如果沒有的話就不再提交job。 */ JavaReceiverInputDStream<String> lines = jssc.socketTextStream("local", 9999); /* * 4、我們就像對RDD程式設計一樣,基於DStream進行程式設計,原因是DStream是RDD產生的模板,在SparkStreaming發生計算之前,其實質 * 是把每個Batch的DStream的操作翻譯成為了RDD操作 */ //4.1、faltMap操作:將遍歷每一行,並且將每一行分割單詞返回String的Iterator JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(",")); } }); //4.2、mapToPair操作:將每個單詞計數標記為1 JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); //4.3、reduceByKey操作:將每個相同單詞的計數標記1相加 JavaPairDStream<String, Integer> word_count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); /* * 4.4、print操作:此處的print方法並不會觸發job執行,因為目前程式碼還處於SparkStreaming框架的控制之下, * 具體是否觸發時取決於設定的Duration時間的間隔。 */ word_count.print(); /* * 5、開始計算:SparkStreaming引擎開始執行,也就是Driver開始執行,Driver啟動時位於一條執行緒中, * 當然內部當然還有訊息迴圈體,接收應用程式本身或者Executor傳送過來的訊息。 */ jssc.start(); //6、等待程式執行結束 jssc.awaitTermination(); } }
Java版本jdk1.8:可以使用lambda表示式簡化程式碼:
public class WordCount { public static void main(String[] args) throws InterruptedException { //1、建立一個帶有兩個執行執行緒的本地StreamingContext,並且設定流資料每批的間隔為1秒 /** * appName引數是應用程式在叢集UI上顯示的名稱。 * master是Spark,Mesos或YARN叢集URL,或者是在本地模式下執行的特殊"local[*]"字串。 * 實際上,當在叢集上執行時,不希望在程式中對master進行硬編碼,而是使用spark-submit啟動應用程式並在那裡接收它。 * 但是,對於本地測試和單元測試,您可以傳遞"local[*]"以在程序中執行Spark Streaming。 * 請注意,這會在內部建立一個JavaSparkContext(所有Spark功能的起點),可以作為ssc.sparkContext訪問。 */ SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1)); /** * 定義上下文後,需要執行以下操作: * 1.通過建立輸入DStreams來定義輸入源 * 2.通過將轉換和輸出操作應用於DStream來定義流式計算。 * 3.開始接收資料並使用streamingContext.start()處理它。 * 4.等待使用streamingContext.awaitTermination()停止處理(手動或由於任何錯誤)。 * 5.可以使用streamingContext.stop()手動停止處理。 * 要記住的要點: * 1.一旦啟動了上下文,就不能設定或新增新的流式計算。 * 2.上下文停止後,無法重新啟動。 * 3.在JVM中只能同時啟用一個StreamingContext。 * 4.StreamingContext上的stop()也會停止SparkContext。要僅停止StreamingContext,請將名為stopSparkContext的stop()的可選引數設定為false。 * 5.只要在建立下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext),就可以重複使用SparkContext來建立多個StreamingContexts。 */ //2、使用此context,我們可以建立一個DStream,它表示來自特定主機名(例如localhost)和埠(例如9999)TCP源的流資料。 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999); //3、將每行文字以空格符切分成一個個單詞 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); //4、計算每批單詞的數量 JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1)); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1,i2) -> i1 + i2); wordCounts.print(); //5、開始計算 jssc.start(); //6、等待計算終止 jssc.awaitTermination(); } }
2.2、流式篩選並打印出包含”error”的行
public class WordFilter { public static void main(String[] args) throws InterruptedException { //建立一個Java版本的Spark Context SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordFilter"); //從SparkConf建立StreamingContext並指定1秒鐘的批處理大小 JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1)); //以埠7777作為輸入來源建立DStream JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 7777); //從DStream中篩選出包含字串"error"的行 JavaDStream<String> errorLines = lines.filter(new Function<String,Boolean>(){ @Override public Boolean call(String line) throws Exception { return line.contains("error"); } }); //打印出有"error"的行 errorLines.print(); //啟動流計算環境StreamingContext並等待它"完成" jssc.start(); //等待作業完成 jssc.awaitTermination(); } }