1. 程式人生 > 其它 >MapReduce入門案例之wordcount(計算單詞出現次數)

MapReduce入門案例之wordcount(計算單詞出現次數)

MR的wordcount的練習就相當於java的helloworld一樣

學習前提:

  • JAVA基礎:資料型別、方法、面向物件、反射等等(看懂語法)

  • maven(處理依賴、打包)

  • Hadoop-HDFS的儲存原理(看懂叢集架構、block等)

  • Hadoop-MapReduce的執行原理(看懂流程)

  • Linux (shell操作)


一共要定義3個類,

  • Map類(定義Map階段怎麼處理)
  • Reduce類(定義Reduce階段怎麼處理)
  • JobMain主類(定義整個MapReduce處理流程,見下)

準備資料

hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop

儲存成.txt檔案

MapReduce 處理資料流程

Map階段:

  1. 輸入,讀取源資料( setInputFormatClass方法 / 得出K1,V1)
  2. 設定Mapper類( 繼承Mapper類 / K1,V1 轉換 K2,V2)

shuffle階段(直接預設,跳過)

  1. 分割槽
  2. 排序
  3. 規約
  4. 分組

Reduce階段

  1. 設定Reduce類( 繼承Reduce類 / 新K2,V2 轉換 K3,V3)
  2. 輸出,儲存結果( setOutputFormatClass方法 / 輸出K3,V3)

程式碼:

pom.xml 設定遠端倉庫、依賴、指令碼

    <!--指定倉庫-->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <!--打包方式-->
    <packaging>jar</packaging>
    <!--包-->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Mapper類

1.必須繼承Mapper類並且指定好K1,V1,K2,V2對應的hadoop資料型別

2.必須重寫map方法

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
 KEYIN:K1的型別
 VALUEIN:V1的型別
 KEYOUT:K2的型別
 VALUEOUT:V2的型別
 注意:
 Mapper泛型裡面要用hadoop自定義的型別(其實就是Hadoop將原本的型別加上序列化操作再封裝) 
 即org.apache.hadoop包下的資料型別
 如:long -> LongWritable;
    String-> Text
    ...
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    //目的:將K1,V1 轉換 K2,V2
    /*
    引數:
    key    :K1  行偏移量
    value  : V1  每一行的文字資料
    context:上下文物件,橋樑,連線shuffle階段
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //建立物件儲存資料
        LongWritable ValueOut = new LongWritable();
        Text KeyOut = new Text();
        //1:拆分文字資料
        String[] split = value.toString().split(",");
        //2:遍歷資料,拆分,重組裝K2 ,V2
        for (String word : split) {
            KeyOut.set(word);
            ValueOut.set(1);
            //3:將K2,V2寫入上下文物件當中
            context.write(KeyOut, ValueOut);
        }
    }
}
//完畢

Reduce

(同Map類似)

1.必須繼承Reducer並指定K2,V2,K3,V3的型別

2.重寫reduce方法

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//雖然第二個引數即V2是集合,但是仍然使用集合裡面的元素型別作為泛型引數
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    //目的:新K2,V2 轉成K3,V3,將K3,V3寫入上下文
    /*
    該類方法實現的結果如下:
    要處理的資料:
     新 K2     V2
       Hello   <1,1,1>
     -----------------  
    最終輸出結果:	
        K3     V3
        hello  3
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        //1:遍歷集合,將集合中的數字相加,得到V3
        for (LongWritable value : values) {
            count += value.get();
        }
        //2:將K3和V3寫入上下文
        context.write(key, new LongWritable(count));
    }
}

主類JobMain

(依照8個流程步驟編寫程式碼)

1.必須繼承Configured類、實現Tool介面

2.必須重寫run方法

3.main方法來啟動程式

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

//MapReduce需要定義主類來描述job並提交job,用來啟動MR-Job
//關鍵點1:必須要繼承 Configured 配置類,和實現 Tool 介面,注意是hadoop包下的
public class JobMain extends Configured implements Tool {

    //關鍵點2:必須重寫一個run方法來呼叫
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
        
        //打包到叢集上面執行時候,必須要新增以下配置,指定程式的main函式
        //如果打包出錯就需要加上該配置
        job.setJarByClass(JobMain.class);

        //第一步:設定輸入型別,讀取路徑,讀取輸入檔案解析成鍵值對K1,V1
        job.setInputFormatClass(TextInputFormat.class);
        
        //叢集做好hosts地址對映,不用直接些IP,寫node01即可
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));//自動讀取資料夾所有的檔案

        //本地執行(必須配置本地hadoop環境)
        /*TextInputFormat.addInputPath(job,
        			new Path("file:///F:\\mapreduce\\mrinput\\wordcount.txt"));*/

        //第二步:設定Mapper類,並設定Map階段完成之後的輸出型別(K2,V2)
        job.setMapperClass(WordCountMapper.class);//class是反射的知識
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //第三、四、五、六步,預設,暫時不用寫

        //第七步:設定Reduce類,並設定Reduce階段完成之後的輸出型別(K3,V3)
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //第八步:設定輸出型別和輸出路徑
        job.setOutputFormatClass(TextOutputFormat.class);
        Path path = new Path("hdfs://node01:8020/wordcount_out");

        //本地執行(必須配置本地hadoop環境)
  		/*TextOutputFormat.setOutputPath(job, 
 							 new Path("file:///F:\\mapreduce\\mroutput"));*/

        TextOutputFormat.setOutputPath(job, path);//如果目錄已存在會報錯

        //改良:避免目錄已存在,先判斷是否存在,存在就刪除
        //連線HDFS檔案系統
        //獲取FileSystem
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), 
                                               new Configuration());
        //判斷目錄是否存在
        boolean hbl = fileSystem.exists(path);
        if (hbl) {
            //刪除目錄   第一個引數是刪除哪個目錄   第二個引數是否遞迴刪除
            fileSystem.delete(path, true);
        }
        //等待MR程式完成..
        boolean mrb = job.waitForCompletion(true);
        
        //返回run主類的執行結果
        return mrb ? 0 : 1;//三元運算子
    }

    //關鍵點3:由於是主啟動函式,需要建立main主函式
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //本地執行,可以加入配置引數
        //configuration.set("mapreduce.framework.name", "local");
        //configuration.set("yarn.resourcemanager.hostname", "local");
        Tool tool = new JobMain();
        int run = ToolRunner.run(configuration, tool, args);
        System.exit(run);
    }
}

最後,打包,上次,執行

叢集執行:

#hadoop jar  編寫好的MR程式Jar包    主方法所在的類
hadoop jar original-mapreducedemo-1.0-SNAPSHOT.jar com.yh.mapreduce.JobMain

本地執行需要的環境:

  • 下載解壓windows版Hadoop
  • 環境變數HADOOP_HOME
  • Path:%HADOOP_HOME%\bin
  • 複製bin目錄下的hadoop.dll到c:\system32目錄下
  • 重啟

End!~