1. 程式人生 > >用IDEA編寫一個wordcount

用IDEA編寫一個wordcount

建立一個maven專案:

在pom.xml中插入以下程式碼,匯入對應包:這裡注意<mainClass>cn.itcast.hadoop.wordcountdrive</mainClass>,不新增主類路徑hadoop  jar ***.jar命令無法找到執行主類

 

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>C:/Program Files/Java/jdk1.8.0_144/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven
				defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
                <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.20.1</version>
                    <!--<configuration>-->
                    <!--<source>1.8</source>-->
                    <!--<target>1.8</target>-->
                    <!--<encoding>UTF-8</encoding>-->
                    <!--</configuration>-->
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib</classpathPrefix>
                                <mainClass>cn.itcast.hadoop.wordcountdrive</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

  在Java包中建立itcast.hadoop包,建立wordcountmapper,wordcountreducer和wordcountdrive類:

wordcountdrive:這個類就是mr程式執行時的主類 告訴本類中組裝了一些程式執行時所需的資訊 比如哪個reduce或mapper類 輸入資料在哪 輸出資料在哪 

public class wordcountdrive {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 通過job這個類來封裝本次mr的相關資訊
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //指定本次mrjobjar包的執行主類
        job.setJarByClass(wordcountdrive.class);
        //指定reducetask的個數
        job.setNumReduceTasks(3);
        //指定重寫的分割槽類
        job.setPartitionerClass(PPartitioner.class);
        //指定本次mr所用的mapper reduce類分別是什麼
        job.setMapperClass(wordcountmapper.class);
        job.setReducerClass(wordcountreduce.class);
        //指定本次mrmapper階段的輸出kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //指定本次mr最終輸出的kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //指定本次mr輸入的資料路徑和最終輸出結果存放於在什麼位置
        FileInputFormat.setInputPaths(job, "/wordcount/input");
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        //job.submit();
        //提交程式 並且監控列印執行情況
        boolean b=job.waitForCompletion(true);
        System.exit(b?0:1);
    }

}

 

wordcountmapper:這裡就是map階段具體的業務邏輯實現方法 該方法的呼叫取決於讀取資料的元件有沒有給mr傳入資料 如果有的話 每傳入一個kv對 該方法就會被呼叫一次

/*
 * 這裡就是mapreduce程式 map階段逐漸實現的類
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN表示mapper資料輸入的時候,在預設的讀取資料元件下叫inputformat,
 * 他的行為是一行一行的讀取待處理的資料,
 * 讀取一行返回一行給我們的mr程式,在這種情況下,
 * KEYIN就表示我們的每一行的起始偏移量
 * 因此資料型別是long型別
 * VALUEIN表示mapper資料輸入的時候value的資料型別,
 * 在預設的資料讀取情況下,valuein就表示讀取的這一行的內容
 * 因此資料型別是string
 * KETOUT表示mapper資料輸出的時候key的資料型別 在本案列種 key是單詞,
 * 所以是string
 * VALUEOUT表示mapper資料輸出的時候value的資料型別 。。。是integer
 * 這裡所說的資料型別是jdk自帶的型別 在序列化時 效率低下
 * 因此hadoop自己封裝了一套資料型別
 */
public class wordcountmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 /**
     * 這裡就是map階段具體的業務邏輯實現方法 該方法的呼叫取決於讀取資料的元件有沒有給mr傳入資料 如果有的話
     * 沒傳入一個kv對 該方法就會被呼叫一次
     */
    @Override
    protected void map(LongWritable key, Text value,
                  Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        //拿到傳入進來的型別 轉化為string
        String line=value.toString();
        //將這一行內容按照分隔符,進行一行內容的切割
        //切割成一個個單詞陣列
        String[] words=line.split(" ");
        //遍歷陣列 沒出現一個單詞 就標記一個數字1
        //<單詞,1>
        for(String word:words){
            //使用mr的程式的上下文context,吧map階段處理的資料傳送出去
            //作為reduce階段輸入資料
            context.write(new Text(word),new IntWritable(1));
        }
    }
    public static void main(String[] args) {
        // TODO Auto-generated method stub

    }

}

 

wordcountreduce:reduce接受後 按照key的字典序進行排序 按照key是否相同作為一組去呼叫reduce方法 本方法的k就是這一組相同kv對的共同key 把這一組所有的v作為迭代器傳入我們的reduce方法

 

public class wordcountreduce extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * reduce接受後 按照key的字典序進行排序 按照key是否相同作為一組去呼叫reduce方法 本方法的k就是這一組相同kv對的共同key
     * 吧這一組所有的v作為迭代器傳入我們的reduce方法
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        // super.reduce(arg0, arg1, arg2);
        int count = 0;
        // 遍歷一組迭代器,吧每一個數量一累加起來 就構成了單詞的總次數
        for (IntWritable value : values) {
            count += value.get();
        }
        // 吧最終的結果輸出
        context.write(key, new IntWritable(count));
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub

    }

}

PPartitioner :按照首字母ASCII值進行分割槽,因為每個reduce會進行自排序,所以我們只用分好區就好了,排序交給框架。

package cn.itcast.hadoop;
import org.apache.hadoop.mapreduce.Partitioner;
public class PPartitioner extends Partitioner {
    @Override
    public int getPartition(Object o, Object o2, int i) {
        String value = o.toString();
        //取首字母
        char word = value.charAt(0);
        int asc = word;
        if (97 <= asc && asc <= 102) {
            return 0;
        } else if (102 < asc && asc <= 109) {
            return 1;
        }
        return 2;
    }
}

 

將專案打成jar包,打包成功後會在target下看到打好的jar包:

 將這個jar包放在叢集裡執行

 

 

 

 

成功。