1. 程式人生 > >Hadoop2.x深入詳解

Hadoop2.x深入詳解

HDFS分散式檔案系統詳解

       NameNode:是一箇中心伺服器,單一節點,負責管理檔案系統的名稱空間以及客戶端對檔案的訪問;NameNode負責檔案元資料的操作,DataNode負責處理檔案內容的讀寫請求,跟檔案內容相關的資料流不經過NameNode,只會詢問他與哪一個DataNode聯絡,否則NameNode會成為系統的瓶頸;副本存放在哪些DataNode上有NameNode來控制,根據全域性情況做出塊放置決定,讀取檔案NameNode儘量讓使用者先讀取最近的副本,降低塊消耗和讀取時延;NameNode全權管理資料塊的複製,它週期性的從叢集的每個Datanode接手心跳訊號和塊狀態報告,接收到心跳訊號意味著該DataNode節點正常工作,塊狀態報告包含了一個DataNode上所有資料塊的列表。

      一個數據塊在DataNode以檔案儲存在磁碟上,包括兩個檔案,一個數據本身,一個是元資料包括資料塊的長度,塊資料的校驗和,以及時間戳。DataNode啟動後向NameNode註冊,通過後,週期性(1個小時)的向NameNode上報所有的塊資訊。心跳是每3秒一次,心跳返回結果帶有NameNode給該DataNode的命令如複製塊資料到另一臺機器,或者刪除某個資料塊。如果超過10分鐘沒有收到某個DataNode的心跳,則認為該節點不可用。叢集執行中可以安全加入和退出一些機器。

      在core-site.xml配置檔案中配置的資料儲存目錄/data/tmp/dfs下,有如下資料夾:data(datanode存放資料)、name(namenode存放資料)、namesecondary(secondarynamenode存放資料)。

      檔案切成塊預設大小128M,以塊為單位,每個塊有多個副本儲存在不同的機器上,副本數可在檔案生成時指定預設是3。(配置或者命令都可以)
可以建立、刪除、移動或者重新命名檔案,當檔案建立、寫入和關閉之後不能修改檔案內容。

 1、資料損壞處理

       當DataNode讀取block的時候,它會計算checksum,如果計算後的checksum與block建立時值不一樣,說明該block已經損壞。Client讀取其他DataNode上的block。NameNode標記該塊已經損壞,然後複製block達到預期設定的檔案備份數。並刪除損壞的塊。DataNode在其檔案建立後三週驗證其checksum。

2、HDFS互動式shell使用

(1)檔案操作

$ bin/hdfs dfs ……
	-chgrp
	-chmod
	-chown
	-ls
	-mkdir
	-put/get
	-rm
	-rmdir
	-test
	-text

(2)叢集管理

-report				叢集狀態資訊  50070埠也可以看
-safemode			安全模式
-refresh			新增節點後重新整理
3、HDFS  java  API
public class Test {

	static final String PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/";
	static final String DIR = "hdfs://hadoop-senior.ibeifeng.com:8020/d1";
	static final String FILE = "/d1/hello";
	
	public static void main(String[] args) throws Exception {
		//獲取fileSystem
		FileSystem fileSystem = getFileSystem();
		
		// 建立資料夾 hadoop dfs -mkdir /d1
		mkdir(fileSystem);
		
		// 刪除資料夾 hadoop dfs -rm /d1
		remove(fileSystem);
		
		// 上傳檔案 hadoop dfs -put src des
		putData(fileSystem);
		
		// 下載檔案 hadoop dfs -get src des
		getData(fileSystem);
		
		// 瀏覽資料夾hadoop dfs -ls /
		list(fileSystem);
	}

	private static void list(FileSystem fileSystem) throws IOException {
		FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
		for (FileStatus fileStatus : listStatus) {
			String idDir = fileStatus.isDir() ? "資料夾" : "檔案";
			String permission = fileStatus.getPermission().toString();
			short replication = fileStatus.getReplication();
			long len = fileStatus.getLen();
			String path = fileStatus.getPath().toString();
			System.out.println(idDir + "\t" + permission + "\t" + replication
					+ "\t" + len + "\t" + path);
		}
	}

	private static void getData(FileSystem fileSystem) throws IOException {
		FSDataInputStream in = fileSystem.open(new Path(FILE));
		IOUtils.copyBytes(in, System.out, 1024, true);
	}

	private static void putData(FileSystem fileSystem) throws IOException {
		FSDataOutputStream out = fileSystem.create(new Path(FILE));
		FileInputStream in = new FileInputStream("C:/readme.txt");
		IOUtils.copyBytes(in, out, 1024, true);

	}

