1. 程式人生 > >MapReduce程式設計【WordCount】

MapReduce程式設計【WordCount】

課程回顧:     wordcount程式:     單機版:         統計的6個檔案         定義一個方法------讀取每一個小檔案進行統計         這個方法呼叫了6次         定義了一個最終統計的方法         這個方法呼叫了1次         最大值   最小值   平均值         map-----分         reduce----彙總     mapreduce的程式設計:         mapper<輸入的key---預設是每一行的偏移量,輸入的value---每一行的內容,         輸出的key,輸出的value>{             map(偏移量,一行內容,context){             }         }                      reducer<輸入key,輸入value,輸出key,輸出value>{             reduce(一組的中的相同的key,一組中的所有value的迭代器,上下文物件){                          }         }         driver-----main     mapreduce的執行模式:         1.打jar包的方式執行         生產用的    缺點:不便於本地除錯         2.本地模式                  本地的程式碼除錯                      1.路徑  全路徑             2.許可權                 System.setProperty("HADOOP_USER_NAME", "hadoop");         3.本地模式執行的時候提交到yarn叢集上             不用    配置太麻煩---環境   修改原始碼 mapreduce----分散式的平行計算 maptask的並行度:map階段的分散式執行     整個map階段的任務被分成了幾個小任務執行  每一個小任務就稱為一個maptask任務     maptask的概念:執行mapper的任務     並行度:同時執行的任務數量     同時可以執行的maptask的個數     wordcount-----只執行一個maptask  執行多個maptask     注意:一個maptask任務只能執行在一個節點上,一個節點上可以執行多個maptask任務的     整個任務被分成幾個maptask任務,取決於  節點    資料量     一個maptask--------計算的任務量     4個檔案進行資料統計         大小1.42kb         3個從節點  datanode   nodemanager         maptask任務數量---3?  4?   ----mapper---java---jps         執行wordcount的時候  發現  map階段執行的時候 4個yarnchild             yarnchild-----maptask任務----mapper類             1個yarnchild-----1個maptask任務             maptask任務和資料有關的  和資料的檔案個數有關?             4份資料:1T   ---   假設和資料的檔案個數有關----4maptask----1T             節點:10個   最多執行在4個節點上   剩下的6個節點  沒有事情做--負載不均衡                 1)負載不均衡                 2)資料儲存的  分塊儲存   1t---8192塊   分散儲存在10個節點上                 每一個maptask要訪問8192個數據塊    跨節點訪問資料                 maptask任務和檔案的個數無關             maptask任務應該資料量有關             一個maptask任務對應多少資料量?                 wc---1個檔案    500M                      實際儲存:                     blk01:0-127M                     blk02:128-255M                     blk03:256-383M                     blk04:384-500M                 對應100M資料量---一個maptask任務  處理100M的資料                     maptask01:0-99M    blk01                     maptask02:100-199M  blk01  blk02                     maptask03:200-299M  blk02  blk03                     .......                     有極大的可能跨資料塊  跨節點訪問資料  效能低  不合理的                 對應的資料量200M---一個maptask任務  處理200M資料量                     maptask01:0-199M     blk01  blk02                     ....                     有極大的可能跨資料塊  跨節點訪問資料    不合理                 分析  maptask對應的資料量128M的時候最合理的                     maptask01:blk01                     maptask02:blk02                     maptask03:blk03                     maptask04:blk04                 底層實現是否是這樣的?                     事實上一個maptask任務對應的資料量   一個切片大小---split                     切片:邏輯上的概念(邏輯切塊) 邏輯切片    僅僅是一個偏移量的劃分                     一個maptask任務對應資料量就是一個邏輯切片                     一個邏輯切片假設100M    意思就是這個邏輯切片對應的資料0-99M資料  不會真正的切分資料                     理論上切片的大小就是資料塊的大小                     檔案輸入類:                         FileInputFormat                             getSplits()                                                                                         //獲取最終的切片大小                               //128M      1      long_max                               protected long computeSplitSize(long blockSize, long minSize,                                                               long maxSize) {                                 //Math.min(maxSize====long_max, blockSize==128*1024*1024M)    ====blocksize                                   //Math.max(minSize===1,blocksize===128M)   ========   blocksize                                 return Math.max(minSize, Math.min(maxSize, blockSize));                               }                               }                         修改切片大小:                             小於128M-----修改max                             大於128M-----修改min                                                          修改的方式:                             1.修改配置檔案  mapred-site.xml                               增加以下的兩項配置                             max---                               mapreduce.input.fileinputformat.split.maxsize                               min----                               mapreduce.input.fileinputformat.split.minsize                               一般不使用                             2.程式碼中修改                             FileInputFormat.setMaxInputSplitSize(job, 1*1024);     總結:maptask的並行度  和切片大小有關         一個切片----1個maptask任務         預設的時候1個切片的大小就是一個數據塊的大小         block   split的區別?             block:hdfs的資料儲存的切分單元  預設128M   物理切分             split:mapreduce中maptask任務的時候  進行資料劃分的單元  邏輯上的概念  沒有真正的切分資料             理論上預設的split的大小===blocksize             當前的檔案  不夠128M   則單獨成一個邏輯切片             最後一個切片   最大大小  blockSize/splitSize*1.1  128M*1.1=140.8M                      maptask任務在哪一個節點執行---yarn資源分配決定的             maptask任務  執行MyMapper的類的程式碼        

