1. 程式人生 > 其它 >Hadoop 用Java編寫MapReduce詞頻統計程式並提交到Hadoop叢集執行

Hadoop 用Java編寫MapReduce詞頻統計程式並提交到Hadoop叢集執行

一、MapReduce介紹

MapReduce是一個分散式計算框架,可以部署在Hadoop、Spark等大資料平臺上,實現海量資料的平行計算。它採用“分而治之”的思想,將一個計算任務交給叢集中的多臺機器共同完成,之後再彙總成最終結果。

一般來說讀取一個TB,PB級的檔案,普通計算機的速度是比較慢的,而要想提高速度就要提高計算機的硬體配置,這對於普通使用者來說是很難做到的,也提高了這一領域的門檻。而採用廉價的機器組成分散式系統,只要叢集的機器數量足夠多,那麼計算的速度就會足夠快。

二、MapReduce特點

優點

(1)易於程式設計

完全獨立完成一個MapReduce程式是一個很困難的事情,這需要很強的程式設計能力。好在MapReduce給我們提供了大量的方便開發的介面,我們只需要繼承一些介面,實現一些特定的函式就能完成一個MapReduce程式。

(2)高拓展性

這是一個分散式計算框架,我們可以簡單粗暴的,通過增加機器來提高計算效能。

(3)高容錯性

由於MapReduce叢集採用的大多是廉價的機器,宕機,BUG等都是家常便飯。但MapReduce框架提供多種有效的錯誤檢測和恢復機制。如果一個結點出現了問題,其他結點會接替這個結點的工作,等結點恢復正常後,又可以繼續工作,這些都由Hadoop內部完成。

(4)高吞吐量

MapReduce可以對PB級以上也就是1024TB的資料進行離線計算。

缺點

(1)難以實時計算

MapReduce處理的是磁碟上的資料。

(2)不能流式計算

MapReduce處理的是磁碟上的靜態資料,而流式計算的輸入資料的動態的。

(3)難以用於DAG計算

DAG(有向無環圖)多個任務間存在依賴關係,後一個應用的輸入可能是前一個應用的輸出。而MapReduce的輸出結果都會寫在磁碟上,這會造成大量的磁碟IO,降低叢集的效能。

三、MapReduce程式設計

以詞頻統計程式為例

pom依賴

https://mvnrepository.com/尋找以下幾個依賴
hadoop-common、hadoop-hdfs、hadoop-mapreduce-client-core、junit
外掛:maven-compiler-plugin、maven-shade-plugin
下面是我的pom檔案:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.pineapple</groupId>
    <artifactId>MapReduceTest</artifactId>
<!--    <packaging>pom</packaging>-->
    <version>1.0-SNAPSHOT</version>
    <properties>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
<!--            <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-shade-plugin -->
<!--        <dependency>-->
<!--            <groupId>org.apache.maven.plugins</groupId>-->
<!--            <artifactId>maven-shade-plugin</artifactId>-->
<!--            <version>2.4.3</version>-->
<!--        </dependency>-->
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

要做統計的檔案內容為:

Hello,Hadoop,BigData
Hello,Hadoop,MapReduce
Hello,Hadoop,HDFS
BigData,Perfect

MapReduce由Map和Reduce兩個階段組成。

Map

在此之前還有一個讀取檔案的操作,這個只要在主類中指定一下就好,不需要寫。

讀取後文件的內容為:

//K1	V1
0 		Hello,Hadoop,BigData
21 		Hello,Hadoop,MapReduce
44 		Hello,Hadoop,HDFS
62 		BigData,Perfect

Map後的檔案內容為:

//K2		V2
Hello 		1
Hadoop 		1
BigData 	1
Hello 		1
Hadoop 		1
MapReduce 	1
Hello 		1
Hadoop 		1
HDFS 		1
BigData 	1
Perfect 	1

Map表示“對映”,將檔案拆分成多個塊,然後發給叢集上的機器統一計算
要繼承Mapper類並重寫map()函式,將K1,V1轉換成下面的K2,V2

package cn.pineapple.day1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 四個泛型的解釋:
 * <p>
 * KEYIN:K1的型別
 * <p>
 * VALUEIN:V1的型別
 * <p>
 * KEYOUT:K2的型別
 * <p>
 * VALUEOUT:V2的型別
 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * 將K1,V1轉換成K2,V2
     *
     * @param key:     行偏移量
     * @param value:   一行文字內容
     * @param context: 上下文物件
     * @throws IOException:
     * @throws InterruptedException:
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        //1:對一行的文字資料進行拆分
        String[] split = value.toString().split(",");
        //2:遍歷陣列,組裝K2和V2
        for (String word : split) {
            //3:將K2和V2寫入上下文中
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}