	private static void remove(FileSystem fileSystem) throws IOException {
		fileSystem.delete(new Path(DIR), true);
	}

	private static void mkdir(FileSystem fileSystem) throws IOException {
		fileSystem.mkdirs(new Path(DIR));
	}

	private static FileSystem getFileSystem() throws IOException,
			URISyntaxException {
		FileSystem fileSystem = FileSystem.get(new URI(PATH),
				new Configuration());
		return fileSystem;
	}

}
4、NameNode啟動過程詳解

NameNode的資料存放在兩個地方

記憶體當中
本地磁碟
        fsimage  格式化檔案系統就是為了生成這個檔案
        edits

第一次啟動,格式化HDFS後,目的生成fsimage
	Start NameNode執行這個命令後,會read fsimage 檔案
	Start DataNode向NameNode註冊,block report(塊彙報)
	接下來執行命令
		create dir /user/beifeng/tmp			-> write [edits]
		put files /user/beifeg/tmp(*-site.xml)		-> write [edits]
		delete file /user/beifeng/tmp/core-site.xml	-> write [edits]
第二次啟動
	Start NameNode執行這個命令後,會read fsimage;read edits;
	生成一個新的fsimage  (合併fsimage和edits)
	生成新的edits 這個時候edits為null
		Start DataNode向NameNode註冊;block report(塊彙報)
	接下來執行命令
		create dir /user/beifeng/tmp			-> write [edits]
 		put files /user/beifeg/tmp(*-site.xml)		-> write [edits]
 		delete file /user/beifeng/tmp/core-site.xml	-> write [edits]
5、SecondaryNameNode輔助功能詳解(HA之後不啟動)

    NameNode中的編輯日誌檔案太大的話如何處理。定期(預設一個小時一次)合併fsimage和edits生成新的fsimage  給NameNode


6、安全模式safemode

等待DataNodes向NameNode傳送block report,(datanodes blocks)/(total blocks) = 99.99%s時,安全模式結束。在安全模式下可以檢視檔案系統的檔案,不能改變檔案系統的命令空間,如建立、上傳、刪除檔案等操作。

$ bin/hdfs dfsadmin –safemode  
				enter  
				leave  
				get 

yarn架構詳解


Yarn的架構

在Yarn中,JobTracker被分為兩部分:ResouceManager(RM)和ApplicationMaster(AM)
MR v1主要由三部分組成:程式設計模型(API)、資料處理引擎(MapTask和ReduceTask)和執行環境(JobTracker和TaskTracker);
Yarn繼承了MR v1的程式設計模型和處理引擎,改變的只是執行環境,所以對程式設計沒有什麼影響。
RM負責資源排程,AM負責任務排程:RM負責整個叢集的資源管理與排程;NodeManager(NM)負責單個節點的資源管理與排程;NM定時的通過心跳的形式與RM進行通訊,報告節點的健康狀態與記憶體使用情況;AM通過與RM互動獲取資源,然後通過與NM互動,啟動計算任務。

在Yarn的框架管理中,無論是AM從RM申請資源,還是NM管理自己所在節點的資源,都是通過Container進行的。Container是Yarn的資源抽象,此處的資源包含記憶體和CPU等。

  • ResourceManager:全域性資源管理器,整個叢集只有一個,負責叢集資源的統一管理和排程分配。
    功能:處理客戶端請求;啟動/監控ApplicationMaster;監控NodeManager;資源分配和排程。
  • NodeManager:整個叢集有多個,負責單節點資源管理和使用。
    功能:單個節點上的資源管理和任務管理;處理來自ResourceManager的命令;處理來自ApplicationMaster的命令。
    NodeManger管理抽象容器,這些容器代表著可供一個特定應用程式使用的針對每個節點的資源。定時的想RM彙報本節點上資源使用情況和各個Container的執行狀態。
  • ApplicationMaster:管理一個在Yarn內執行的應用程式的每個例項。
    功能:資料切分;為應用程式申請資源,並進一步分配給內部任務;任務監控和容錯。負責協調來自ResourceManager的資源,開通過NodeManager監視容器的執行和資源使用(CPU、記憶體等資源的分配)
  • Container:Yarn中的資源抽象,封裝某個節點上多維度資源,如記憶體、CPU、磁碟網路等,當AM向RM申請資源時,RM向AM返回的資源便是用Container表示的。Yarn會為每個任務分配一個Container,且該任務只能使用該Container中描述的資源。
    功能:對任務執行環境的抽象;描述一系列資訊;任務執行資源(節點、記憶體、CPU);任務啟動命令;任務執行環境

