1. 程式人生 > >(2)Hadoop核心-java程式碼對MapReduce的操作

(2)Hadoop核心-java程式碼對MapReduce的操作

上一篇檔案介紹了java程式碼怎麼操作hdfs檔案的,hdfs理念“就是一切皆檔案”,我們現在搞定了怎麼使用java上傳下載等操作了接下來就要處理檔案了,hadoop的mapreduce模組。

一、Hadoop Map/Reduce框架

       Hadoop Map/Reduce是一個使用簡易的軟體框架,基於它寫出來的應用程式能夠執行在由上千個商用機器組成的大型叢集上,並以一種可靠容錯的方式並行處理上T級別的資料集。

      一個Map/Reduce 作業(job) 通常會把輸入的資料集切分為若干獨立的資料塊,由 map任務(task)

以完全並行的方式處理它們。框架會對map的輸出先進行排序, 然後把結果輸入給reduce任務。通常作業的輸入和輸出都會被儲存在檔案系統中。 整個框架負責任務的排程和監控,以及重新執行已經失敗的任務。

       通常,Map/Reduce框架和分散式檔案系統是執行在一組相同的節點上的,也就是說,計算節點和儲存節點通常在一起。這種配置允許框架在那些已經存好資料的節點上高效地排程任務,這可以使整個叢集的網路頻寬被非常高效地利用。

      Map/Reduce框架由一個單獨的master JobTracker 和每個叢集節點一個slave TaskTracker共同組成。master負責排程構成一個作業的所有任務,這些任務分佈在不同的slave上,master監控它們的執行,重新執行已經失敗的任務。而slave僅負責執行由master指派的任務。

      應用程式至少應該指明輸入/輸出的位置(路徑),並通過實現合適的介面或抽象類提供map和reduce函式。再加上其他作業的引數,就構成了作業配置(job configuration)。然後,Hadoop的 job client提交作業(jar包/可執行程式等)和配置資訊給JobTracker,後者負責分發這些軟體和配置資訊給slave、排程任務並監控它們的執行,同時提供狀態和診斷資訊給job-client。

注:以上Hadoop Map/Reduce摘自hadoop官方介紹,地址:http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html

二、環境準備

1.已經搭建好的linux虛擬機器,地址為192.168.2.2

2.IDEA 開發編輯器,springboot專案

3.下載一個部小說(本文使用著名小說:三國演義 sgyy.txt,檔案網上隨便下載的可能不是全集)

4.上一篇中的專案基礎,程式碼地址:程式碼地址

三、開發編碼

1.啟動hdfs服務

使用xshell連線之前搭建的虛擬機器,並啟動hdfs服務

cd /usr/local/hadoop/hadoop-3.1.1

sbin/start-all.sh

2.上傳檔案到hdfs

在上一篇文章的基礎上上傳三國演義(sgyy.txt)檔案到java目錄,用作讀取的檔案,上傳成功後我們開啟hdfs檔案管理系統檢視。

3.編寫WordCount程式,計算出指定資料集中指定單詞出現的次數

我們後面會使用到分詞器,所以需要先新增一個jar包

<dependency>
	 <groupId>cn.bestwu</groupId>
	 <artifactId>ik-analyzers</artifactId>
	 <version>5.1.0</version>
</dependency>

mapReduce的工作原理簡單說就是map負責將檔案安裝使用者指定規則拆分成key,value格式的資料,reduce負責將拆成的資料進行統計並輸出到另外一個檔案中。

現在寫新建一個 WordCountMap 類用來接收檔案,這裡使用到了分詞器,因為預設map讀取檔案是按照行讀取的,也就是key是一行的內容,value為數字1,使用分詞器後會將行內容拆分成我們常用的詞語,比如中國會拆分成 <中,1><國,1><中國,1>

這種才是符合我們的要求的。我們拆分的是三國演義,會把sgyy.txt 中出現的文字統計一遍,下圖是統計後的結果,統計前單詞都是一個一個的,reduce負責將key相同的單詞累加起來並輸出到檔案。

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.*;
import java.util.StringTokenizer;

/**
 * 類或方法的功能描述 :統計單個字元出現的次數
 *
 * @date: 2018-12-05 15:37
 */
