Composite MapReduce: ChainJob
A dependency-based scenario
Imagine a MapReduce workload made up of two sub-jobs, job1 and job2, where job2 may only start after job1 has finished.
job1: merges small files (from the previous post)
job2: runs word count
This kind of relationship is what we call composite MapReduce with data dependencies between jobs.
Hadoop provides an execution and control mechanism for this kind of composition, exposed through the Job, ControlledJob, and JobControl classes.
A ControlledJob wraps a sub-job and, besides its configuration, tracks its dependencies on other sub-jobs; JobControl drives the whole workflow. Add all the sub-jobs to a JobControl instance and run its run() method to execute the chain.
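The core wiring is quite small. Here is a minimal sketch of the pattern, assuming jobA and jobB are already fully configured Job instances (the class and method names here are illustrative; the complete program follows in the next section):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Minimal sketch: run jobB only after jobA, using ControlledJob/JobControl.
public class ChainSketch {
    public static void runChain(Configuration conf, Job jobA, Job jobB) throws Exception {
        ControlledJob cjA = new ControlledJob(conf);
        cjA.setJob(jobA);
        ControlledJob cjB = new ControlledJob(conf);
        cjB.setJob(jobB);
        cjB.addDependingJob(cjA);              // jobB starts only after jobA succeeds

        JobControl control = new JobControl("chain");
        control.addJob(cjA);
        control.addJob(cjB);
        new Thread(control).start();           // JobControl implements Runnable
        while (!control.allFinished()) {       // poll until every sub-job is done
            Thread.sleep(5000);
        }
        control.stop();
    }
}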
1. Writing WcDMergeChainJob: implementing the composite MapReduce job
package cn.itxiaobai;

import com.google.common.io.Resources;
import com.utils.CDUPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.List;

public class WcDMergeChainJob {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Create the configuration and load the configuration file
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));

        // job1: merge the small files
        Job job1 = setJob1();
        // job2: word count
        Job job2 = setJob2();

        // Wrap both jobs so that dependencies can be declared
        ControlledJob controlledJob1 = new ControlledJob(configuration);
        controlledJob1.setJob(job1);
        ControlledJob controlledJob2 = new ControlledJob(configuration);
        controlledJob2.setJob(job2);
        // Declare the dependency: job2 runs only after job1 succeeds
        controlledJob2.addDependingJob(controlledJob1);

        JobControl jobControl = new JobControl("word count depend merge small file");
        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);

        // Start a thread to run the JobControl
        new Thread(jobControl).start();
        // Print the currently running jobs until everything has finished
        while (!jobControl.allFinished()) {
            List<ControlledJob> jobList = jobControl.getRunningJobList();
            System.out.println(jobList);
            Thread.sleep(5000);
        }
        jobControl.stop();
    }

    private static Job setJob2() throws IOException {
        // Create the configuration and load the configuration file
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        // Create the job
        Job job2 = Job.getInstance(configuration, "my word count local");
        // Main class to run
        job2.setJarByClass(WCJob.WCJober.class);
        // Mapper class and its output types
        job2.setMapperClass(WCJob.WCMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(IntWritable.class);
        // Reducer class and the job/reduce output types
        job2.setReducerClass(WCJob.WCReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        // Input and output paths
        FileInputFormat.addInputPath(job2, new Path("/mymergeout"));
        // Delete the output path first if it already exists
        CDUPUtils.deleteFileName("/mywcout");
        FileOutputFormat.setOutputPath(job2, new Path("/mywcout"));
        return job2;
    }

    private static Job setJob1() throws IOException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));
        // Create the job
        Job job1 = Job.getInstance(coreSiteConf, "my small merge big file");
        // Main class to run
        job1.setJarByClass(MergeSmallFileJob.MyJob.class);
        // Mapper and Reducer classes
        job1.setMapperClass(MergeSmallFileJob.MergeSmallFileMapper.class);
        job1.setReducerClass(MergeSmallFileJob.MergeSmallFileReduce.class);
        // Map output types
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        // Job/reduce output types
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileSystem fileSystem = FileSystem.get(coreSiteConf);
        // listFiles: recursively iterate over all files under "/"
        RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            Path filesPath = fileStatus.getPath();
            if (!fileStatus.isDirectory()) {
                // Only take .txt files smaller than 2 MB as input
                if (fileStatus.getLen() < 2 * 1024 * 1024 && filesPath.getName().contains(".txt")) {
                    FileInputFormat.addInputPath(job1, filesPath);
                }
            }
        }
        // Delete the output directory if it already exists
        CDUPUtils.deleteFileName("/mymergeout");
        FileOutputFormat.setOutputPath(job1, new Path("/mymergeout"));
        return job1;
    }
}
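The WCJob (driver class WCJober, plus WCMapper/WCReducer) and MergeSmallFileJob classes referenced above come from the earlier posts and are not repeated here. For readers who have not seen them, a word-count Mapper/Reducer of the kind WCJob is assumed to contain looks roughly like this (an illustrative sketch only, not the original code):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a standard word-count Mapper/Reducer pair (names are illustrative).
public class WCJob {
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each line into words and emit (word, 1)
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for each word
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}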
2. The helper class CDUPUtils used above: deletes an existing directory and reads file contents
package com.utils;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.ArrayList;

public class CDUPUtils {
    // Delete a file or directory that already exists on HDFS
    public static void deleteFileName(String path) throws IOException {
        // Path to delete
        Path fileName = new Path(path);
        Configuration entries = new Configuration();
        // Parse the core-site-local.xml file
        entries.addResource(Resources.getResource("core-site-local.xml"));
        //entries.set(,); -- extra configuration can be added here
        FileSystem fileSystem = FileSystem.get(entries);
        if (fileSystem.exists(fileName)) {
            System.out.println(fileName + " already exists, deleting it...");
            boolean flag = fileSystem.delete(fileName, true);
            if (flag) {
                System.out.println(fileName + " deleted successfully");
            } else {
                System.out.println(fileName + " delete failed!");
                return;
            }
        }
        // Release resources
        fileSystem.close();
    }

    // Read and print the contents of a file
    public static void readContent(String path) throws IOException {
        // Path of the file to read
        Path fileName = new Path(path);
        ArrayList<String> returnValue = new ArrayList<String>();
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        // Get the client file system
        FileSystem fileSystem = FileSystem.get(configuration);
        // open() returns an input stream for reading the file
        FSDataInputStream inputStream = fileSystem.open(fileName);
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        // Read the data line by line
        LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
        // Holds each line as it is read
        String str = null;
        while ((str = lineNumberReader.readLine()) != null) {
            returnValue.add(str);
        }
        // Print the data to the console
        System.out.println("Contents of the file processed by the MapReduce job:");
        for (String read : returnValue) {
            System.out.println(read);
        }
        // Release resources
        lineNumberReader.close();
        inputStream.close();
        inputStreamReader.close();
    }
}
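As a quick usage sketch (the class name and paths here are illustrative; part-r-00000 is the conventional name of the first reducer's output file):

import com.utils.CDUPUtils;

public class CDUPUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Remove a leftover output directory before a run...
        CDUPUtils.deleteFileName("/mywcout");
        // ...and after the chain finishes, print the word-count result file.
        CDUPUtils.readContent("/mywcout/part-r-00000");
    }
}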
3. Configuration file: core-site-local.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master2:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>
4. Dependencies added to the pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhiyou100</groupId>
    <artifactId>mrdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
    </properties>

    <dependencies>
        <!-- Distributed computation -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <!-- Distributed storage -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
5. Running the test locally (right-click Run)
Console output indicating that job1 is running.
Console output indicating that job2 is running.
The run finishes, which means both jobs completed successfully.