Yarn資源管理和排程

        資源排程和資源隔離是Yarn作為一個資源管理系統,最重要和最基礎的兩個功能,資源排程由ResourceManager完成,而資源隔離由各個NodeManager實現。
ResourceManager將某個NodeManager上資源分配給任務(這就是所謂的“資源排程”)後,NodeManager需按照要求為任務提供相應的資源,甚至保證這些資源應具有獨佔性,為任務執行提供基礎的保證,這就是所謂的資源隔離。

        當談及到資源時,我們通常指記憶體、CPU和IO三種資源。Hadoop YARN同時支援記憶體和CPU兩種資源的排程。記憶體資源的多少會決定任務的生死,如果記憶體不夠,任務可能會執行失敗,相比之下,CPU資源則不同,它只會決定任務執行的快慢,不會對生死產生影響。

       Yarn執行使用者配置每個節點上可用的實體記憶體資源,注意:這裡是“可用的”,因為一個節點上的記憶體會被若干個服務共享,比如一部分給yarn,一部分給hdfs,一部分給hbase等等。

Yarn的執行流程:

  1. 使用者向Yarn中提交應用程式,其中包括AM程式,啟動AM的命令,使用者程式等。
  2. RM為該程式分配第一個Container,並與對應的NM通訊,要求它在這個Container中啟動應用程式AM。
  3. AM首先向RM註冊,這樣使用者可以直接通過RM檢視應用程式的執行狀態,然後將為各個任務申請資源,並監控它的執行狀態,直到執行結束。重複4-7步驟。
  4. AM採用輪詢的方式通過RPC協議向RM申請和領取資源
  5. 一旦AM申請到資源後,便與對應的NM通訊,要求它啟動任務
  6. NM為任務設定好執行環境(包括環境變數,JAR包、二進位制程式等)後,將任務啟動命令寫到一個指令碼中,並通過執行該指令碼啟動任務。
  7. 應用程式執行完成後,AM向RM登出並關閉自己。
配置CPU和記憶體的大小:
<property>
	<name>yarn.nodemanager.resource.memory-mb </name>
	<value>4096</value>  4G記憶體 預設8G
</property>

<property>
	<name> yarn.nodemanager.resource.cpu-vcores</name>
	<value>4 </value>  4核   預設8核
</property>

MapReduce程式設計模型

一種分散式計算模型,解決海量資料的計算問題。MapReduce將整個平行計算過程抽象到兩個函式:
    1、Map:對一些獨立元素組成的列表的每一個元素進行指定的操作,可以高度並行。

1.1 讀取輸入檔案內容,解析成key、value對。對輸入檔案的每一行,解析成key、value對。每一個鍵值對呼叫一次map函式。
1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
1.3 對輸出的key、value進行分割槽。
1.4 對不同分割槽的資料,按照key進行排序、分組。相同key的value放到一個集合中。
1.5 (可選)分組後的資料進行歸約。

    2、Reduce:對一個列表的元素進行合併。

2.1 對多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點。
2.2 對多個map任務的輸出進行合併、排序。寫reduce函式自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
2.3 把reduce的輸出儲存到檔案中。

一個簡單的MapReduce程式只需要指定map(),reduce()、input、output,剩下的事由框架完成。

MapReduce程式設計例項(wordcount)

public class WordCountApp extends Configured implements Tool{

	static final String INPUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/hello";
	static final String OUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/out";
	
	public static void main(String[] args) throws Exception {  
		
		ToolRunner.run(new WordCountApp(), args);  
		
	}  
	
	public static void run(String[] args) throws Exception {

		INPUT_PATH = args[0];  
		OUT_PATH = args[1]; 
		
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf, WordCountApp.class.getSimpleName());
		
		//打成jar包執行
		job.setJarByClass(WordCountApp.class);  
		
		// 1.1指定讀取的檔案位於哪裡
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		
		// 指定如何對輸入檔案進行格式化,把輸入檔案每一行解析成鍵值對
		job.setInputFormatClass(TextInputFormat.class);

		// 1.2 指定自定義的map類
		job.setMapperClass(MyMapper.class);
		// map輸出的<k,v>型別。如果<k3,v3>的型別與<k2,v2>型別一致,則可以省略
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		// 1.3 分割槽
		//job.setPartitionerClass(HashPartitioner.class);

		// 設定reduce任務數,有一個reduce任務執行
		job.setNumReduceTasks(1);