public class WordCountMap extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//        // 防止中文亂碼
        String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
//        // 未使用分詞器,分隔檔案行預設為空格,也就是一行的內容作為key,value就是數量1即<行內容,1>
//        StringTokenizer itr = new StringTokenizer(line);
//        while (itr.hasMoreTokens()) {
//            word.set(itr.nextToken());
//            context.write(word, one);
//        }

        // 使用分詞器,分隔檔案行內容根據常用的短語分隔,比如我們,被分隔成 <我,1>,<們,1><我們,1>
        byte[] btValue = line.getBytes();
        InputStream inputStream = new ByteArrayInputStream(btValue);
        Reader reader = new InputStreamReader(inputStream);
        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
        Lexeme lexeme;
        while ((lexeme = ikSegmenter.next()) != null) {
            word.set(lexeme.getLexemeText());
            context.write(word, one);
        }
    }
}

接下來再新建一個 WordCountReduce 類用來統計map中切分的單詞,這裡我們統計三國演義中曹操和孫權出現的次數並輸出到控制檯。

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * 類或方法的功能描述 : 統計單個字元出現的次數
 *
 * @date: 2018-12-05 18:29
 */
public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    private String text = "孫權";
    private int textSum = 0;
    private List<String> textList = null;

    public WordCountReduce() {
        textList = new ArrayList<>();
        textList.add("曹操");
        textList.add("孫權");
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);

        String keyStr = key.toString();
        // 未使用分詞器,需要根據map傳過來的行內容檢索並累加
//        boolean isHas = keyStr.contains(text);
//        if (isHas) {
//            textSum++;
//            System.out.println("============ " + text + " 統計分詞為: " + textSum + " ============");
//        }

        // 使用分詞器,內容已經被統計好了,直接輸出即可
        if (textList.contains(keyStr)) {
            System.out.println("============ " + keyStr + " 統計分詞為: " + sum + " ============");
        }
    }
}

新建一個 ReduceJobsUtils 類用來配置hadoop的執行環境,跟上一篇的 HdfsUtils 類似,這裡用到的是 getWordCountJobsConf 方法。

import com.sunvalley.hadoop.reduce.mapper.WeatherMap;
import com.sunvalley.hadoop.reduce.mapper.WordCount;
import com.sunvalley.hadoop.reduce.mapper.WordCountMap;
import com.sunvalley.hadoop.reduce.reducer.WeatherReduce;
import com.sunvalley.hadoop.reduce.reducer.WordCountReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;

/**
 * 類或方法的功能描述 : Map/Reduce工具類
 *
 * @date: 2018-12-04 14:16
 */
@Component
public class ReduceJobsUtils {
    @Value("${hdfs.path}")
    private String path;

    private static String hdfsPath;

    /**
     * 獲取HDFS配置資訊
     * @return
     */
    public static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        configuration.set("mapred.job.tracker", hdfsPath);
        return configuration;
    }

    /**
     * 獲取單詞統計的配置資訊
     * @param jobName
     * @return
     */
    public static void getWordCountJobsConf(String jobName, String inputPath, String outputPath) throws IOException , ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(getConfiguration());
        Job job = Job.getInstance(conf, jobName);
        job.setMapperClass(WordCountMap.class);
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    /**
     * 單詞統計
     * @param jobName
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void wordCount(String jobName, String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(getConfiguration());
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
//        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * 獲取單詞一年最高氣溫計算配置
     * @param jobName
     * @return
     */
    public static JobConf getWeatherJobsConf(String jobName) {
        JobConf jobConf = new JobConf(getConfiguration());
        jobConf.setJobName(jobName);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(LongWritable.class);
        jobConf.setMapperClass(WeatherMap.class);
        jobConf.setCombinerClass(WeatherReduce.class);
        jobConf.setReducerClass(WeatherReduce.class);
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        return jobConf;
    }


    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }
} 

因為我使用的是RPC的方式呼叫mapReduce方法,使用postman傳送請求,所以我們再新建一個 MapReduceService 和 MapReduceController 類。MapReduceService 中的wordCount方法中我們默認了輸出目錄為/output + jobName

import com.sunvalley.hadoop.util.HdfsUtil;
import com.sunvalley.hadoop.util.ReduceJobsUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.springframework.stereotype.Service;