自定義的類---Writable介面     實現了自己的一套  序列化反序列化的規則  數值   Writable     hadoop內建的序列化 反序列化的型別:         intwritable         longwritable         text         nullwritable         bytewritable         doublewritable         floatwritable         booleanwritable     使用者自定義的型別  作為mapreduce的傳輸的類怎麼辦?         具備序列化 和  反序列的能力    實現Writable介面         案例:             1、 統計每一個使用者(手機號)所耗費的總上行流量、總下行流量,總流量                 關鍵字:每一----分組                 分組:手機號                 map端:                     取到每一行的內容  拆分每一個欄位  傳送                     key:手機號                     value:上行流量+下行流量                         兩種傳送方式:                             1)拼接字串---練習                             2)自定義一個類  FlowBean                 reduce端:                     迴圈遍歷values  求和                 mapreduce程式中  輸出的key  --  value的分隔                 實現Writable 介面                       重寫:write()  readFields方法

mapreduce的排序:shuffle(快速排序    歸併排序)     mapreduce:中會預設按照map輸出的key進行排序  預設升序         如果map輸出key是數值型別    按照大小排序的  升序排序的         如果map輸出的key是字串型別的  按照順序進行排序的   升序排序的     需求:         對wordcount的結果進行排序  按照單詞出現的次數進行排序         shuffle過程中  可以進行排序的   利用這個排序為我們排序             hadoop    256             hadoophello    4             hello    164             hive    176             lily    84             spark    168             word    84             ww    84         想要利用shuffle排序   將需要排序的欄位  放在map的輸出的key上   輸出結果還想要  單詞--次數             reduce端輸出的時候  將map的結果進行一個反轉         map端:             獲取一行資料   切分             key:次數             value:單詞         reduce端:             將map輸過來的資料   進行對調之後  輸出就可以         降序排序:     原始型別:         hadoop中預設實現的型別  8個  都實現WritableComparable  具備比較能力 也具備序列化的能力         intwritable----             public class IntWritable implements WritableComparable<IntWritable>                 public interface WritableComparable<T> extends Writable, Comparable<T> { }              需要排序的  型別是自定義的型別   flowbean         自定義的型別  具備以下兩點:             1)序列化和反序列功能  ---Writable(write  readFields)             2)具備比較的能力   ---  comparable(comparaTo)         自定義的型別必須同時實現兩個介面             實現以下的介面:             WritableComparable-----(write  readFields  comparaTo)         需求:                              2.得出上題結果的基礎之上再加一個需求:將統計結果按照總流量倒序排序  再按手機號排序             藉助shuffle排序---map的key  將排序欄位放在map的key中             排序欄位:總流量  手機號             Text---字典順序排序的時候             1000000    13552785495             80    13552785494                 map端:                         key:總流量+手機號  --Text                     value:剩下的欄位                 reduce:                     key:手機號---text                     value:上+下+總---text                                 -----練習                 缺點:1)排序的效能  Text--String---字典順序   一個字元一個字元比較的                     流量---數字                     2)結果有問題的             自定義的類實現:FlowBean---手機號   上行流量  下行流量   總流量                 map:                     key:fb                     value:nullwritable                 reduce:                     輸出             對於自定義類的排序要想借助shuffle過程排序   自定義的類必須放在map的輸出的key                 這個類必須實現writablecomparable介面                 實現write   readFields   comparaTo                 比較規則定義在comparaTo方法中的         如果自定義的類沒有排序需求  放在map的key上了  一定要實現的writablecomparable             只要自定義的類放在map的key位置  就會預設按照他進行排序             map的key的自定義類---慎重 reducetask的並行度----分割槽

