1. 程式人生 > >hadoop wordcount 詳解

hadoop wordcount 詳解

mapreducehadoop中的分散式運算程式設計框架,只要按照其程式設計規範,只需要編寫少量的業務邏輯程式碼即可實現一個強大的海量資料併發處理程式

Demo開發——wordcount

1、需求

從大量(比如T級別)文字檔案中,統計出每一個單詞出現的總次數

2mapreduce實現思路

Map階段:

a) HDFS的源資料檔案中逐行讀取資料

b) 將每一行資料切分出單詞

c) 為每一個單詞構造一個鍵值對(單詞,1)

d) 將鍵值對傳送給reduce

Reduce階段

a) 接收map階段輸出的單詞鍵值對

b) 將相同單詞的鍵值對匯聚成一組

c) 對每一組,遍歷組中的所有“值”,累加求和,即得到每一個單詞的總次數

d) (單詞,總次數)輸出到HDFS的檔案中

1、 具體編碼實現

(1)定義一個mapper

//首先要定義四個泛型的型別

//keyin:  LongWritable    valuein: Text

//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

//map方法的生命週期:  框架每傳一行資料就被呼叫一次

//key :  這一行的起始點在檔案中的偏移量

//value: 這一行的內容

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//預設情況下,inputformat中的資料是逐行的處理,拿到一行資料轉換為string

String line = value.toString();

//將這一行切分出各個單詞

String[] words = line.split(" "); //按空格切分

//遍歷陣列,輸出<單詞,1>

for(String word:words){

context.write(new Text(word), new IntWritable(1));//這裡的輸出型別要與初始定義的一致

}

}

}

(2)定義一個reducer

//生命週期:框架每傳遞進來一個kv 組,reduce方法被呼叫一次

public class WordCountMapper extends Mapper<Text, IntWritable, Text, IntWritable>{

@Override

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//定義一個計數器

int count = 0;

//遍歷這一組kv的所有v,累加到count

for(IntWritable value:values){  //value的值儲存在迭代器上

count += value.get();

}

context.write(key, new IntWritable(count));

}

}

(3)定義一個主類,用來描述job並提交job

public class WordCountRunner {

//把業務邏輯相關的資訊(哪個是mapper,哪個是reducer,要處理的資料在哪裡,輸出的結果放哪裡。。。。。。)描述成一個job物件

//把這個描述好的job提交給叢集去執行

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job wcjob = Job.getInstance(conf);

//指定我這個job所在的jar

//wcjob.setJar("/home/hadoop/wordcount.jar");

wcjob.setJarByClass(WordCountRunner.class);

wcjob.setMapperClass(WordCountMapper.class);

wcjob.setReducerClass(WordCountReducer.class);

//設定我們的業務邏輯Mapper類的輸出keyvalue的資料型別

wcjob.setMapOutputKeyClass(Text.class);

wcjob.setMapOutputValueClass(IntWritable.class);

//設定我們的業務邏輯Reducer類的輸出keyvalue的資料型別

wcjob.setOutputKeyClass(Text.class);

wcjob.setOutputValueClass(IntWritable.class);

//指定要處理的資料所在的位置

FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");

//指定處理完成之後的結果所儲存的位置

FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));

//yarn叢集提交這個job

boolean res = wcjob.waitForCompletion(true);

System.exit(res?0:1);

}

1. 將程式打包:export 出jar包

2. 準備輸入資料

vi  /home/hadoop/test.txt

Hello tom

Hello jim

Hello ketty

Hello world

Ketty tom

hdfs上建立輸入資料資料夾

hadoop   fs  mkdir  -p  /wordcount/input

words.txt上傳到hdfs

hadoop  fs  –put  /home/hadoop/words.txt  /wordcount/input

3. 將程式jar包上傳到叢集的任意一臺伺服器上

4. 使用命令啟動執行wordcount程式jar

$ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver /wordcount/input /wordcount/out

 注:目錄 /wordcount/out 必須是不存在的, 否則執行時將會報the file have existed的錯誤

5. 檢視執行結果

$ hadoop fs –cat /wordcount/out/part-r-00000