/**
 * 類或方法的功能描述 : 單詞統計
 *
 * @date: 2018-12-05 19:02
 */
@Service
public class MapReduceService {
    // 預設reduce輸出目錄
    private static final String OUTPUT_PATH = "/output";

    /**
     * 單詞統計,統計某個單詞出現的次數
     * @param jobName
     * @param inputPath
     * @throws Exception
     */
    public void wordCount(String jobName, String inputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // 輸出目錄 = output/當前Job,如果輸出路徑存在則刪除,保證每次都是最新的
        String outputPath = OUTPUT_PATH + "/" + jobName;
        if (HdfsUtil.existFile(outputPath)) {
            HdfsUtil.deleteFile(outputPath);
        }
        ReduceJobsUtils.getWordCountJobsConf(jobName, inputPath, outputPath);
    }

    /**
     * 單詞統計,統計所有分詞出現的次數
     * @param jobName
     * @param inputPath
     * @throws Exception
     */
    public void newWordCount(String jobName, String inputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // 輸出目錄 = output/當前Job,如果輸出路徑存在則刪除,保證每次都是最新的
        String outputPath = OUTPUT_PATH + "/" + jobName;
        if (HdfsUtil.existFile(outputPath)) {
            HdfsUtil.deleteFile(outputPath);
        }
        ReduceJobsUtils.wordCount(jobName, inputPath, outputPath);
    }

    /**
     * 一年最高氣溫統計
     * @param jobName
     * @param inputPath
     * @throws Exception
     */
    public void weather(String jobName, String inputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // 輸出目錄 = output/當前Job
        String outputPath = OUTPUT_PATH + "/" + jobName;
        if (HdfsUtil.existFile(outputPath)) {
            HdfsUtil.deleteFile(outputPath);
        }
        JobConf jobConf = ReduceJobsUtils.getWeatherJobsConf(jobName);
        FileInputFormat.setInputPaths(jobConf, new Path(inputPath));
        FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
        JobClient.runJob(jobConf);
    }
} 
import com.sunvalley.hadoop.VO.BaseReturnVO;
import com.sunvalley.hadoop.reduce.service.MapReduceService;
import com.sunvalley.hadoop.util.HdfsUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 類或方法的功能描述 :TODO
 * @date: 2018-12-05 19:10
 */
@RestController
@RequestMapping("/hadoop/reduce")
public class MapReduceController {
    @Autowired
    MapReduceService mapReduceService;

    /**
     * 單詞統計
     * @param jobName
     * @param inputPath
     * @return
     */
    @PostMapping("/wordCount")
    public BaseReturnVO wordCount(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath) throws  Exception{
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return new BaseReturnVO("請求引數為空");
        }
        mapReduceService.wordCount(jobName, inputPath);
        return new BaseReturnVO("單詞統計成功");
    }

    /**
     * 單詞統計
     * @param jobName
     * @param inputPath
     * @return
     * @throws Exception
     */
    @PostMapping("/newWordCount")
    public BaseReturnVO newWordCount(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath) throws  Exception{
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return new BaseReturnVO("請求引數為空");
        }
        mapReduceService.newWordCount(jobName, inputPath);
        return new BaseReturnVO("單詞統計成功");
    }

    /**
     * 一年最高氣溫統計
     * @param jobName
     * @param inputPath
     * @return
     */
    @PostMapping("/weather")
    public BaseReturnVO weather(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath) throws  Exception{
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return new BaseReturnVO("請求引數為空");
        }
        mapReduceService.weather(jobName, inputPath);
        return new BaseReturnVO("溫度統計成功");
    }
} 

 好了,到此程式碼就已經全部寫完了,接下來我們啟動專案,使用postman呼叫一下

統計成功後我們檢視控制檯輸出,我們可以看到sgyy.txt檔案中曹操出現 913次,孫權出現 316次

reduce結果輸出到檔案,我統一將輸出的目錄設定為了 /output/xxx,xxx為reduce的jobName

使用postman下載輸出檔案

我們把輸出檔案下載到了D盤,現在開啟檔案內容,我們可以清楚的看到三國演義中每個單詞出現的次數分別是多少