package com.ghgj.cn.testmapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	Text mk = new Text();
	IntWritable mv = new IntWritable();
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		//獲取每一行內容
		String line = value.toString();
		//切分
		String[] num = line.split("\t");
		//迴圈遍歷 傳送
		for(String n:num){
			mk.set("hzh");
			mv.set(Integer.parseInt(n));
			context.write(mk, mv);
		}
	}
} 


package com.ghgj.cn.testmapreduce;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * reduce輸出
 * key :輸出最大最小
 * value:值
 *
 */
public class MyReducer extends Reducer<Text,IntWritable, Text, DoubleWritable>{
	Text rk = new Text();
	DoubleWritable rv = new DoubleWritable();
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		//迴圈遍歷values
		int max =0;
		int min =10000;
		int sum =0;
		int count =0;
		for(IntWritable v:values){
			int data = v.get();
			count++;
			sum+=data;
			//最大
			if(max<data){
				max=data;
			}
			//最小
			if(min>data){
				min=data;
			}
			//平均
			double avg = (double)sum/count;
			rk.set("最大值");
			rv.set(max);
			//context.write()可以呼叫多次,呼叫一次輸出一條資料
			context.write(rk, rv);
			
			rk.set("最小值");
			rv.set(min);
			context.write(rk, rv);
			
			rk.set("平均值");
			rv.set(avg);
			context.write(rk, rv);
		}
	}
	
}

package com.ghgj.cn.testmapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
	public static void main(String[] args) {
		//將mapper  和  reducer 類進行一個封裝   封裝為一個任務job(作業)
				//載入配置檔案
				Configuration conf = new Configuration();
				//啟動一個job【建立一個job物件】
				try {
					Job job = Job.getInstance(conf);
					//設定job
					//先設定整個job的主函式入口
					job.setJarByClass(Driver.class);
					//設定job的mapper的類
					job.setMapperClass(MyMapper.class);
					//設定job的reducer的類
					job.setReducerClass(MyReducer.class);
					//設定map輸出key   value的型別
					//指定了泛型這裡為什麼還要設定一次  泛型的作用範圍 編譯的時候生效  執行的時候泛型擦除
					job.setMapOutputKeyClass(Text.class);
					job.setMapOutputValueClass(IntWritable.class);
					//設定reduce的輸出的key value型別 以下設定的方法設定的是mr的最終輸出
					job.setOutputKeyClass(Text.class);
					job.setOutputValueClass(DoubleWritable.class);
					//指定需要統計的檔案的輸入路徑 FileInputFormat檔案輸入類
//					Path inpath = new Path("/wcin");
					Path inpath = new Path(args[0]);
					FileInputFormat.addInputPath(job, inpath);
					//指定輸出目錄   輸出路徑不能存在  否則會報錯,預設輸出是覆蓋式的輸出 如果目錄存在,有可能造成元資料丟失
//					Path outpath = new Path("/wc_out01");
					Path outpath = new Path(args[1]);
					FileOutputFormat.setOutputPath(job, outpath);
					//提交job 執行這一句時候job才會提交 上面做的一系列的工作都是設定job
					//job.submit
					job.waitForCompletion(true);
				} catch (IOException e) {
					e.printStackTrace();
				} catch (ClassNotFoundException e) {
					e.printStackTrace();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				//hadoop fs -cat /dataout01/part-r-00000
	}
}