
Composite MapReduce: ChainJob

The dependency scenario

Imagine a MapReduce workflow made up of two sub-jobs, job1 and job2, where job2 may run only after job1 has finished.

job1: merge the small files (the job written in the previous post)

job2: word count

A workflow composed this way is called a composite MapReduce with data-dependency relationships.

Hadoop provides an execution and control mechanism for exactly this kind of composition, exposed through the Job, ControlledJob, and JobControl classes.

Each sub-job's configuration lives in a Job; a ControlledJob wraps that Job and additionally records which other jobs it depends on. JobControl then drives the overall workflow: add every wrapped sub-job to a JobControl instance and execute its run() method to run the pipeline.
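In outline, the wiring looks like this. This is a minimal sketch that assumes a Configuration conf and two already-configured Job instances, job1 and job2; the full runnable driver follows in section 1.

        // Wrap each Job so JobControl can track its state
        ControlledJob cj1 = new ControlledJob(conf);
        cj1.setJob(job1);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);

        // Record the dependency: cj2 may start only after cj1 succeeds
        cj2.addDependingJob(cj1);

        // JobControl schedules jobs as their dependencies complete
        JobControl control = new JobControl("job1-then-job2");
        control.addJob(cj1);
        control.addJob(cj2);

        // run() blocks, so drive it from its own thread and poll until done
        new Thread(control).start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();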

1. Writing WcDMergeChainJob: implementing the composite MapReduce

package cn.itxiaobai;

import com.google.common.io.Resources;
import com.utils.CDUPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.List;

public class WcDMergeChainJob {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Create the configuration
        Configuration configuration = new Configuration();
        // Load the config file
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        // job1: merge small files
        Job job1 = setJob1();
        // job2: word count
        Job job2 = setJob2();

        ControlledJob controlledJob1 = new ControlledJob(configuration);
        controlledJob1.setJob(job1);

        ControlledJob controlledJob2 = new ControlledJob(configuration);
        controlledJob2.setJob(job2);

        // Declare the dependency: job2 runs only after job1 succeeds
        controlledJob2.addDependingJob(controlledJob1);

        JobControl jobControl = new JobControl("word count depend merge small file");

        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);
        // JobControl.run() blocks, so execute it on a separate thread
        new Thread(jobControl).start();

        // Print the currently running jobs until the whole workflow finishes
        while (!jobControl.allFinished()) {
            List<ControlledJob> jobList = jobControl.getRunningJobList();
            System.out.println(jobList);
            Thread.sleep(5000);
        }
        // Stop the JobControl thread so the JVM can exit
        jobControl.stop();
    }

    private static Job setJob2() throws IOException {
        // Create the configuration
        Configuration configuration = new Configuration();
        // Load the config file
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        // Set up the job
        Job job2 = Job.getInstance(configuration, "my word count local");
        // Main class for the job
        job2.setJarByClass(WCJob.WCJober.class);
        // Mapper class and its output types
        job2.setMapperClass(WCJob.WCMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(IntWritable.class);
        // Reducer class and the job/reduce output types
        job2.setReducerClass(WCJob.WCReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        // Input and output paths for the job
        FileInputFormat.addInputPath(job2, new Path("/mymergeout"));
        // Delete the output path first if it already exists
        CDUPUtils.deleteFileName("/mywcout");
        FileOutputFormat.setOutputPath(job2, new Path("/mywcout"));
        return job2;
    }

    private static Job setJob1() throws IOException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));
        // Set up the job
        Job job1 = Job.getInstance(coreSiteConf, "my small merge big file");
        // Main class for the job
        job1.setJarByClass(MergeSmallFileJob.MyJob.class);

        // Mapper and Reducer classes
        job1.setMapperClass(MergeSmallFileJob.MergeSmallFileMapper.class);
        job1.setReducerClass(MergeSmallFileJob.MergeSmallFileReduce.class);

        // Map output types
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        // Job/reduce output types
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileSystem fileSystem = FileSystem.get(coreSiteConf);
        // listFiles: recursively iterate over the files under a path
        RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            Path filesPath = fileStatus.getPath();
            if (!fileStatus.isDirectory()) {
                // Only take .txt files smaller than 2 MB
                if (fileStatus.getLen() < 2 * 1024 * 1024 && filesPath.getName().contains(".txt")) {
                    // Add the file as an input path
                    FileInputFormat.addInputPath(job1, filesPath);
                }
            }
        }

        // Delete the output directory if it exists
        CDUPUtils.deleteFileName("/mymergeout");

        FileOutputFormat.setOutputPath(job1, new Path("/mymergeout"));
        return job1;
    }
}

2. The hand-written helper class CDUPUtils: deletes an existing directory and reads file contents

package com.utils;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.ArrayList;

public class CDUPUtils {
    // Delete a file/directory that already exists on HDFS
    public static void deleteFileName(String path) throws IOException {
        // The path to delete
        Path fileName = new Path(path);
        Configuration entries = new Configuration();
        // Parse the core-site-local.xml config file
        entries.addResource(Resources.getResource("core-site-local.xml"));
        //entries.set(,); -- extra configuration can be added here
        FileSystem fileSystem = FileSystem.get(entries);
        if (fileSystem.exists(fileName)) {
            System.out.println(fileName + " already exists, deleting it...");
            boolean flag = fileSystem.delete(fileName, true);
            if (flag) {
                System.out.println(fileName + " deleted successfully");
            } else {
                System.out.println(fileName + " deletion failed!");
                return;
            }
        }
        // Close the resource
        fileSystem.close();
    }

    // Read the contents of a file
    public static void readContent(String path) throws IOException {
        // The path of the file to read
        Path fileName = new Path(path);
        ArrayList<String> returnValue = new ArrayList<String>();
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        // Get the client file system
        FileSystem fileSystem = FileSystem.get(configuration);
        // open() returns an input stream for reading the file's data
        FSDataInputStream inputStream = fileSystem.open(fileName);
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        // Read the data line by line
        LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
        // Holds each line as it is read
        String str = null;
        // Loop until there is no more data
        while ((str = lineNumberReader.readLine()) != null) {
            returnValue.add(str);
        }
        // Print the data to the console
        System.out.println("Contents of the file processed by MapReduce:");
        for (String read : returnValue) {
            System.out.println(read);
        }
        // Close the resources
        lineNumberReader.close();
        inputStream.close();
        inputStreamReader.close();
    }
}
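The driver calls deleteFileName before setting each output path because FileOutputFormat refuses to start a job whose output directory already exists. Used standalone, the call is just this one line (the path is the one used in this post):

        // Recursively remove /mywcout (if present) so the next run can write there
        CDUPUtils.deleteFileName("/mywcout");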

3. The configuration file: core-site-local.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master2:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>

4. Dependencies added to the pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhiyou100</groupId>
    <artifactId>mrdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
    </properties>

    <!-- Distributed computing -->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>

        <!-- Distributed storage -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
    </dependencies>
</project>

5. Testing by running locally (right-click Run)

The job list printed by the polling loop first shows job1 running, then job2 once job1 has finished; when the loop exits, both jobs have completed successfully.
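To double-check the result, the readContent helper from section 2 (which the driver itself never calls) can print job2's output. A small sketch, assuming a single reducer and therefore the default output file name part-r-00000:

        // Print the final word counts that job2 wrote to /mywcout
        CDUPUtils.readContent("/mywcout/part-r-00000");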