		// 1.4 TODO 排序、分組

		// 1.5 TODO 規約

		// 2.2 指定自定義reduce類
		job.setReducerClass(MyReducer.class);

		// 指定reduce的輸出型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// 2.3 指定寫出到哪裡
		FileOutputFormat.setOutputPath(job, outPath);

		// 指定輸出檔案的格式化類
		job.setOutputFormatClass(TextOutputFormat.class);

		//把job提交執行 true表示列印結果資訊 
		job.waitForCompletion(true);
	}

	/**
	 * KEYIN 即k1:表示行的偏移量
	 * VALUEIN 即v1:表示行文字內容
	 * KEYOUT 即k2:表示行中出現的單詞
	 * VALUEOUT 即v:2表示行中出現的單詞的次數,固定值1
	 */

	static class MyMapper extends
			Mapper<LongWritable, Text, Text, LongWritable> {
		
		private Text word = new Text();
		private LongWritable one = new LongWritable(1);
		
		protected void map(LongWritable k1, Text v1, Context ctx)
		throws java.io.IOException, InterruptedException {
			
			StringTokenizer st = new StringTokenizer(v1, "\t");
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				ctx.write(word, one);
			}
		};

	}

	/**
	 * KEYIN 即k2:表示行中出現的單詞
	 * VALUEIN 即v2:表示行中出現的單詞的次數
	 * KEYOUT 即k3:表示文字中出現的不同單詞
	 * VALUEOUT 即v3:表示文字中出現的不同單詞的總次數
	 */

	static class MyReducer extends
			Reducer<Text, LongWritable, Text, LongWritable> {

		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,Context ctx)
		throws java.io.IOException, InterruptedException {
			
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
			}
			ctx.write(k2, new LongWritable(times));
		};
	}

}
自定義資料型別實現KPI

自定義的型別必須實現Writable,如果需要排序還必須實現 Comparable

public class KpiApp extends Configured implements Tool{


	static final String INPUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/wlan";
	static final String OUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/out";
	
	public static void main(String[] args) throws Exception {  
		
		ToolRunner.run(new KpiApp(), args);  
		
	}  
	
	public static void run(String[] args) throws Exception {

		INPUT_PATH = args[0];  
		OUT_PATH = args[1]; 
		
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf, KpiApp.class.getSimpleName());
		
		//打成jar包執行
		job.setJarByClass(KpiApp.class);  
		
		// 1.1指定讀取的檔案位於哪裡
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		
		// 指定如何對輸入檔案進行格式化,把輸入檔案每一行解析成鍵值對
		job.setInputFormatClass(TextInputFormat.class);

		// 1.2 指定自定義的map類
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable.class);

		// 1.3 分割槽
		//job.setPartitionerClass(HashPartitioner.class);
		// 設定reduce任務數,有一個reduce任務執行
		//job.setNumReduceTasks(1);

		// 1.4 TODO 排序、分組

		// 1.5 TODO 合併、規約

		// 2.2 指定自定義reduce類
		job.setReducerClass(MyReducer.class);
		// 指定reduce的輸出型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);

		// 2.3 指定寫出到哪裡
		FileOutputFormat.setOutputPath(job, outPath);

		// 指定輸出檔案的格式化類
		job.setOutputFormatClass(TextOutputFormat.class);

		//把job提交執行
		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {

		@Override
		protected void map(LongWritable key,Text value,Context context)
				throws IOException, InterruptedException {

			String[] splited = value.toString().split("\t");
			String msisdn = splited[1];
			Text k2 = new Text(msisdn);
			KpiWritable v2 = new KpiWritable(splited[6], splited[7],splited[8], splited[9]);
			context.write(k2, v2);
		}

	}

	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {

		@Override
		protected void reduce(Text k2,Iterable<KpiWritable> v2s,Context context)
				throws IOException, InterruptedException {

			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			
			for (KpiWritable kpiWritable : v2s) {
				upPackNum += kpiWritable.upPackNum;
				downPackNum += kpiWritable.downPackNum;
				upPayLoad += kpiWritable.upPayLoad;
				downPayLoad += kpiWritable.downPayLoad;
			}

			KpiWritable v3 = new KpiWritable(upPackNum + "", downPackNum + "",upPayLoad + "", downPayLoad + "");
			context.write(k2, v3);
		}

	}

}

class KpiWritable implements Writable {

	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;
	
	public KpiWritable() {
	}

