第2章 黑盒測試線上測試
date: 2020-04-22 20:15:00
updated: 2020-04-24 09:40:00
MapReduce
1.1 輸入輸出
首先都是 k, v 的形式
map 到 reduce 端是通過網路來傳輸,所以 k, v 都需要序列化和反序列化,Java 基本資料型別不支援序列化,所以需要用 MR 自己封裝的型別,比如 LongWritable, Text, IntWritable 等等
reduce 的輸入順序預設是按照 key 的大小順序(字串的字典順序)來進行處理,如果 map 輸出的 key 是一個物件,可以 implements WritableComparable 介面,自定義 key 的順序來交給 reduce
map 輸出的內容會被序列化,所以如果是輸出一個物件的話,也不會有影響
ArrayList<OrderBean> beans = new ArrayList<>(); OrderBean bean = new OrderBean(); bean.set(1); beans.add(bean); // 此時儲存的只是bean物件的一個地址引用 bean.set(2); beans.add(bean); // 由於是地址引用,在原先地址上修改了bean物件的值,那麼beans最後呈現出來的值也是最新的值,打印出來都是 2 class xxMapper extends Mapper<>{ OrderBean bean = new OrderBean(); Text k = new Text(); @Override protected void map(){ // 每一個 maptask 都會呼叫 map 方法,如果在 map 方法裡建立物件,就會佔用大量資源,由於物件寫入到上下文的時候要經過序列化,所以並不需要擔心多個maptask 在頻繁賦值造成前後影響 context.write(k, bean); } }
reduce 裡雖然只有一個key,但是迭代器values每迭代一次,key也會跟著變
執行機制:
map task(其實是叫 yarn child),拿到的是內容是 TextInputFormat,讀取每一行用的是 LineRecordReader,返回一對kv,k是偏移量,v是行的內容。頻繁的IO會降低效率,所以context(k, v) 會把結果寫入到一個 MapOutputCollector 的環形記憶體緩衝區,當內容寫到一定程度之後(緩衝區預設大小100M,spill的閾值是0.8,可以通過mapreduce.map.io.sort.spill.percent
來設定),有一個 Spiller 溢位監控的執行緒,去把緩衝區的內容通過輪詢的方式寫入到mapreduce.cluster.local.dir
寫磁碟時壓縮map端的輸出,因為這樣會讓寫磁碟的速度更快,節約磁碟空間,並減少傳給reducer的資料量。預設情況下,輸出是不壓縮的(將mapreduce.map.output.compress設定為true即可啟動)
sequenceFile 裡面儲存的是kv形式,這樣就不需要split文字內容了。 job.setOutputFormatClass(xx); 預設是 TextOutputFormat.class(同理輸入),改成 SequenceFileOutputFormat.class 就可以修改輸出檔案的型別,直接將內容轉換成二進位制
1.2 combiner
combiner 是先在 map 端做一次合併,減少傳輸到 reduce 端的資料量,因為 combiner 的輸出是 reduce 的輸入,所以 combiner 是不能對 map 的輸出進行修改,所以適用的場景比如累加、求最大值等,但是求平均數就不行了。
combiner 操作發生在 map 端的,處理一個任務所接收的檔案中的資料,做區域性聚合,不能跨 map 任務執行;只有 reduce 可以接收多個 map 任務處理的資料(也就是通過 key 的 hash 值取模分配到對應的 reduce上)
繼承的也是 reduce 父類,實現的是對 reduce 方法的覆寫,然後在啟動類新增 job.setCombinerClass();
可以用來解決一定的資料傾斜問題
1.3 啟動類配置
設定 hadoop 使用者
System.setProperty("HADOOP_USER_NAME", "root"); // 這樣寫可以在本地執行程式的時候以 root 使用者去執行 jar,從而擁有對輸入輸出路徑下檔案的讀寫許可權
job 執行時提交請求給 resource manager,分配一定的 node manager 的資源來執行 job
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://9000"); // 對應linux下hadoop安裝目錄下 core-site.xml 裡面的配置項
conf.set("mapreduce.framework.name", "yarn"); // 對應linux下hadoop安裝目錄下 mapred-site.xml 裡面的配置項,不寫的話預設值是 local
conf.set("yarn.resourcemanager.hostname", "yarn-01");
// 如果從 windows 執行的話,需要跨平臺提交
conf.set("mapreduce.app-submission.cross-platform", true);
Job job = Job.getInstance(conf);
job.setJarByClass(類名.class) | ("d:/xxx.jar");
// 還需要設定 map reduce 的 key value 的型別,以及 map reduce 對應的類,還有輸入輸出路徑,輸出路徑必須不存在
// 設定 reduceTask 個數
job.setNumReduceTasks(2);
// 提交到 yarn 並保持通訊,列印 resource manager 返回的日誌資訊
job.waitForCompletion(true);
以上是在 windows 下啟動執行,以下是在 linux 下執行
hadoop jar xx.jar 啟動類 // hadoop jar 會把本機器上 hadoop 安裝目錄下的所有jar包和配置檔案(core-site.xml)載入到執行語句的classpath下,這樣jvm就可以讀取到想要的配置
1.4 yarn
客戶端提交一個job,在yarn的resourcemanager中請求一個容器來執行,在啟動 mapreduce 的時候會首先啟動一個 MRAppMaster 類,用來管理所有的 mapreduce,需要1.5G的一個啟動記憶體,所以記憶體至少要2G以上,而 maptask 和 reducetask 都需要至少1G記憶體。並不是說會全部吃滿記憶體,但是需要這樣的一個上限值才能啟動。
會依次啟動yarnchild(maptask)和yarnchild(reducetask),提交到job的客戶端
第一代hadoop,只有一個hdfs檔案系統和mr計算排程平臺
第二代hadoop,將排程交給yarn統一管理資源
Hdfs namenode datanode 第一代 client提交job,jobtracker先從namenode裡拿到所需檔案的資料塊資訊,傳送到datanode中執行,tasktracker監督datanode狀態,反饋給jobtracker,並一直反饋給client。客戶端包括ide執行或者shell裡 hadoop jar x.jar
Yarn 分為resourcemanager nodemanager 主從結構
1.使用者向YARN中提交應用程式,其中包括AM程式、啟動AM的命令、使用者程式等。
2.RM為該應用程式分配一個container,並向對應的NM通訊,要求他在這個container中啟動ApplicationMaster。
3.AM向RM註冊,這樣使用者可以直接通過RM檢視應用程式的執行狀態,然後它將為任務申請資源,並監控其執行狀態,直到執行結束(重複4-7)
4.AM採用輪詢的方式通過RPC協議向RM申請和領取資源。
5.AM申請到資源後,便於NM通訊,要求其啟動任務。
6.NM為任務設定好執行環境(包括環境變數、JAR包、二進位制程式等)後,將任務啟動命令寫到一個指令碼中,並通過執行改指令碼啟動任務。
7.各個任務通過RPC協議向AM彙報自己的進度和狀態,讓AM隨時掌握各個任務的執行狀態,從而可以在任務失敗是重啟任務。在應用程式執行過程中,使用者可隨時通過RPC向AM查詢應用程式的當前執行狀態。
8.應用程式執行完成後,AM登出並關閉自己。
1.5 自定義型別
輸入輸出都是 k,v 的形式,k 是唯一的,但是 v 如果有很多個,那就使用一個類來封裝,但是這個類需要實現 hadoop 的序列化介面:implements Writable,重寫 write(DataOutput out) 和 readFields(DataInput in) 方法。
out.writeInt(欄位); // 一定是4個位元組
out.writeUTF("我"); // 一共是5個位元組,按照utf-8編碼,一個漢字3個位元組,然後最前面有2個位元組來聲明後面有幾個位元組是屬於這個字串的,所以可以通過這個方式來擷取位元組長度,然後反序列化字串
out,write("我".getBytes()); // 3個位元組,單純的utf-8編碼,一個漢字3個位元組,在傳輸時並不知道具體字串會有多長,不方便擷取,所以更推薦使用上面的方式
unicode 一個英文2個位元組,一個漢字2個位元組,中英文標點都是2個位元組
utf-8 一個英文1個位元組,一個漢字3個位元組,中文標點3個位元組,英文標點1個位元組
gbk 英文1個位元組,漢字2個位元組
欄位 = in.readInt();
欄位 = in.readUTF();
1.6 以求最大值為例,在reduce之後還要進行最終結果的篩選
首先在 reduce() 方法裡就不能直接用 context.write()
,這是直接寫結果的。可以建立一個類,來儲存 reduce 後的所有結果,然後在 reduceTask 處理完所有資料之後,會呼叫 cleanUp(Context context) 方法,重寫這個方法,獲取到這個類,然後進行資料篩選,把最終結果寫到 context.write() 裡
但是如果是大資料量的話,通過構建最小堆來獲取最大的幾個值
Configuration conf = context.getConfiguration();
int topN = conf.getInt("topN", 5); // 會嘗試從 conf 中獲取這個 name 對應的值,如果沒有配置的話,預設值就是5;可以在啟動類那裡用過傳參來設定這個topN是多少;或者自己寫一個xml配置檔案,通過 conf.addResource("xx.xml"),就可以識別出來
在 map 和 reduce 裡都存在 setup()、map()/reduce()、cleanup() 方法
eg:
父類 Mapper(){
run(){
setup();
try{
while(){
mapper();
}
}finally{
cleanup();
}
}
}
1.7 Partitioner 修改分發規則
分發的動作由 maptask 來執行,由 maptask 來執行 getPartitioner() 方法,然後分發到對應的 reduce 中
原始碼:
預設是按照 map 輸出的 key.hashcode() % reduceTask的數量
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2>{
public void configure(JobConf job){}
public int getPartitioner(K2 key, V2 value, int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
建立一個類,extends Partitioner 覆寫 getPartitioner() 方法,在啟動類中設定 job.setPartitionerClass(xxx.class);
public static class WordcountHashPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
String location = key.toString().split(":")[0];
return Math.abs(location.hashCode() * 127) % numPartitions;
}
}
其他的幾個分發方法:BinaryPartitioner, HashPartitioner, KeyFieldBasedPartitioner, TotalOrderPartitioner。這些分發方法只能保證大體有序
全域性排序:確保Partition之間是有序
對資料進行抽樣分層,為了保證partition分佈均勻,在相同區間的資料返回到同一個reduce,map輸出的key預設是按照字典順序實現了升序排序,如果需要降序或者輸出的key是一個物件的話,要覆寫comparable,那麼到一個reduce中的資料都是有序的。
取樣:
- 對Math.min(10, splits.length)個split(輸入分片)進行隨機取樣,對每個split取10000個樣,總共10萬個樣
- 10萬個樣排序,根據reducer的數量(n),取出間隔平均的n-1個樣
- 將這個n-1個樣寫入partitionFile(_partition.list,是一個SequenceFile),key是取的樣,值是nullValue
- 將partitionFile寫入DistributedCache
全域性排序在key是相對穩定的時候,可以隨機取樣元資料,重寫partitioner方法,將資料分到reduce上,來保證資料的連續性;還有一個方式是實現TotalOrderPartitioner類的方法,一共有三種取樣方式,動態生成一個partition file,在查詢具體key應該分到哪個reduce時底層是通過一個Tire tree來實現,使用二分法繼續查詢找到返回key在劃分陣列中的索引,找不到會返回一個和它最接近的劃分的索引。
類名稱 | 取樣方式 | 構造方法 | 效率 | 特點 |
---|---|---|---|---|
SplitSampler<K,V> | 對前n個記錄進行取樣 | 取樣總數,劃分數 | 最高 | |
RandomSampler<K,V> | 遍歷所有資料,隨機取樣 | 取樣頻率,取樣總數,劃分數 | 最低 | |
IntervalSampler<K,V> | 固定間隔取樣 | 取樣頻率,劃分數 | 中 | 對有序的資料十分適用 |
1.8 map端輸入的分割槽切片原理
在 map 方法中,有一個變數是 context,儲存了作業執行時的上下文資訊,可以通過 InputSplit inputSplit = context.getInputSplit();
方法來獲取切片資訊
在 Hadoop 讀取檔案時,會通過 getPartitions()
方法來獲取分割槽,在這個方法裡會呼叫 InputFormat 這個抽象類的 getSplits()
方法,這個方法的具體實現是在 FileInputFormat 類中對 getSplits()
方法進行覆寫。如何去切分檔案資料的思路是:
- inputPath 如果是一個資料夾的話,遍歷裡面的所有檔案,累加所有檔案的size作為totalSize
- 求一個平均檔案大小
goalSize = totalSize / numSplits; // numSplits 指的是 min(想要的分割槽個數,預設分割槽個數2個) 在spark裡面是這樣的,mr可能是讀的配置
while(fileSize / goalSize > 1.1L){fileSize -= goalSize}
如果當前檔案的大小超過平均大小的1.1倍,那麼就從平均大小對應的offset那裡進行切分檔案- 綜上,如果檔案不可切分,那麼一個分割槽就是那一整個不可切分檔案;如果檔案可切分,計算出一個理想檔案大小,然後依次判斷檔案大小和這個理想檔案大小的關係,幅度在1.1倍之內的都可以接受,否則就需要切割。
切片是在 data manager 拿到 mr 後,建立容器來執行 maptask,建立幾個要根據切片後的結果來確定。通過 getSplits() 方法,獲取所有輸入檔案(也可能是連線資料庫,那就儲存的是庫名錶名偏移量資訊),然後按照128M開始切片,不到128M的也會是一個maptask,不允許被切片的檔案比如二進位制檔案,會單獨一個切片,空檔案也會是一個切片,一個切片可以跨大檔案的多個block
當一個分片包含的多個block的時候,總會從其他節點讀取資料,也就是做不到所有的計算都是本地化。為了發揮計算本地化效能,應該儘量使InputSplit大小與塊大小相當
由於 InputSplit 是一個抽象類,用的時候,需要用它的實現方法
FileSplit inputSplit = (FileSplit)context.getInputSplit(); // 強制轉換,從大的類轉到小的子類
String fileName = inputSplit.getPath().getName();
可以用於倒排索引,獲取檔案的資訊
1.9 分組排序 WritableComparator
比如兩個 key 是否一樣,會呼叫 GroupComparator 類下的 compare(o1, o2) 方法
用bean來做key,包括orderid,amounfee兩個屬性,相同的orderid排在一起,id小的在前面
重寫分發規則 partitioner,讓orderid相同的分到同一個reduce
重寫 groupComparator,由於傳遞的key是物件,所以要指出來只要orderid相同,就會被看成同一組進行一次reduce聚合(跟上一步不一樣的地方在於,有可能存在不同的key取hashcode再取模後分發到同一個reduce上,這時候需要通過groupComparator類下的compare()方法來保證同一類orderid進行聚合)
class OrderIdGroupingComparator extends WritableComparator {
public OrderIdGroupingComparator(){
super(OrderBean.class, true); // 呼叫父類的建構函式,將 OrderBean.class 序列化,要不然下面的方法無法識別 OrderBean,true代表是執行序列化
}
@Overwrite
public int compare(WritableComparable a, WritableComparable b){
OrderBean o1 = (OrderBean)a;
OrderBean o2 = (OrderBean)b;
return o1.getOrderId().compareTo(o2.getOrderId());
}
}
1.10 合合
配置類讀取所有的庫表以及欄位關係
final HiveConf hiveconf = new HiveConf(conf, HHDriver.class);
HiveMetaStoreClient client = new HiveMetaStoreClient(hiveconf);
通過HiveMetaStoreClient連線Hive,先要拿到庫表的分割槽資訊,client.listPartitionNames,如果存在就新增,如果沒有就新建
優化:
map 中 context(庫.表,kafka中的data)
extends Partitioner 庫.表+rand() 重寫 int getPartition()
extends WritableComparator 重寫 int compare()
reduce 中 保證是同一類庫.表,輸出 mos
MultipleOutputs mos 結果輸出到多個檔案或多個資料夾,用 mos.write(String nameOutput, Key key,Value value,String baseOutputPath)代替context.write(key, value) // nameOutput 用來指定是哪一種輸出,通過別稱,可以設定輸出的不同地址、資料格式等
增量表是 text,最終表是 parquet。基於列式儲存,由於每一列的資料型別都是一樣的,因此可以針對每一列的資料型別使用更高效的壓縮演算法;在查詢時只要掃描需要查詢的列資料,不用進行全表掃描
String newStr = new String(str.getBytes("UTF-8"),"UTF-8"); 將字串按照 UTF-8 放入到bytes[]數組裡,再按照 UTF-8 的方式讀取