Hadoop 用Java編寫MapReduce詞頻統計程式並提交到Hadoop叢集執行
一、MapReduce介紹
MapReduce是一個分散式計算框架
,可以部署在Hadoop、Spark等大資料平臺上,實現海量資料的平行計算。它採用“分而治之
”的思想,將一個計算任務交給叢集中的多臺機器共同完成,之後再彙總成最終結果。
一般來說讀取一個TB,PB級的檔案,普通計算機的速度是比較慢的,而要想提高速度就要提高計算機的硬體配置,這對於普通使用者來說是很難做到的,也提高了這一領域的門檻。而採用廉價的機器組成分散式系統,只要叢集的機器數量足夠多,那麼計算的速度就會足夠快。
二、MapReduce特點
優點
(1)易於程式設計
完全獨立完成一個MapReduce程式是一個很困難的事情,這需要很強的程式設計能力。好在MapReduce給我們提供了大量的方便開發的介面,我們只需要繼承一些介面,實現一些特定的函式就能完成一個MapReduce程式。
(2)高拓展性
這是一個分散式計算框架,我們可以簡單粗暴的,通過增加機器來提高計算效能。
(3)高容錯性
由於MapReduce叢集採用的大多是廉價的機器,宕機,BUG等都是家常便飯。但MapReduce框架提供多種有效的錯誤檢測和恢復機制。如果一個結點出現了問題,其他結點會接替這個結點的工作,等結點恢復正常後,又可以繼續工作,這些都由Hadoop內部完成。
(4)高吞吐量
MapReduce可以對PB級以上也就是1024TB的資料進行離線計算。
缺點
(1)難以實時計算
MapReduce處理的是磁碟上的資料。
(2)不能流式計算
MapReduce處理的是磁碟上的靜態資料,而流式計算的輸入資料的動態的。
(3)難以用於DAG計算
DAG(有向無環圖)多個任務間存在依賴關係,後一個應用的輸入可能是前一個應用的輸出。而MapReduce的輸出結果都會寫在磁碟上,這會造成大量的磁碟IO,降低叢集的效能。
三、MapReduce程式設計
以詞頻統計程式為例
pom依賴
去https://mvnrepository.com/尋找以下幾個依賴
hadoop-common、hadoop-hdfs、hadoop-mapreduce-client-core、junit
外掛:maven-compiler-plugin、maven-shade-plugin
下面是我的pom檔案:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.pineapple</groupId> <artifactId>MapReduceTest</artifactId> <!-- <packaging>pom</packaging>--> <version>1.0-SNAPSHOT</version> <properties> <hadoop.version>2.6.0</hadoop.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> <!-- <scope>test</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-shade-plugin --> <!-- <dependency>--> <!-- <groupId>org.apache.maven.plugins</groupId>--> <!-- <artifactId>maven-shade-plugin</artifactId>--> <!-- <version>2.4.3</version>--> <!-- </dependency>--> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>true</minimizeJar> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
要做統計的檔案內容為:
Hello,Hadoop,BigData
Hello,Hadoop,MapReduce
Hello,Hadoop,HDFS
BigData,Perfect
MapReduce由Map和Reduce兩個階段組成。
Map
在此之前還有一個讀取檔案的操作,這個只要在主類中指定一下就好,不需要寫。
讀取後文件的內容為:
//K1 V1
0 Hello,Hadoop,BigData
21 Hello,Hadoop,MapReduce
44 Hello,Hadoop,HDFS
62 BigData,Perfect
Map後的檔案內容為:
//K2 V2
Hello 1
Hadoop 1
BigData 1
Hello 1
Hadoop 1
MapReduce 1
Hello 1
Hadoop 1
HDFS 1
BigData 1
Perfect 1
Map表示“對映”,將檔案拆分成多個塊,然後發給叢集上的機器統一計算要繼承Mapper類並重寫map()函式
,將K1,V1轉換成下面的K2,V2
package cn.pineapple.day1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 四個泛型的解釋:
* <p>
* KEYIN:K1的型別
* <p>
* VALUEIN:V1的型別
* <p>
* KEYOUT:K2的型別
* <p>
* VALUEOUT:V2的型別
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* 將K1,V1轉換成K2,V2
*
* @param key: 行偏移量
* @param value: 一行文字內容
* @param context: 上下文物件
* @throws IOException:
* @throws InterruptedException:
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
//1:對一行的文字資料進行拆分
String[] split = value.toString().split(",");
//2:遍歷陣列,組裝K2和V2
for (String word : split) {
//3:將K2和V2寫入上下文中
text.set(word);
longWritable.set(1);
context.write(text, longWritable);
}
}
}
map()函式內的程式碼,只需要針對檔案一行考慮。所以首先要提取每個單詞,用value.toString()將value也就是這一行的內容轉換成String型別,再用split()方法進行拆分。K2的值預設為1。在寫入的時候,還要將K2和V2轉換成Text型別和LongWritable型別。
Reduce
在Map和Reduce中間還有一個shuffle,目前用不到這個shuffle,可以採取預設的方式,它會把K2,V2轉換成:
//K2 V2
Hello <1,1,1>
Hadoop <1,1,1>
BigData <1,1>
HDFS <1>
MapReduce <1>
Perfect <1>
接著Reduce的最終結果是:
//K3 V3
BigData 2
HDFS 1
Hadoop 3
Hello 3
MapReduce 1
Perfect 1
Reduce表示“歸約”,將所有結果都統一起來,繼承Reducer類並重寫reduce()函式
,將K2和V2轉換為K3和V3,也就是最終結果。
package cn.pineapple.day1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 四個泛型的解釋:
* <p>
* KEYIN:K2型別
* <p>
* VALUEIN:V2型別
* <p>
* KEYOUT:K3型別
* <p>
* VALUEOUT:V3型別
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
* 將K2和V2轉換為K3和V3,將K3和V3寫入上下文中
*
* @param key: 新K2
* @param values: 新V2
* @param context: 上下文物件
* @throws IOException:
* @throws InterruptedException:
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
//1:遍歷集合,將集合中的數字相加,得到V3
for (LongWritable value : values) {
count += value.get();
}
//2:將K3V3寫入上下文中
context.write(key, new LongWritable(count));
}
}
為了實現這個最終結果,K3和K2是一樣的,我們只需要遍歷這個集合,然後相加就能得到V3。value.get()可以將LongWritable型別轉換成long型別,最後在寫入的時候再進行一次轉換。
Main方法
光有這兩個類是不行的,我們還要寫一個Main方法,詳細的列一下任務流程,指定一下任務配置等。要繼承Configured類並實現Tool介面。
package cn.pineapple.day1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
/**
* 指定一個job任務
*
* @param strings:
* @return :
* @throws Exception:
*/
public int run(String[] strings) throws Exception {
//1:建立一個job任務物件
Job wordCount = Job.getInstance(super.getConf(), "WordCount");
//如果打包執行出錯,則需要加改配置
wordCount.setJarByClass(JobMain.class);
//2:配置job任務物件(八個步驟)
//第一步:指定檔案的讀取方式和
wordCount.setInputFormatClass(TextInputFormat.class);
// 讀取路徑
TextInputFormat.addInputPath(wordCount, new Path("hdfs://nsv:8020/input/wordcount"));
//第二步:指定Map階段的處理方式和資料型別
wordCount.setMapperClass(WordCountMapper.class);
// 設定K2的型別
wordCount.setMapOutputKeyClass(Text.class);
// 設定V2的型別
wordCount.setMapOutputValueClass(LongWritable.class);
//第三、四、五、六shuffle階段採用預設的方式
//第七步:指定Reduce階段的處理方式和資料型別
wordCount.setReducerClass(WordCountReducer.class);
// 設定K3的型別
wordCount.setOutputKeyClass(Text.class);
// 設定V3的型別
wordCount.setOutputValueClass(LongWritable.class);
//第八步:指定輸出型別
wordCount.setOutputFormatClass(TextOutputFormat.class);
// 輸出路徑
TextOutputFormat.setOutputPath(wordCount, new Path("hdfs://nsv:8020/output/wordcount"));
//等待任務結束
boolean bl = wordCount.waitForCompletion(true);
return bl ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//啟動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
run()方法裡就是本次任務的詳細流程和配置,然後main()方法裡要呼叫上面寫的run方法。它接受一個int型別作為退出程式碼,0或1。
四、打包執行
雙擊package打包,出現BUILD SUCCESS表示打包成功
出現了兩個jar包,origin包是因為用了maven-shade-plugin打包外掛,不會包含依賴jar包,所有體積較小,可以選擇這個jar包執行。
將要統計的檔案放到HDFS上hdfs dfs -mkdir -p /input/wordcount
hdfs dfs -put wordcount.txt /input/wordcount
將jar包上傳到叢集上後,跑一下嘍hadoop jar original-MapReduceTest-1.0-SNAPSHOT.jar cn.pineapple.day1.JobMain
,cn.pineapple.day1.JobMain是主類的全路徑
completed successfully看來是成功了,去Web上看一下實際結果
點選Download可以檢視下載結果