MapReduce程式設計【WordCount】
課程回顧: wordcount程式: 單機版: 統計的6個檔案 定義一個方法------讀取每一個小檔案進行統計 這個方法呼叫了6次 定義了一個最終統計的方法 這個方法呼叫了1次 最大值 最小值 平均值 map-----分 reduce----彙總 mapreduce的程式設計: mapper<輸入的key---預設是每一行的偏移量,輸入的value---每一行的內容, 輸出的key,輸出的value>{ map(偏移量,一行內容,context){ } } reducer<輸入key,輸入value,輸出key,輸出value>{ reduce(一組的中的相同的key,一組中的所有value的迭代器,上下文物件){ } } driver-----main mapreduce的執行模式: 1.打jar包的方式執行 生產用的 缺點:不便於本地除錯 2.本地模式 本地的程式碼除錯 1.路徑 全路徑 2.許可權 System.setProperty("HADOOP_USER_NAME", "hadoop"); 3.本地模式執行的時候提交到yarn叢集上 不用 配置太麻煩---環境 修改原始碼 mapreduce----分散式的平行計算 maptask的並行度:map階段的分散式執行 整個map階段的任務被分成了幾個小任務執行 每一個小任務就稱為一個maptask任務 maptask的概念:執行mapper的任務 並行度:同時執行的任務數量 同時可以執行的maptask的個數 wordcount-----只執行一個maptask 執行多個maptask 注意:一個maptask任務只能執行在一個節點上,一個節點上可以執行多個maptask任務的 整個任務被分成幾個maptask任務,取決於 節點 資料量 一個maptask--------計算的任務量 4個檔案進行資料統計 大小1.42kb 3個從節點 datanode nodemanager maptask任務數量---3? 4? ----mapper---java---jps 執行wordcount的時候 發現 map階段執行的時候 4個yarnchild yarnchild-----maptask任務----mapper類 1個yarnchild-----1個maptask任務 maptask任務和資料有關的 和資料的檔案個數有關? 4份資料:1T --- 假設和資料的檔案個數有關----4maptask----1T 節點:10個 最多執行在4個節點上 剩下的6個節點 沒有事情做--負載不均衡 1)負載不均衡 2)資料儲存的 分塊儲存 1t---8192塊 分散儲存在10個節點上 每一個maptask要訪問8192個數據塊 跨節點訪問資料 maptask任務和檔案的個數無關 maptask任務應該資料量有關 一個maptask任務對應多少資料量? wc---1個檔案 500M 實際儲存: blk01:0-127M blk02:128-255M blk03:256-383M blk04:384-500M 對應100M資料量---一個maptask任務 處理100M的資料 maptask01:0-99M blk01 maptask02:100-199M blk01 blk02 maptask03:200-299M blk02 blk03 ....... 有極大的可能跨資料塊 跨節點訪問資料 效能低 不合理的 對應的資料量200M---一個maptask任務 處理200M資料量 maptask01:0-199M blk01 blk02 .... 有極大的可能跨資料塊 跨節點訪問資料 不合理 分析 maptask對應的資料量128M的時候最合理的 maptask01:blk01 maptask02:blk02 maptask03:blk03 maptask04:blk04 底層實現是否是這樣的? 事實上一個maptask任務對應的資料量 一個切片大小---split 切片:邏輯上的概念(邏輯切塊) 邏輯切片 僅僅是一個偏移量的劃分 一個maptask任務對應資料量就是一個邏輯切片 一個邏輯切片假設100M 意思就是這個邏輯切片對應的資料0-99M資料 不會真正的切分資料 理論上切片的大小就是資料塊的大小 檔案輸入類: FileInputFormat getSplits() //獲取最終的切片大小 //128M 1 long_max protected long computeSplitSize(long blockSize, long minSize, long maxSize) { //Math.min(maxSize====long_max, blockSize==128*1024*1024M) ====blocksize //Math.max(minSize===1,blocksize===128M) ======== blocksize return Math.max(minSize, Math.min(maxSize, blockSize)); } } 修改切片大小: 小於128M-----修改max 大於128M-----修改min 修改的方式: 1.修改配置檔案 mapred-site.xml 增加以下的兩項配置 max--- mapreduce.input.fileinputformat.split.maxsize min---- mapreduce.input.fileinputformat.split.minsize 一般不使用 2.程式碼中修改 FileInputFormat.setMaxInputSplitSize(job, 1*1024); 總結:maptask的並行度 和切片大小有關 一個切片----1個maptask任務 預設的時候1個切片的大小就是一個數據塊的大小 block split的區別? block:hdfs的資料儲存的切分單元 預設128M 物理切分 split:mapreduce中maptask任務的時候 進行資料劃分的單元 邏輯上的概念 沒有真正的切分資料 理論上預設的split的大小===blocksize 當前的檔案 不夠128M 則單獨成一個邏輯切片 最後一個切片 最大大小 blockSize/splitSize*1.1 128M*1.1=140.8M maptask任務在哪一個節點執行---yarn資源分配決定的 maptask任務 執行MyMapper的類的程式碼
自定義的類---Writable介面 實現了自己的一套 序列化反序列化的規則 數值 Writable hadoop內建的序列化 反序列化的型別: intwritable longwritable text nullwritable bytewritable doublewritable floatwritable booleanwritable 使用者自定義的型別 作為mapreduce的傳輸的類怎麼辦? 具備序列化 和 反序列的能力 實現Writable介面 案例: 1、 統計每一個使用者(手機號)所耗費的總上行流量、總下行流量,總流量 關鍵字:每一----分組 分組:手機號 map端: 取到每一行的內容 拆分每一個欄位 傳送 key:手機號 value:上行流量+下行流量 兩種傳送方式: 1)拼接字串---練習 2)自定義一個類 FlowBean reduce端: 迴圈遍歷values 求和 mapreduce程式中 輸出的key -- value的分隔 實現Writable 介面 重寫:write() readFields方法
mapreduce的排序:shuffle(快速排序 歸併排序) mapreduce:中會預設按照map輸出的key進行排序 預設升序 如果map輸出key是數值型別 按照大小排序的 升序排序的 如果map輸出的key是字串型別的 按照順序進行排序的 升序排序的 需求: 對wordcount的結果進行排序 按照單詞出現的次數進行排序 shuffle過程中 可以進行排序的 利用這個排序為我們排序 hadoop 256 hadoophello 4 hello 164 hive 176 lily 84 spark 168 word 84 ww 84 想要利用shuffle排序 將需要排序的欄位 放在map的輸出的key上 輸出結果還想要 單詞--次數 reduce端輸出的時候 將map的結果進行一個反轉 map端: 獲取一行資料 切分 key:次數 value:單詞 reduce端: 將map輸過來的資料 進行對調之後 輸出就可以 降序排序: 原始型別: hadoop中預設實現的型別 8個 都實現WritableComparable 具備比較能力 也具備序列化的能力 intwritable---- public class IntWritable implements WritableComparable<IntWritable> public interface WritableComparable<T> extends Writable, Comparable<T> { } 需要排序的 型別是自定義的型別 flowbean 自定義的型別 具備以下兩點: 1)序列化和反序列功能 ---Writable(write readFields) 2)具備比較的能力 --- comparable(comparaTo) 自定義的型別必須同時實現兩個介面 實現以下的介面: WritableComparable-----(write readFields comparaTo) 需求: 2.得出上題結果的基礎之上再加一個需求:將統計結果按照總流量倒序排序 再按手機號排序 藉助shuffle排序---map的key 將排序欄位放在map的key中 排序欄位:總流量 手機號 Text---字典順序排序的時候 1000000 13552785495 80 13552785494 map端: key:總流量+手機號 --Text value:剩下的欄位 reduce: key:手機號---text value:上+下+總---text -----練習 缺點:1)排序的效能 Text--String---字典順序 一個字元一個字元比較的 流量---數字 2)結果有問題的 自定義的類實現:FlowBean---手機號 上行流量 下行流量 總流量 map: key:fb value:nullwritable reduce: 輸出 對於自定義類的排序要想借助shuffle過程排序 自定義的類必須放在map的輸出的key 這個類必須實現writablecomparable介面 實現write readFields comparaTo 比較規則定義在comparaTo方法中的 如果自定義的類沒有排序需求 放在map的key上了 一定要實現的writablecomparable 只要自定義的類放在map的key位置 就會預設按照他進行排序 map的key的自定義類---慎重 reducetask的並行度----分割槽
package com.ghgj.cn.testmapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text mk = new Text();
IntWritable mv = new IntWritable();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//獲取每一行內容
String line = value.toString();
//切分
String[] num = line.split("\t");
//迴圈遍歷 傳送
for(String n:num){
mk.set("hzh");
mv.set(Integer.parseInt(n));
context.write(mk, mv);
}
}
}
package com.ghgj.cn.testmapreduce;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* reduce輸出
* key :輸出最大最小
* value:值
*
*/
public class MyReducer extends Reducer<Text,IntWritable, Text, DoubleWritable>{
Text rk = new Text();
DoubleWritable rv = new DoubleWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//迴圈遍歷values
int max =0;
int min =10000;
int sum =0;
int count =0;
for(IntWritable v:values){
int data = v.get();
count++;
sum+=data;
//最大
if(max<data){
max=data;
}
//最小
if(min>data){
min=data;
}
//平均
double avg = (double)sum/count;
rk.set("最大值");
rv.set(max);
//context.write()可以呼叫多次,呼叫一次輸出一條資料
context.write(rk, rv);
rk.set("最小值");
rv.set(min);
context.write(rk, rv);
rk.set("平均值");
rv.set(avg);
context.write(rk, rv);
}
}
}
package com.ghgj.cn.testmapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {
public static void main(String[] args) {
//將mapper 和 reducer 類進行一個封裝 封裝為一個任務job(作業)
//載入配置檔案
Configuration conf = new Configuration();
//啟動一個job【建立一個job物件】
try {
Job job = Job.getInstance(conf);
//設定job
//先設定整個job的主函式入口
job.setJarByClass(Driver.class);
//設定job的mapper的類
job.setMapperClass(MyMapper.class);
//設定job的reducer的類
job.setReducerClass(MyReducer.class);
//設定map輸出key value的型別
//指定了泛型這裡為什麼還要設定一次 泛型的作用範圍 編譯的時候生效 執行的時候泛型擦除
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定reduce的輸出的key value型別 以下設定的方法設定的是mr的最終輸出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
//指定需要統計的檔案的輸入路徑 FileInputFormat檔案輸入類
// Path inpath = new Path("/wcin");
Path inpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);
//指定輸出目錄 輸出路徑不能存在 否則會報錯,預設輸出是覆蓋式的輸出 如果目錄存在,有可能造成元資料丟失
// Path outpath = new Path("/wc_out01");
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);
//提交job 執行這一句時候job才會提交 上面做的一系列的工作都是設定job
//job.submit
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//hadoop fs -cat /dataout01/part-r-00000
}
}