Hadoop On Yarn Mapreduce執行原理與常用資料壓縮格式
我們通過提交jar包,進行MapReduce處理,那麼整個執行過程分為五個環節:
1、向client端提交MapReduce job.
2、隨後yarn的ResourceManager進行資源的分配.
3、由NodeManager進行載入與監控containers.
4、通過applicationMaster與ResourceManager進行資源的申請及狀態的互動,由NodeManagers進行MapReduce執行時job的管理.
5、通過hdfs進行job配置檔案、jar包的各節點分發。
Job 提交過程
job的提交通過 呼叫submit()方法 建立一個 JobSubmitter 例項,並 呼叫submitJobInternal() 方法。整個job的執行過程如下:
1、向ResourceManager申請application ID,此ID為該MapReduce的jobId。
2、檢查output的路徑是否正確,是否已經被建立。
3、計算input的splits。
4、拷貝執行job 需要的jar包、配置檔案以及計算input的split 到各個節點。
5、在ResourceManager中呼叫submitAppliction()方法,執行job
Job 初始化過程
1、當resourceManager收到了submitApplication()方法的呼叫通知後,scheduler開始分配Container,隨之ResouceManager傳送applicationMaster程序,告知每個nodeManager管理器。
2、 由applicationMaster決定 如何執行tasks,如果job資料量比較小,applicationMaster便選擇 將tasks執行在一個JVM中 。那麼如何判別這個job是大是小呢?當一個job的 mappers數量小於10個 , 只有一個reducer或者讀取的檔案大小要小於一個HDFS block時 ,(可通過修改配置項mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 進行調整)
3、在執行tasks之前,applicationMaster將會 呼叫setupJob()方法 ,隨之建立output的輸出路徑(這就能夠解釋,不管你的mapreduce一開始是否報錯,輸出路徑都會建立)
Task 任務分配
1、接下來applicationMaster向ResourceManager請求containers用於執行map與reduce的tasks(step 8),這裡map task的優先順序要高於reduce task,當所有的map tasks結束後,隨之進行sort(這裡是shuffle過程後面再說),最後進行reduce task的開始。(這裡有一點,當map tasks執行了百分之5%的時候,將會請求reduce,具體下面再總結)
2、執行tasks的是需要消耗記憶體與CPU資源的, 預設情況下,map和reduce的task資源分配為1024MB與一個核 ,(可修改執行的最小與最大引數配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)
Task 任務執行
1、這時一個task已經被ResourceManager分配到一個container中,由applicationMaster告知nodemanager啟動container,這個task將會被一個 主函式為YarnChild 的Java application執行,但在執行task之前, 首先定位task需要的jar包、配置檔案以及載入在快取中的檔案 。
2、YarnChild運行於一個專屬的JVM中,所以 任何一個map或reduce任務出現問題,都不會影響整個nodemanager的crash或者hang 。
3、每個task都可以在相同的JVM task中完成,隨之將完成的處理資料寫入臨時檔案中。
Mapreduce資料流
執行進度與狀態更新
1、MapReduce是一個較長執行時間的批處理過程,可以是一小時、幾小時甚至幾天,那麼Job的執行狀態監控就非常重要。每個job以及 每個task都有一個包含job(running,successfully completed,failed)的狀態 ,以及value的計數器,狀態資訊及描述資訊(描述資訊一般都是在程式碼中加的列印資訊),那麼,這些資訊是如何與客戶端進行通訊的呢?
2、當一個task開始執行,它將會保持執行記錄,記錄task完成的比例,對於map的任務,將會記錄其執行的百分比,對於reduce來說可能複雜點,但系統依舊會估計reduce的完成比例。當一個map或reduce任務執行時, 子程序會持續每三秒鐘與applicationMaster進行互動 。
Job 完成
最終,applicationMaster會收到一個job完成的通知,隨後改變job的狀態為successful。最終,applicationMaster與task containers被清空。
Shuffle與Sort
從map到reduce的過程,被稱之為shuffle過程,MapReduce使到reduce的資料一定是經過key的排序的,那麼shuffle是如何運作的呢?
當map任務將資料output時, 不僅僅是將結果輸出到磁碟,它是將其寫入記憶體緩衝區域,並進行一些預分類 。
1、The Map Side
首先map任務的 output過程是一個環狀的記憶體緩衝區,緩衝區的大小預設為100MB(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入記憶體的大小到達一定比例 ,預設為80% (可通過mapreduce.map.sort.spill.percent配置項修改),便開始寫入磁碟。
在寫入磁碟之前,執行緒將會指定資料寫入與reduce相應的patitions中,最終傳送給reduce.在每個partition中 ,後臺執行緒將會在記憶體中進行Key的排序 ,( 如果程式碼中有combiner方法,則會在output時就進行sort排序 ,這裡,如果只有少於3個寫入磁碟的檔案,combiner將會在outputfile前啟動,如果只有一個或兩個,那麼將不會呼叫)
這裡 將map輸出的結果進行壓縮會大大減少磁碟IO與網路傳輸的開銷 (配置引數mapreduce.map .output.compress 設定為true,如果使用第三方壓縮jar,可通過mapreduce.map.output.compress.codec進行設定)
隨後這些paritions輸出檔案將會通過HTTP傳送至reducers,傳送的最大啟動執行緒通過mapreduce.shuffle.max.threads進行配置。
2、The Reduce Side
首先上面每個節點的map都將結果寫入了本地磁碟中,現在reduce需要將map的結果通過叢集拉取過來,這裡要注意的是, 需要等到所有map任務結束後,reduce才會對map的結果進行拷貝 ,由於reduce函式有少數幾個複製執行緒,以至於它 可以同時拉取多個map的輸出結果。預設的為5個執行緒 (可通過修改配置mapreduce.reduce.shuffle.parallelcopies來修改其個數)
這裡有個問題,那麼reducers怎麼知道從哪些機器拉取資料呢?
當所有map的任務結束後, applicationMaster通過心跳機制(heartbeat mechanism),由它知道mapping的輸出結果與機器host ,所以 reducer會定時的通過一個執行緒訪問applicationmaster請求map的輸出結果 。
Map的結果將會被拷貝到reduce task的JVM的記憶體中(記憶體大小可在mapreduce.reduce.shuffle.input.buffer.percent中設定)如果不夠用,則會寫入磁碟。當記憶體緩衝區的大小到達一定比例時(可通過mapreduce.reduce.shuffle.merge.percent設定)或map的輸出結果檔案過多時(可通過配置mapreduce.reduce.merge.inmen.threshold),將會除法合併(merged)隨之寫入磁碟。
這時要注意, 所有的map結果這時都是被壓縮過的,需要先在記憶體中進行解壓縮,以便後續合併它們 。(合併最終檔案的數量可通過mapreduce.task.io.sort.factor進行配置) 最終reduce進行運算進行輸出。
這裡附帶的整理了下Parquet儲存結構與SequenceFile儲存結構的特點
Parquet
Parquet是面向分析型業務的列式儲存格式,由Twitter和Cloudera合作開發,2015年5月從Apache的孵化器裡畢業成為Apache頂級專案,那麼這裡就總結下Parquet資料結構到底是什麼樣的呢?
一個Parquet檔案是 由一個header以及一個或多個block塊組成,以一個footer結尾。header中只包含一個4個位元組的數字PAR1用來識別整個Parquet檔案格式。檔案中所有的metadata都存在於footer中 。footer中的metadata包含了格式的版本資訊,schema資訊、key-value paris以及所有block中的metadata資訊。footer中最後兩個欄位為一個以4個位元組長度的footer的metadata,以及同header中包含的一樣的PAR1。
讀取一個Parquet檔案時,需要完全讀取Footer的meatadata,Parquet格式檔案不需要讀取sync markers這樣的標記分割查詢,因為所有block的邊界都儲存於footer的metadata中(因為metadata的寫入是在所有blocks塊寫入完成之後的,所以吸入操作包含的所有block的位置資訊都是存在於記憶體直到檔案close)
這裡注意,不像sequence files以及Avro資料格式檔案的header以及sync markers是用來分割blocks。Parquet格式檔案不需要sync markers,因此block的邊界儲存與footer的meatada中。
在Parquet檔案中,每一個block都具有一組Row group,她們是由一組Column chunk組成的列資料。繼續往下,每一個column chunk中又包含了它具有的pages。每個page就包含了來自於相同列的值.Parquet同時使用更緊湊形式的編碼,當寫入Parquet檔案時,它會自動基於column的型別適配一個合適的編碼,比如,一個boolean形式的值將會被用於run-length encoding。
另一方面,Parquet檔案對於每個page支援標準的壓縮演算法比如支援Snappy,gzip以及LZO壓縮格式,也支援不壓縮。
Parquet格式的資料型別:
Hadoop SequenceFile
在一些應用中,我們需要一種特殊的資料結構來儲存資料,並進行讀取,這裡就分析下為什麼用SequenceFile格式檔案。
Hadoop提供的SequenceFile檔案格式提供一對key,value形式的不可變的資料結構。同時,HDFS和MapReduce job使用SequenceFile檔案可以使檔案的讀取更加效率。
SequenceFile的格式
SequenceFile的格式是 由一個header 跟隨一個或多個記錄組成 。前三個位元組是一個Bytes SEQ代表著版本號,同時header也包括key的名稱,value class , 壓縮細節,metadata,以及Sync markers。Sync markers的作用在於可以讀取任意位置的資料。
在recourds中,又分為是否壓縮格式。當沒有被壓縮時,key與value使用Serialization序列化寫入SequenceFile。當選擇壓縮格式時,record的壓縮格式與沒有壓縮其實不盡相同,除了value的bytes被壓縮,key是不被壓縮的。
在Block中,它使所有的資訊進行壓縮,壓縮的最小大小由配置檔案中,io.seqfile.compress.blocksize配置項決定。
SequenceFile的MapFile
一個MapFile可以通過SequenceFile的地址,進行分類查詢的格式。使用這個格式的優點在於,首先會將SequenceFile中的地址都載入入記憶體,並且進行了key值排序,從而提供更快的資料查詢。
寫SequenceFile檔案:
將key按100-1以IntWritable object進行倒敘寫入sequence file,value為Text objects格式。在將key和value寫入Sequence File前,首先將每行所在的位置寫入(writer.getLength())
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.IOException;import java.net.URI;
public class SequenceFileWriteDemo {
private static final String[] DATA = {
“One, two, buckle my shoe”,
“Three, four, shut the door”,
“Five, six, pick up sticks”,
“Seven, eight, lay them straight”,
“Nine, ten, a big fat hen”
};
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, conf, path,
key.getClass(), value.getClass());
for (int i = 0; i < 100; i++) {
key.set(100 – i);
value.set(DATA[i % DATA.length]);
System.out.printf(“[%s]\t%s\t%s\n”, writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
讀取SequenceFile檔案:
首先需要建立SequenceFile.Reader例項,隨後通過呼叫next()函式進行每行結果集的迭代(需要依賴序列化).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;import java.net.URI;
public class SequenceFileReadDemo {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable)
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable)
ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {//同步記錄的邊界
String syncSeen = reader.syncSeen() ? “*” : “”;
System.out.printf(“[%s%s]\t%s\t%s\n”, position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
}