1. 程式人生 > 其它 >hadoop離線day09 mapreduce入門和wordcount示例

hadoop離線day09 mapreduce入門和wordcount示例

技術標籤:hadoop離線分散式hadoopmapreduce大資料spark

mapreduce入門和wordcount示例

1、mapreduce入門

分散式檔案計算系統,主要用於計算我們的一些資料
MapReduce的核心思想:分而治之
最主要有兩個階段:
map階段:負責任務拆分,
reduce階段:負責結果聚合

mapreduce程式設計可控的八個步驟(天龍八部)

  • map階段兩個步驟
    1、第一步:讀取檔案,解析成key,value對,這裡是我們的K1 V1
    2、第二步:接收我們的k1 v1,自定義我們的map邏輯,然後轉換成新的key2 value2 進行輸出 往下發送 這裡傳送出去的是我們k2 v2

  • shuffle階段四個步驟
    3、第三步:分割槽 相同key的value傳送到同一個reduce裡面去,key合併,value形成一個集合
    4、第四步:排序 預設按照欄位順序進行排序
    5、第五步:規約
    6、第六步:分組

  • reduce階段兩個步驟
    7、接收我們的k2 v2 自定義我們的reduce邏輯,轉換成新的k3 v3 進行輸出
    8、將我們的K3 v3 進行輸出

2、wordcount示例

需求:在一堆給定的文字檔案中統計輸出每一個單詞出現的總次數
資料格式準備如下:

cd /export/servers
vim wordcount.txt
#hello,world,hadoop
#hive,sqoop,flume,hello
#kitty,tom,jerry,world hadoop hdfs dfs -mkdir /wordcount/ hdfs dfs -put wordcount.txt /wordcount/

java 實現程式碼

package cn.itcat.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 除了javaweb的程式打成一個war包進行執行,其他的程式都是打成一個jar包進行執行 * 執行一個jar包,需要一個main方法,作為我們程式的入口類 */ public class MainCount extends Configured implements Tool{ /** * 程式的入口類 * @param args */ public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); //這裡執行完成之後,得到一個int型別的返回值,表示我們程式的退出狀態碼 //如果退出狀態碼是0 程式執行成功 //通過這裡設定configuration,就相當於我們把父類的configuration設定值了 int run = ToolRunner.run(configuration, new MainCount(), args); System.exit(run); } /** * 這個run方法很重,這裡面就是通過job物件來組裝我們的程式,說白了就是組裝我們的八個類 * @param args */ @Override public int run(String[] args) throws Exception { //第一步:讀取檔案,解析成key,value對 //從父類裡面獲取configuration配置檔案 //getInstance需要兩個引數,第一個引數是我們的configuration配置檔案,第二個引數叫做jobName隨便寫 Job job = Job.getInstance(super.getConf(), "xxx"); //如果打包到叢集上面去執行,需要新增這一句,指定我們main方法所在的java類 job.setJarByClass(MainCount.class); TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount")); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcountoutput")); job.setOutputFormatClass(TextOutputFormat.class); boolean b = job.waitForCompletion(true); return b?0:1; } }
package cn.itcat.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /***
     * 重寫map方法,實現我們自己的邏輯,接受我們key1,value1  轉換成新的k2  v2  輸出
     * @param key  注意這個key是我們的k1
     * @param value  注意這個value是我們的v1
     * @param context  上下文物件,承上啟下,銜接我們的上面的元件與下面的元件
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//hive,sqoop,flume,hello
        //第一步:切開我們一行資料
        String line = value.toString();
        String[] split = line.split(",");//[hive,sqoop,flume,hello]
        //  key2    value2    往下發送
        //  hive   1
        for (String word : split) {

           Text k2 =  new Text(word);
           IntWritable v2 =  new IntWritable(1);

           //通過write方法,將我們的資料往下發送
            context.write(k2,v2);

        }


    }
}
package cn.itcat.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    /**
     *
     * @param key  注意這個key  是  k2
     * @param values  注意這個values是一個集合,集合的類習慣是  v2的型別
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int j= 0;
        for (IntWritable value : values) {
            //IntWritable這個類,沒有普通的  +  方法,不能累加,需要轉換成  int型別進行累加
            int num = value.get();
            j += num;

        }
        //輸出我們的key3  value3  型別
        context.write(key,new IntWritable(j));


    }
}