map()函式內的程式碼,只需要針對檔案一行考慮。所以首先要提取每個單詞,用value.toString()將value也就是這一行的內容轉換成String型別,再用split()方法進行拆分。K2的值預設為1。在寫入的時候,還要將K2和V2轉換成Text型別和LongWritable型別。

Reduce

在Map和Reduce中間還有一個shuffle,目前用不到這個shuffle,可以採取預設的方式,它會把K2,V2轉換成:

//K2		V2
Hello 		<1,1,1>
Hadoop 		<1,1,1>
BigData 	<1,1>
HDFS 		<1>
MapReduce 	<1>
Perfect 	<1>

接著Reduce的最終結果是:

//K3 		V3
BigData		2
HDFS		1
Hadoop		3
Hello		3
MapReduce	1
Perfect		1

Reduce表示“歸約”,將所有結果都統一起來,繼承Reducer類並重寫reduce()函式,將K2和V2轉換為K3和V3,也就是最終結果。

package cn.pineapple.day1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/**
 * 四個泛型的解釋:
 * <p>
 * KEYIN:K2型別
 * <p>
 * VALUEIN:V2型別
 * <p>
 * KEYOUT:K3型別
 * <p>
 * VALUEOUT:V3型別
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * 將K2和V2轉換為K3和V3,將K3和V3寫入上下文中
     *
     * @param key:     新K2
     * @param values:  新V2
     * @param context: 上下文物件
     * @throws IOException:
     * @throws InterruptedException:
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        //1:遍歷集合,將集合中的數字相加,得到V3
        for (LongWritable value : values) {
            count += value.get();
        }
        //2:將K3V3寫入上下文中
        context.write(key, new LongWritable(count));
    }
}

為了實現這個最終結果,K3和K2是一樣的,我們只需要遍歷這個集合,然後相加就能得到V3。value.get()可以將LongWritable型別轉換成long型別,最後在寫入的時候再進行一次轉換。

Main方法

光有這兩個類是不行的,我們還要寫一個Main方法,詳細的列一下任務流程,指定一下任務配置等。
要繼承Configured類並實現Tool介面。

package cn.pineapple.day1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    /**
     * 指定一個job任務
     *
     * @param strings:
     * @return :
     * @throws Exception:
     */
    public int run(String[] strings) throws Exception {
        //1:建立一個job任務物件
        Job wordCount = Job.getInstance(super.getConf(), "WordCount");
        //如果打包執行出錯,則需要加改配置
        wordCount.setJarByClass(JobMain.class);

        //2:配置job任務物件(八個步驟)

        //第一步:指定檔案的讀取方式和
        wordCount.setInputFormatClass(TextInputFormat.class);
        //      讀取路徑
        TextInputFormat.addInputPath(wordCount, new Path("hdfs://nsv:8020/input/wordcount"));

        //第二步:指定Map階段的處理方式和資料型別
        wordCount.setMapperClass(WordCountMapper.class);
        //      設定K2的型別
        wordCount.setMapOutputKeyClass(Text.class);
        //      設定V2的型別
        wordCount.setMapOutputValueClass(LongWritable.class);

        //第三、四、五、六shuffle階段採用預設的方式

        //第七步:指定Reduce階段的處理方式和資料型別
        wordCount.setReducerClass(WordCountReducer.class);
        //      設定K3的型別
        wordCount.setOutputKeyClass(Text.class);
        //      設定V3的型別
        wordCount.setOutputValueClass(LongWritable.class);

        //第八步:指定輸出型別
        wordCount.setOutputFormatClass(TextOutputFormat.class);
        //      輸出路徑
        TextOutputFormat.setOutputPath(wordCount, new Path("hdfs://nsv:8020/output/wordcount"));

        //等待任務結束
        boolean bl = wordCount.waitForCompletion(true);

        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //啟動job任務
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}

run()方法裡就是本次任務的詳細流程和配置,然後main()方法裡要呼叫上面寫的run方法。它接受一個int型別作為退出程式碼,0或1。

四、打包執行

雙擊package打包,出現BUILD SUCCESS表示打包成功


出現了兩個jar包,origin包是因為用了maven-shade-plugin打包外掛,不會包含依賴jar包,所有體積較小,可以選擇這個jar包執行。

將要統計的檔案放到HDFS上
hdfs dfs -mkdir -p /input/wordcount
hdfs dfs -put wordcount.txt /input/wordcount

將jar包上傳到叢集上後,跑一下嘍
hadoop jar original-MapReduceTest-1.0-SNAPSHOT.jar cn.pineapple.day1.JobMain,cn.pineapple.day1.JobMain是主類的全路徑


completed successfully看來是成功了,去Web上看一下實際結果


點選Download可以檢視下載結果