使用Java實現在單機上統計單詞的數目
阿新 • • 發佈:2019-01-22
準備
首先在IDE(eclipse或者IntelliJ IDEA)上面建立一個Java專案。匯入F:\spark\spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar
這個jar包。
然後我們就可以開始寫程式啦!
new一個class,比如取名就叫WordCount
吧,生成main
函式。
在main
函式中的程式碼生成步驟如下:
第一步
建立SparkConf 物件,並設定spark應用的一些資訊
- setAppName 設定應用名稱
- setMaster 設定Spark應用程式要連線的Spark叢集的master結點的url,如果設定為
local
SparkConf conf = new SparkConf()
.setAppName("WordCountApp")
.setMaster("local");
第二步
建立Java版本的SparkContext
SparkContext是Spark所有功能的一個入口
在不同型別的Spark應用程式中,使用的SparkContext是不同的:
- Java:JavaSparkContext
- scala:SparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
第三步
- 讀取我們輸入的資料,資料來自資料來源(本地檔案或者hdfs檔案等等),建立一個RDD。
- 資料來源中的資料會被打亂,然後而被分配到每個RDD的partitaon中去,從而形成一個初始的分散式資料集。
- 下面程式中的
textFile()
方法是用於根據資料來源來建立RDD(在Java中,建立的普通RDD被稱為JavaRDD
)。 - 如果是本地檔案或者hdfs檔案,則RDD中的每一個元素相當於檔案裡面的一行。
注:RDD:彈性分散式資料集,是Spark對分散式資料和計算的基本抽象。
JavaRDD<String> lines = sc.textFile("E:/pythonProject/sparktest.txt" );//這個來自檔案
第四步
將每行切分為單詞
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
第五步
轉化為鍵值對並計算
- 對映為
(單詞,1)
這種形式,這樣在後面才能根據單詞作為key,來計算每個單詞出現的次數。 - mapToPair()方法:將每個元素轉化為
Tuple2
型別的形式。 - 下面程式中
JavaPairRDD<String, Integer>
中的引數是Tuple2的鍵值對型別。 - 下面程式中的
PairFunction<String, String, Integer>()
中的引數分別代表:輸入引數,Tuple2的鍵值對型別。
JavaPairRDD<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
第六步
統計每個單詞出現的次數
- reduceByKey():通過key的值來減少
JavaPairRDD<String, Integer>
中元素的個數。 - call():對相同key值的元素的value進行操作。如下面的程式
return v1 + v2;
就是將value的值相加,從而實現對單詞的計數。
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
第七部
輸出統計結果
- 之前我們進行的
flatMap
,mapToPair
,reduceByKey
操作被稱為transformation操作。 - 在一個Spark應用程式中,僅僅有transformation操作是不夠的,還需要有action操作來觸發程式的執行。
- 下面程式的foreach方法就是一種action操作,來觸發程式執行。
wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println("單詞 "+wordCount._1 + "出現 " + wordCount._2 + " 次。");
}
});
第八步
關閉JavaSparkContext
sc.close();
總結
以上就是一個用Java實現的Spark應用程式,用來統計一個檔案中每個單詞出現的次數的程式了!