hadoop離線day09 mapreduce入門和wordcount示例
阿新 • • 發佈:2020-12-10
技術標籤:hadoop離線分散式hadoopmapreduce大資料spark
mapreduce入門和wordcount示例
1、mapreduce入門
分散式檔案計算系統,主要用於計算我們的一些資料
MapReduce的核心思想:分而治之
最主要有兩個階段:
map階段:負責任務拆分,
reduce階段:負責結果聚合
mapreduce程式設計可控的八個步驟(天龍八部)
-
map階段兩個步驟
1、第一步:讀取檔案,解析成key,value對,這裡是我們的K1 V1
2、第二步:接收我們的k1 v1,自定義我們的map邏輯,然後轉換成新的key2 value2 進行輸出 往下發送 這裡傳送出去的是我們k2 v2 -
shuffle階段四個步驟
3、第三步:分割槽 相同key的value傳送到同一個reduce裡面去,key合併,value形成一個集合
4、第四步:排序 預設按照欄位順序進行排序
5、第五步:規約
6、第六步:分組 -
reduce階段兩個步驟
7、接收我們的k2 v2 自定義我們的reduce邏輯,轉換成新的k3 v3 進行輸出
8、將我們的K3 v3 進行輸出
2、wordcount示例
需求:在一堆給定的文字檔案中統計輸出每一個單詞出現的總次數
資料格式準備如下:
cd /export/servers
vim wordcount.txt
#hello,world,hadoop
#hive,sqoop,flume,hello
#kitty,tom,jerry,world
hadoop
hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/
java 實現程式碼
package cn.itcat.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 除了javaweb的程式打成一個war包進行執行,其他的程式都是打成一個jar包進行執行
* 執行一個jar包,需要一個main方法,作為我們程式的入口類
*/
public class MainCount extends Configured implements Tool{
/**
* 程式的入口類
* @param args
*/
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//這裡執行完成之後,得到一個int型別的返回值,表示我們程式的退出狀態碼
//如果退出狀態碼是0 程式執行成功
//通過這裡設定configuration,就相當於我們把父類的configuration設定值了
int run = ToolRunner.run(configuration, new MainCount(), args);
System.exit(run);
}
/**
* 這個run方法很重,這裡面就是通過job物件來組裝我們的程式,說白了就是組裝我們的八個類
* @param args
*/
@Override
public int run(String[] args) throws Exception {
//第一步:讀取檔案,解析成key,value對
//從父類裡面獲取configuration配置檔案
//getInstance需要兩個引數,第一個引數是我們的configuration配置檔案,第二個引數叫做jobName隨便寫
Job job = Job.getInstance(super.getConf(), "xxx");
//如果打包到叢集上面去執行,需要新增這一句,指定我們main方法所在的java類
job.setJarByClass(MainCount.class);
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcountoutput"));
job.setOutputFormatClass(TextOutputFormat.class);
boolean b = job.waitForCompletion(true);
return b?0:1;
}
}
package cn.itcat.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
/***
* 重寫map方法,實現我們自己的邏輯,接受我們key1,value1 轉換成新的k2 v2 輸出
* @param key 注意這個key是我們的k1
* @param value 注意這個value是我們的v1
* @param context 上下文物件,承上啟下,銜接我們的上面的元件與下面的元件
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//hive,sqoop,flume,hello
//第一步:切開我們一行資料
String line = value.toString();
String[] split = line.split(",");//[hive,sqoop,flume,hello]
// key2 value2 往下發送
// hive 1
for (String word : split) {
Text k2 = new Text(word);
IntWritable v2 = new IntWritable(1);
//通過write方法,將我們的資料往下發送
context.write(k2,v2);
}
}
}
package cn.itcat.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
/**
*
* @param key 注意這個key 是 k2
* @param values 注意這個values是一個集合,集合的類習慣是 v2的型別
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int j= 0;
for (IntWritable value : values) {
//IntWritable這個類,沒有普通的 + 方法,不能累加,需要轉換成 int型別進行累加
int num = value.get();
j += num;
}
//輸出我們的key3 value3 型別
context.write(key,new IntWritable(j));
}
}