
Developing a MapReduce Program with Hadoop


Preparation:

1. Set HADOOP_HOME to point at the Hadoop installation directory; otherwise an error about missing Hadoop binaries is reported at startup (an alternative that sets the location from code is sketched after this list).

2. On Windows, replace the hadoop/bin directory with a Windows-compatible build of the native binaries matching your Hadoop version (these can be found online).

3. If you still get an org.apache.hadoop.io.nativeio.NativeIO$Windows.access0 error, copy hadoop.dll from that directory into C:\Windows\System32.
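
If changing the system environment variable is inconvenient, the Hadoop home can also be supplied from code. The snippet below is only a sketch: D:/hadoop-2.7.3 is a placeholder for your own installation path, and it relies on the hadoop.home.dir system property, which Hadoop reads as an alternative to the HADOOP_HOME environment variable. It has to run before any Hadoop code touches the native binaries, so the natural place is the first line of the driver's main method:

    // Sketch: point Hadoop at a local installation via a system property instead of HADOOP_HOME.
    // "D:/hadoop-2.7.3" is a placeholder; replace it with your own path.
    System.setProperty("hadoop.home.dir", "D:/hadoop-2.7.3");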

Required JARs

1. common
   hadoop-2.7.3\share\hadoop\common\hadoop-common-2.7.3.jar
   everything under hadoop-2.7.3\share\hadoop\common\lib
2. hdfs
   hadoop-2.7.3\share\hadoop\hdfs\hadoop-hdfs-2.7.3.jar
   everything under hadoop-2.7.3\share\hadoop\hdfs\lib
3. mapreduce
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-app-2.7.3.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-common-2.7.3.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-core-2.7.3.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-hs-2.7.3.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-hs-plugins-2.7.3.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-jobclient-2.7.3.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-jobclient-2.7.3-tests.jar
   hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-shuffle-2.7.3.jar
   everything under hadoop-2.7.3\share\hadoop\mapreduce\lib
4. yarn
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-api-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-applications-distributedshell-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-applications-unmanaged-am-launcher-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-client-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-common-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-registry-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-applicationhistoryservice-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-common-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-nodemanager-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-resourcemanager-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-sharedcachemanager-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-tests-2.7.3.jar
   hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-web-proxy-2.7.3.jar
   everything under hadoop-2.7.3\share\hadoop\yarn\lib

These dependencies can also be managed with Maven:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>xiaol</groupId>
    <artifactId>xiaol-hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>MapReduce</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
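
With this POM in place the project builds with a standard mvn clean package; for the purely local run shown below, the driver class can also simply be launched from the IDE, since the Hadoop classes are on the classpath as ordinary Maven dependencies.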

Writing the Mapper:

package xiaol;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Overall workflow: input -> split -> map -> shuffle -> reduce -> output
 * input:   each line consists of space-separated words
 *          hello java
 *          hello python
 * split:   by default the input is read line by line; each line is handed to the next
 *          step as one KV pair
 *          K is the byte offset of the start of the line, V is the line content
 *          K:0   V:hello java
 *          K:11  V:hello python
 *          of course this step can be overridden by the user
 * map:     must be implemented by the user and contains the business logic
 *          reads the split output, counts the words, and emits KEYOUT/VALUEOUT pairs to shuffle
 *          here the K handed to shuffle is the word and the V is its occurrence count
 *          hello 1
 *          java 1
 * shuffle: the map output is a set of KV pairs; pairs with the same K are moved to the
 *          same node for reducing
 *          before they are handed to reduce, the Vs belonging to the same K are assembled
 *          into an Iterable<VALUEOUT>
 *          hello 1,1
 *          of course this step can be overridden by the user
 * reduce:  must be implemented by the user and contains the business logic; it aggregates
 *          the results coming out of shuffle
 *          reads the shuffle output, counts the words, and emits KEYOUT/VALUEOUT pairs to output
 *          hello 2
 */
/**
 * org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     KEYIN    type of the key handed to map after split
 *     VALUEIN  type of the value handed to map after split
 *     KEYOUT   type of the key handed to shuffle after map
 *     VALUEOUT type of the value handed to shuffle after map
 * org.apache.hadoop.io.LongWritable    Hadoop's own Long wrapper class
 * org.apache.hadoop.io.Text            Hadoop's own Text class
 * org.apache.hadoop.io.IntWritable     Hadoop's own Int wrapper class
 */
public class WordMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
     * Override the map method
     * protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException
     *      KEYIN       type of the key handed to map after split, i.e. the byte offset of the start of the line
     *      VALUEIN     type of the value handed to map after split, i.e. the content of the line
     *      Context     the execution context of the whole MapReduce job
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        String[] words = s.split(" ");  // each line consists of space-separated words, e.g. "hello java"; split the line first in order to count them
        for(String word: words){
            /**
             * Write KEYOUT and VALUEOUT into the context as the input for the next step (shuffle).
             *
             * This step counts how often each word occurs in the current line, so a 1 is written directly.
             * Some readers may wonder what happens if the same word appears twice in one line.
             * That does not matter: if, say, "hello" appears twice, shuffle simply receives two "hello 1" pairs,
             * and shuffle then hands both of them to reduce for aggregation.
             */
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
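
The mapper above allocates a fresh Text and IntWritable for every word, which is correct but creates a little garbage per record. An optional variant (a sketch, not part of the original listing) reuses the output objects across calls; this is safe because context.write serializes the key and value at the moment it is called:

package xiaol;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reusable output objects: context.write copies the serialized bytes, so mutating them afterwards is safe
    private final Text outKey = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            outKey.set(word);
            context.write(outKey, one);
        }
    }
}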

Writing the Reducer:

package xiaol;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public class WordReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

    /**
     * Override the reduce method
     * protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException
     *      KEYIN               type of the key handed to reduce after shuffle; this is the mapper's KEYOUT
     *      Iterable<VALUEIN>   the values handed to reduce after shuffle (shuffle sends all pairs with the same K to the same node for reducing, so the Vs arrive as a collection); these are the mapper's VALUEOUT values
     *      Context             the execution context of the whole MapReduce job
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long count = 0;
        for(IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new LongWritable(count));
    }

}
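
Note that this reducer cannot also be registered as a combiner: a combiner's output types must match the map output types (Text, IntWritable), while WordReducer emits LongWritable values. If map-side pre-aggregation is wanted, a separate class along these lines would work (a sketch, not part of the original code; it would be registered in the driver with job.setCombinerClass(WordCombiner.class)):

package xiaol;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Optional map-side pre-aggregation. Its input and output types must both be the
 * map output types (Text, IntWritable), which is exactly why WordReducer, whose
 * output value type is LongWritable, cannot be used as the combiner.
 */
public class WordCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}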

Writing the driver class:

package xiaol;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Driver class: configures the job and submits it for a local run.
 */
public class Test {
    public static void main(String[] args) throws Exception {
        // For a local run a plain new Configuration() is enough; for a remote run the cluster-related settings must be configured
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // Set the mapper and reducer classes
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        // Set the mapper's output key and output value classes
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the reducer's output key and output value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, "d:/test/test.txt");
        FileOutputFormat.setOutputPath(job, new Path("d:/test/out/"));

        // Wait for the job to finish; true means print progress information while it runs
        job.waitForCompletion(true);
    }
}
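
One practical note for repeated local runs: FileOutputFormat makes the job fail immediately if the output directory already exists. An optional addition to main(), sketched below with the same local path as in the listing (it needs one extra import, org.apache.hadoop.fs.FileSystem), deletes a leftover directory before the job is submitted:

        // Remove the output directory from a previous run, then register the path with the job
        Path outputDir = new Path("d:/test/out/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);  // true = delete recursively
        }
        FileOutputFormat.setOutputPath(job, outputDir);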
