Spark Streaming開發入門——WordCount(Java&Scala)
阿新 • • 發佈:2019-02-01
一、Java方式開發
1、開發前準備
假定您以搭建好了Spark叢集。
2、開發環境採用eclipse maven工程,需要新增Spark Streaming依賴。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
</dependency>
3、Spark streaming 基於Spark Core進行計算,需要注意事項:
1.local模式的話,local後必須為大於等於2的數字【即至少2條執行緒】。因為receiver 佔了一個。
因為,SparkStreaming 在執行的時候,至少需要一條執行緒不斷迴圈接收資料,而且至少有一條執行緒處理接收的資料。
如果只有一條執行緒的話,接受的資料不能被處理。
溫馨提示:
對於叢集而言,每隔exccutor一般肯定不只一個Thread,那對於處理Spark Streaming應用程式而言,每個executor一般分配多少core比較合適?根據我們過去的經驗,5個左右的core是最佳的(段子:分配為奇數個core的表現最佳,例如:分配3個、5個、7個core等)
接下來,讓我們開始動手寫寫Java程式碼吧!
第一步:
SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordCountOnline");
第二步:
我們採用基於配置檔案的方式建立SparkStreamingContext物件
/**
* 第二步:建立SparkStreamingContext,這個是SparkStreaming應用程式所有功能的起始點和程式排程的核心
* SparkStreamingContext的構建可以基於SparkConf引數,也可基於持久化的SparkStreamingContext的內容來回復過來
* (典型的場景是Driver崩潰後重新啟動,由於Spark Streaming具有連續7*24小時不間斷執行的特徵,
* 所有需要再Drver重新啟動後繼續上的狀態,此時的狀態恢復需要基於曾經的Checkpoint)
*
*
* 2.在一個SparkStreaming應用程式中可以有多個SparkStreamingContext物件,使用下一個SparkStreaming程式之前
* 需要把前面正在執行的SparkStreamingContext物件關閉掉,由此,我們獲得一個啟發:
*
* SparkStreaming框架也 就是Spark Core上的一個應用程式。
* 只不過需要Spark工程師寫業務邏輯程式碼。
* 所以要掌握好Spark,學好SparkStreaming 就行了。
*
* java虛擬機器在os角度上看就是一個簡單的c。c++應用程式。
*
* 這裡可以用工廠方法建立ssc
*
*
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
第三步,建立Spark Streaming輸入資料來源:
我們將資料來源配置為本地埠9999(注意埠要求沒有被佔用):
/**
* 第三步:建立Spark Streaming 輸入資料來源:input Stream
* 1.資料輸入來源可以基於File、HDFS、Flume、Kafka、Socket
*
* 2.在這裡我們指定資料來源於網路Socket埠,
* Spark Streaming連線上該埠並在執行的時候一直監聽該埠的資料(當然該埠服務首先必須存在,並且在後續會根據業務需要不斷的有資料產生)。
*
* 有資料和沒有資料 在處理邏輯上沒有影響(在程式看來都是資料)
*
* 3.如果經常在每間隔5秒鐘 沒有資料的話,不斷的啟動空的job其實是會造成排程資源的浪費。因為 並沒有資料需要發生計算。
* 所以企業級生產環境的程式碼在具體提交Job前會判斷是否有資料,如果沒有的話,就不再提交Job;
*
*
*/
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
第四步:我們就像對RDD程式設計一樣,基於DStream進行程式設計,原因是DStream是RDD產生的模板,在Spark Streaming發生計算前,其實質是把每個Batch的DStream的操作翻譯成為了RDD操作。DStream對RDD進行了一次抽象。如同DataFrame對RDD進行一次抽象。JavaRDD —-> JavaDStream
1、flatMap操作:
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList(line.split(" "));
}
});
2、 mapToPair操作:
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String,Integer>(word,1);
}
});
3、reduceByKey操作:
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
4、print等操作:
/**
* 此處的print並不會直接觸發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的。
* 具體是否真正觸發Job的執行是基於設定的Duration時間間隔的觸發的。
*
*
* Spark應用程式要想執行具體的Job對DStream就必須有output Stream操作。
* ouput Stream 有很多型別的函式觸發,類print、savaAsTextFile、saveAsHadoopFiles等,最為重要的是foreachRDD,因為Spark Streamimg處理的結果一般都會
* 放在Redis、DB、DashBoard等上面,foreachRDD主要
* 就是用來 完成這些功能的,而且可以隨意的自定義具體資料放在哪裡。
*
*/
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
溫馨提示:
除了print()方法將處理後的資料輸出之外,還有其他的方法也非常重要,在開發中需要重點掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最為重要的是foreachRDD方法,這個方法可以將資料寫入Redis,DB,DashBoard等,甚至可以隨意的定義資料放在哪裡,功能非常強大。
二、Scala方式開發
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
/**
* @author lxh
*/
object WordCountOnline {
def main(args: Array[String]): Unit = {
/**
* 建立SparkConf
*/
val conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordcountonline")
val ssc = new StreamingContext(conf,Seconds(1))
val lines = ssc.socketTextStream("Master", 9999, StorageLevel.MEMORY_AND_DISK)
/**
* 按空格分割
*/
val words = lines.flatMap { line => line.split(" ") }
/**
* 把單個的word變成tuple
*/
val wordCount = words.map { word => (word,1) }
/**
* (key1,1) (key1,1)
* key相同的累加。
*/
wordCount.reduceByKey(_+_)
wordCount.print()
ssc.start()
/**
* 等待程式結束
*/
ssc.awaitTermination()
ssc.stop(true)
}
}