MapReduce Basics (Part 2): Merging Small Files
阿新 · Published 2018-12-15
Why merge small files in Hadoop?
A small file is a file whose size is smaller than the HDFS block size. Large numbers of such files cause serious scalability and performance problems for Hadoop. First, every block, file, and directory in HDFS is represented as an object in the NameNode's memory, each taking roughly 150 bytes. With 10,000,000 small files, each occupying its own block, the NameNode needs roughly 2 GB of memory; with 100 million files it needs about 20 GB (see references [1][4][5]). NameNode memory therefore becomes a hard limit on how far the cluster can scale. Second, accessing a large number of small files is far slower than accessing a few large files: HDFS was designed for streaming access to large files, and reading many small files means constantly hopping from one DataNode to another, which severely hurts performance. Finally, processing many small files is much slower than processing the same volume of data stored in large files: every small file occupies its own task slot, and a large share of the job's time, sometimes most of it, is spent starting and tearing down tasks rather than doing useful work.
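As a rough sanity check of those figures, here is a minimal sketch that uses the paragraph's own assumption of about 150 bytes per namespace object; the real per-file overhead depends on the Hadoop version and on path lengths, so treat the result as an order-of-magnitude estimate only:

public class NameNodeMemoryEstimate {
    public static void main(String[] args) {
        long files = 10_000_000L;      // ten million small files
        long bytesPerObject = 150L;    // rough size of one NameNode object
        double gib = 1024.0 * 1024 * 1024;
        // Counting only the file objects:
        double low = files * bytesPerObject / gib;
        // Counting one file object plus one block object per file:
        double high = files * 2 * bytesPerObject / gib;
        // Prints roughly 1.4 - 2.8 GiB; the "about 2 GB" figure above falls in this range.
        System.out.printf("Estimated NameNode heap: %.1f - %.1f GiB%n", low, high);
    }
}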
1. Create the MergeSmallFileJob class: the job that merges small files (files under 2 MB are treated as small files)
package cn.itxiaobai;

import com.google.common.io.Resources;
import com.utils.CDUPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

/**
 * Job that merges small files (files under 2 MB are treated as small files).
 */
public class MergeSmallFileJob {

    public static class MergeSmallFileMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the file name as the key and the line content as the value.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();
            context.write(new Text(fileName), value);
        }
    }

    public static class MergeSmallFileReduce extends Reducer<Text, Text, Text, Text> {
        /**
         * @param key    the file name
         * @param values all lines of that file
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all lines of one file, separated by commas.
            Iterator<Text> iterator = values.iterator();
            StringBuffer stringBuffer = new StringBuffer();
            while (iterator.hasNext()) {
                stringBuffer.append(iterator.next()).append(",");
            }
            context.write(key, new Text(stringBuffer.toString()));
        }
    }

    public static class MyJob {
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {
            Configuration coreSiteConf = new Configuration();
            coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));

            // Set up the job.
            Job job = Job.getInstance(coreSiteConf, "my small merge big file");
            job.setJarByClass(MyJob.class);

            // Mapper and Reducer classes.
            job.setMapperClass(MergeSmallFileMapper.class);
            job.setReducerClass(MergeSmallFileReduce.class);

            // Map output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Job/reduce output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fileSystem = FileSystem.get(coreSiteConf);
            // listFiles returns an iterator over all files under the given path (recursive).
            RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus fileStatus = listFiles.next();
                Path filesPath = fileStatus.getPath();
                if (!fileStatus.isDirectory()) {
                    // Check size and format: only .txt files smaller than 2 MB become job input.
                    if (fileStatus.getLen() < 2 * 1024 * 1024 && filesPath.getName().contains(".txt")) {
                        FileInputFormat.addInputPath(job, filesPath);
                    }
                }
            }

            // Delete the output directory if it already exists.
            CDUPUtils.deleteFileName("/mymergeout");
            FileOutputFormat.setOutputPath(job, new Path("/mymergeout"));

            // Run the job.
            boolean flag = job.waitForCompletion(true);
            if (flag) {
                System.out.println("Merged file content:");
                CDUPUtils.readContent("/mymergeout/part-r-00000");
            } else {
                System.out.println("Job failed....");
            }
        }
    }
}
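To make the key/value flow concrete, here is a minimal standalone sketch (the file name a.txt and its two lines are made up for illustration) of what the reducer does with the lines it receives for one small file:

public class ReduceLogicDemo {
    public static void main(String[] args) {
        // Hypothetical lines the mapper emitted under the key "a.txt".
        String[] values = {"hello", "world"};
        StringBuffer merged = new StringBuffer();
        for (String v : values) {
            merged.append(v).append(",");
        }
        // The reducer would write: a.txt<TAB>hello,world,
        System.out.println("a.txt\t" + merged);
    }
}

Each small input file thus becomes a single line of part-r-00000, keyed by the original file name, with its lines joined by commas (note the trailing comma left by the loop).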
2. The helper class CDUPUtils used above: deletes an existing output directory and prints file contents
package com.utils;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.ArrayList;

public class CDUPUtils {

    // Delete a file or directory that already exists on HDFS.
    public static void deleteFileName(String path) throws IOException {
        // The path to delete.
        Path fileName = new Path(path);
        Configuration entries = new Configuration();
        // Load core-site-local.xml; further settings could be added via entries.set(...).
        entries.addResource(Resources.getResource("core-site-local.xml"));
        FileSystem fileSystem = FileSystem.get(entries);
        if (fileSystem.exists(fileName)) {
            System.out.println(fileName + " already exists, deleting it...");
            boolean flag = fileSystem.delete(fileName, true);
            if (flag) {
                System.out.println(fileName + " deleted successfully");
            } else {
                System.out.println(fileName + " could not be deleted!");
                return;
            }
        }
        // Release resources.
        fileSystem.close();
    }

    // Read a file's content and print it to the console.
    public static void readContent(String path) throws IOException {
        // The path to read.
        Path fileName = new Path(path);
        ArrayList<String> returnValue = new ArrayList<String>();
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        // Get the client file system.
        FileSystem fileSystem = FileSystem.get(configuration);
        // open() returns an input stream for reading the file's data.
        FSDataInputStream inputStream = fileSystem.open(fileName);
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        // Read the data line by line.
        LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
        String str = null;
        while ((str = lineNumberReader.readLine()) != null) {
            returnValue.add(str);
        }
        // Print the data to the console.
        System.out.println("Content of the file produced by the MapReduce job:");
        for (String read : returnValue) {
            System.out.println(read);
        }
        // Release resources.
        lineNumberReader.close();
        inputStreamReader.close();
        inputStream.close();
    }
}
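One robustness note: the streams in readContent are only closed when no exception is thrown. Below is a minimal alternative sketch of the same read loop using try-with-resources, so everything is closed even on failure (same configuration resource assumed; if added to CDUPUtils it also needs import java.io.BufferedReader):

public static void readContentSafely(String path) throws IOException {
    Configuration configuration = new Configuration();
    configuration.addResource(Resources.getResource("core-site-local.xml"));
    Path fileName = new Path(path);
    try (FileSystem fs = FileSystem.get(configuration);
         FSDataInputStream in = fs.open(fileName);
         BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    }
}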
Configuration file: core-site-local.xml (the resource loaded by the code above)
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master2:9000</value>
  </property>
  <property>
    <name>fs.hdfs.impl</name>
    <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  </property>
</configuration>
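To confirm that this resource is actually on the classpath and being picked up, here is a quick check (the class name ConfigCheck is just for illustration) that reuses the same Guava Resources helper as the job code:

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;

public class ConfigCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(Resources.getResource("core-site-local.xml"));
        // Should print hdfs://master2:9000 if core-site-local.xml was found.
        System.out.println(conf.get("fs.defaultFS"));
    }
}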
Dependencies added to pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zhiyou100</groupId>
  <artifactId>mrdemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
  </properties>

  <dependencies>
    <!-- Distributed computation (MapReduce) -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${org.apache.hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${org.apache.hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${org.apache.hadoop.version}</version>
    </dependency>
    <!-- Distributed storage (HDFS) -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${org.apache.hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${org.apache.hadoop.version}</version>
    </dependency>
    <!-- Database driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.46</version>
    </dependency>
  </dependencies>
</project>
Run the MyJob main class locally for testing (right-click Run in the IDE). Because core-site-local.xml sets fs.defaultFS to hdfs://master2:9000, the job executes in the local JVM but reads its input from, and writes its output to, that HDFS cluster.