MapReduce入門案例之wordcount(計算單詞出現次數)
阿新 • • 發佈:2021-08-04
MR的wordcount的練習就相當於java的helloworld一樣
學習前提:
-
JAVA基礎:資料型別、方法、面向物件、反射等等(看懂語法)
-
maven(處理依賴、打包)
-
Hadoop-HDFS的儲存原理(看懂叢集架構、block等)
-
Hadoop-MapReduce的執行原理(看懂流程)
-
Linux (shell操作)
一共要定義3個類,
- Map類(定義Map階段怎麼處理)
- Reduce類(定義Reduce階段怎麼處理)
- JobMain主類(定義整個MapReduce處理流程,見下)
準備資料
hello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoop
儲存成.txt檔案
MapReduce 處理資料流程:
Map階段:
- 輸入,讀取源資料( setInputFormatClass方法 / 得出K1,V1)
- 設定Mapper類( 繼承Mapper類 / K1,V1 轉換 K2,V2)
shuffle階段(直接預設,跳過)
- 分割槽
- 排序
- 規約
- 分組
Reduce階段
- 設定Reduce類( 繼承Reduce類 / 新K2,V2 轉換 K3,V3)
- 輸出,儲存結果( setOutputFormatClass方法 / 輸出K3,V3)
程式碼:
pom.xml 設定遠端倉庫、依賴、指令碼
<!--指定倉庫--> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <!--打包方式--> <packaging>jar</packaging> <!--包--> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh5.14.0</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>true</minimizeJar> </configuration> </execution> </executions> </plugin> </plugins> </build>
Mapper類
1.必須繼承Mapper類並且指定好K1,V1,K2,V2對應的hadoop資料型別
2.必須重寫map方法
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* KEYIN:K1的型別 VALUEIN:V1的型別 KEYOUT:K2的型別 VALUEOUT:V2的型別 注意: Mapper泛型裡面要用hadoop自定義的型別(其實就是Hadoop將原本的型別加上序列化操作再封裝) 即org.apache.hadoop包下的資料型別 如:long -> LongWritable; String-> Text ... */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { //目的:將K1,V1 轉換 K2,V2 /* 引數: key :K1 行偏移量 value : V1 每一行的文字資料 context:上下文物件,橋樑,連線shuffle階段 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //建立物件儲存資料 LongWritable ValueOut = new LongWritable(); Text KeyOut = new Text(); //1:拆分文字資料 String[] split = value.toString().split(","); //2:遍歷資料,拆分,重組裝K2 ,V2 for (String word : split) { KeyOut.set(word); ValueOut.set(1); //3:將K2,V2寫入上下文物件當中 context.write(KeyOut, ValueOut); } } } //完畢
Reduce
(同Map類似)
1.必須繼承Reducer並指定K2,V2,K3,V3的型別
2.重寫reduce方法
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//雖然第二個引數即V2是集合,但是仍然使用集合裡面的元素型別作為泛型引數
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
//目的:新K2,V2 轉成K3,V3,將K3,V3寫入上下文
/*
該類方法實現的結果如下:
要處理的資料:
新 K2 V2
Hello <1,1,1>
-----------------
最終輸出結果:
K3 V3
hello 3
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
//1:遍歷集合,將集合中的數字相加,得到V3
for (LongWritable value : values) {
count += value.get();
}
//2:將K3和V3寫入上下文
context.write(key, new LongWritable(count));
}
}
主類JobMain
(依照8個流程步驟編寫程式碼)
1.必須繼承Configured類、實現Tool介面
2.必須重寫run方法
3.main方法來啟動程式
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
//MapReduce需要定義主類來描述job並提交job,用來啟動MR-Job
//關鍵點1:必須要繼承 Configured 配置類,和實現 Tool 介面,注意是hadoop包下的
public class JobMain extends Configured implements Tool {
//關鍵點2:必須重寫一個run方法來呼叫
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
//打包到叢集上面執行時候,必須要新增以下配置,指定程式的main函式
//如果打包出錯就需要加上該配置
job.setJarByClass(JobMain.class);
//第一步:設定輸入型別,讀取路徑,讀取輸入檔案解析成鍵值對K1,V1
job.setInputFormatClass(TextInputFormat.class);
//叢集做好hosts地址對映,不用直接些IP,寫node01即可
TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));//自動讀取資料夾所有的檔案
//本地執行(必須配置本地hadoop環境)
/*TextInputFormat.addInputPath(job,
new Path("file:///F:\\mapreduce\\mrinput\\wordcount.txt"));*/
//第二步:設定Mapper類,並設定Map階段完成之後的輸出型別(K2,V2)
job.setMapperClass(WordCountMapper.class);//class是反射的知識
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//第三、四、五、六步,預設,暫時不用寫
//第七步:設定Reduce類,並設定Reduce階段完成之後的輸出型別(K3,V3)
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//第八步:設定輸出型別和輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
Path path = new Path("hdfs://node01:8020/wordcount_out");
//本地執行(必須配置本地hadoop環境)
/*TextOutputFormat.setOutputPath(job,
new Path("file:///F:\\mapreduce\\mroutput"));*/
TextOutputFormat.setOutputPath(job, path);//如果目錄已存在會報錯
//改良:避免目錄已存在,先判斷是否存在,存在就刪除
//連線HDFS檔案系統
//獲取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"),
new Configuration());
//判斷目錄是否存在
boolean hbl = fileSystem.exists(path);
if (hbl) {
//刪除目錄 第一個引數是刪除哪個目錄 第二個引數是否遞迴刪除
fileSystem.delete(path, true);
}
//等待MR程式完成..
boolean mrb = job.waitForCompletion(true);
//返回run主類的執行結果
return mrb ? 0 : 1;//三元運算子
}
//關鍵點3:由於是主啟動函式,需要建立main主函式
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//本地執行,可以加入配置引數
//configuration.set("mapreduce.framework.name", "local");
//configuration.set("yarn.resourcemanager.hostname", "local");
Tool tool = new JobMain();
int run = ToolRunner.run(configuration, tool, args);
System.exit(run);
}
}
最後,打包,上次,執行
叢集執行:
#hadoop jar 編寫好的MR程式Jar包 主方法所在的類
hadoop jar original-mapreducedemo-1.0-SNAPSHOT.jar com.yh.mapreduce.JobMain
本地執行需要的環境:
- 下載解壓windows版Hadoop
- 環境變數HADOOP_HOME
- Path:%HADOOP_HOME%\bin
- 複製bin目錄下的hadoop.dll到c:\system32目錄下
- 重啟
End!~