	public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,String downPayLoad) {

		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {

		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {

		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	@Override
	public String toString() {

		return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t"+ downPayLoad;

	}

}

Combiners程式設計

/**
 * 問:為什麼使用Combiner?
 * 答:Combiner發生在Map端,對資料進行規約處理,資料量變小了,傳送到reduce端的資料量變小了,傳輸時間變短,作業的整體時間變短。
 * 
 * 問:為什麼Combiner不作為MR執行的標配,而是可選步驟哪?
 * 答:因為不是所有的演算法都適合使用Combiner處理,例如求平均數。
 *
 * 問:Combiner本身已經執行了reduce操作,為什麼在Reducer階段還要執行reduce操作哪?
 * 答:combiner操作發生在map端的,處理一個任務所接收的檔案中的資料,不能跨map任務執行;只有reduce可以接收多個map任務處理的資料。
 *
 */

//1.5 TODO 規約
job.setCombinerClass(MyCombiner.class);

static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
	
	protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) 
			throws java.io.IOException ,InterruptedException {
	
		long times = 0L;
		for (LongWritable count : v2s) {
			times += count.get();
		}
		ctx.write(k2, new LongWritable(times));
	}
}

Partitioner程式設計

/**
 * 分割槽的例子必須打成jar執行
 * 用處: 1.根據業務需要,產生多個輸出檔案
 *        2.多個reduce任務在執行,提高整體job的執行效率
 */

//1.3 指定分割槽類
job.setPartitionerClass(KpiPartitioner.class);
job.setNumReduceTasks(2);

static class KpiPartitioner extends HashPartitioner<Text, LongWritable>{
	@Override
	public int getPartition(Text key, LongWritable value, int numReduceTasks) {
		return (key.toString().length()==11)?0:1;
	}

}

Shuffle執行流程詳解

MR執行過程

Step1:    
	Input輸入        
		InputFormat讀取資料轉換成<key,value>            
			FileInputFormat                
				TextInputFormat            
			DBInputFormat
Step2:   
	Map階段    
		Map(KEYIN,VALUEIN,KEYOUT,VALUEOUT)    
		預設情況下:        
			KEYIN:偏移量    LongWritable       
			VALUE:內容    TEXT
Step3:    
	Shuffle階段    
		Map端輸出,output<key,value>:       
			output輸出首先是到記憶體,後面spill(溢寫到磁碟),可能有很多檔案。
				分割槽 partition (指定哪些map給哪些reduce處理)      
				排序sort        
			很多小檔案,spill            
				合併merge           
				排序sort          
			最後變成大檔案 –>Map Task執行機器的本地磁碟 
		Reduce端輸入        
			Reduce Task會到Map Task執行機器上拷貝要處理的資料。      
				合併merge         
				排序sort            
				分組group :將相同key的value放在一起        
		總的來說:        
			分割槽:partitioner     
			排序:sort        
			拷貝:copy      
			分組:group     
			壓縮:compress(map階段大檔案) 
			合併:combiner(Map任務端的reduce,不是所有情況都適用)
Step4:    
	Reduce階段    
	Reduce(KEYIN,VALUEIN,KEYOUT,VALUEOUT)
Step5:    
	Output輸出        
		OutputFormat           
			FileOutputFormat               
				TextOutputFormat:預設情況每個<key,value>輸出一行,key、value的中間分隔符為\t,				
				預設呼叫key和value的toString()方法。            
			DBOutputFormat

MapReduce調優

1、Shuffle過程

compress壓縮調優 (Map端輸出壓縮)Combiner調優    (Map端輸出組合)

2、Map階段

Map階段排序小檔案合併因子配置
<property>
    <name>mapreduce.task.io.sort.factor </name>
    <value>10</value>
</property>
Map階段輸出到記憶體大小設定
<property>
    <name>mapreduce.task.io.sort.mb </name>
    <value>10</value>
</property>
達到記憶體多少開始spill到磁碟
<property>
    <name>mapreduce.map.sort.spill.percent </name>
    <value>0.8</value>
</property>
在沒有reduce的情況下(job.setNumReduceTask(0)),直接寫入硬碟,把資料設定很小
如果設定reducer任務數為0,map端不會執行combiner,sort,merge操作,會直接輸出無序結果(讀一行,輸一行),
輸出的檔案數量,與map task的數量匹配(一個Input split對應一個map task)。
3、reduce階段

設定reduce任務
通常情況一個block就對應一個map任務進行處理,reduce任務如果人工不去設定的話就一個。reduceReduce任務個數該如何設定,程式中通過job.setNumReduceTask(2)2個。Reduce任務的數量是根據程式慢慢調。