Hadoop2.x深入詳解
HDFS分散式檔案系統詳解
NameNode:是一箇中心伺服器,單一節點,負責管理檔案系統的名稱空間以及客戶端對檔案的訪問;NameNode負責檔案元資料的操作,DataNode負責處理檔案內容的讀寫請求,跟檔案內容相關的資料流不經過NameNode,只會詢問他與哪一個DataNode聯絡,否則NameNode會成為系統的瓶頸;副本存放在哪些DataNode上有NameNode來控制,根據全域性情況做出塊放置決定,讀取檔案NameNode儘量讓使用者先讀取最近的副本,降低塊消耗和讀取時延;NameNode全權管理資料塊的複製,它週期性的從叢集的每個Datanode接手心跳訊號和塊狀態報告,接收到心跳訊號意味著該DataNode節點正常工作,塊狀態報告包含了一個DataNode上所有資料塊的列表。
一個數據塊在DataNode以檔案儲存在磁碟上,包括兩個檔案,一個數據本身,一個是元資料包括資料塊的長度,塊資料的校驗和,以及時間戳。DataNode啟動後向NameNode註冊,通過後,週期性(1個小時)的向NameNode上報所有的塊資訊。心跳是每3秒一次,心跳返回結果帶有NameNode給該DataNode的命令如複製塊資料到另一臺機器,或者刪除某個資料塊。如果超過10分鐘沒有收到某個DataNode的心跳,則認為該節點不可用。叢集執行中可以安全加入和退出一些機器。
在core-site.xml配置檔案中配置的資料儲存目錄/data/tmp/dfs下,有如下資料夾:data(datanode存放資料)、name(namenode存放資料)、namesecondary(secondarynamenode存放資料)。
檔案切成塊預設大小128M,以塊為單位,每個塊有多個副本儲存在不同的機器上,副本數可在檔案生成時指定預設是3。(配置或者命令都可以)
可以建立、刪除、移動或者重新命名檔案,當檔案建立、寫入和關閉之後不能修改檔案內容。
1、資料損壞處理
當DataNode讀取block的時候,它會計算checksum,如果計算後的checksum與block建立時值不一樣,說明該block已經損壞。Client讀取其他DataNode上的block。NameNode標記該塊已經損壞,然後複製block達到預期設定的檔案備份數。並刪除損壞的塊。DataNode在其檔案建立後三週驗證其checksum。
2、HDFS互動式shell使用
(1)檔案操作
$ bin/hdfs dfs ……
-chgrp
-chmod
-chown
-ls
-mkdir
-put/get
-rm
-rmdir
-test
-text
(2)叢集管理
-report 叢集狀態資訊 50070埠也可以看
-safemode 安全模式
-refresh 新增節點後重新整理
3、HDFS java APIpublic class Test {
static final String PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/";
static final String DIR = "hdfs://hadoop-senior.ibeifeng.com:8020/d1";
static final String FILE = "/d1/hello";
public static void main(String[] args) throws Exception {
//獲取fileSystem
FileSystem fileSystem = getFileSystem();
// 建立資料夾 hadoop dfs -mkdir /d1
mkdir(fileSystem);
// 刪除資料夾 hadoop dfs -rm /d1
remove(fileSystem);
// 上傳檔案 hadoop dfs -put src des
putData(fileSystem);
// 下載檔案 hadoop dfs -get src des
getData(fileSystem);
// 瀏覽資料夾hadoop dfs -ls /
list(fileSystem);
}
private static void list(FileSystem fileSystem) throws IOException {
FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
String idDir = fileStatus.isDir() ? "資料夾" : "檔案";
String permission = fileStatus.getPermission().toString();
short replication = fileStatus.getReplication();
long len = fileStatus.getLen();
String path = fileStatus.getPath().toString();
System.out.println(idDir + "\t" + permission + "\t" + replication
+ "\t" + len + "\t" + path);
}
}
private static void getData(FileSystem fileSystem) throws IOException {
FSDataInputStream in = fileSystem.open(new Path(FILE));
IOUtils.copyBytes(in, System.out, 1024, true);
}
private static void putData(FileSystem fileSystem) throws IOException {
FSDataOutputStream out = fileSystem.create(new Path(FILE));
FileInputStream in = new FileInputStream("C:/readme.txt");
IOUtils.copyBytes(in, out, 1024, true);
}
private static void remove(FileSystem fileSystem) throws IOException {
fileSystem.delete(new Path(DIR), true);
}
private static void mkdir(FileSystem fileSystem) throws IOException {
fileSystem.mkdirs(new Path(DIR));
}
private static FileSystem getFileSystem() throws IOException,
URISyntaxException {
FileSystem fileSystem = FileSystem.get(new URI(PATH),
new Configuration());
return fileSystem;
}
}
4、NameNode啟動過程詳解NameNode的資料存放在兩個地方
記憶體當中
本地磁碟
fsimage 格式化檔案系統就是為了生成這個檔案
edits
第一次啟動,格式化HDFS後,目的生成fsimage
Start NameNode執行這個命令後,會read fsimage 檔案
Start DataNode向NameNode註冊,block report(塊彙報)
接下來執行命令
create dir /user/beifeng/tmp -> write [edits]
put files /user/beifeg/tmp(*-site.xml) -> write [edits]
delete file /user/beifeng/tmp/core-site.xml -> write [edits]
第二次啟動
Start NameNode執行這個命令後,會read fsimage;read edits;
生成一個新的fsimage (合併fsimage和edits)
生成新的edits 這個時候edits為null
Start DataNode向NameNode註冊;block report(塊彙報)
接下來執行命令
create dir /user/beifeng/tmp -> write [edits]
put files /user/beifeg/tmp(*-site.xml) -> write [edits]
delete file /user/beifeng/tmp/core-site.xml -> write [edits]
5、SecondaryNameNode輔助功能詳解(HA之後不啟動) NameNode中的編輯日誌檔案太大的話如何處理。定期(預設一個小時一次)合併fsimage和edits生成新的fsimage 給NameNode
6、安全模式safemode
等待DataNodes向NameNode傳送block report,(datanodes blocks)/(total blocks) = 99.99%s時,安全模式結束。在安全模式下可以檢視檔案系統的檔案,不能改變檔案系統的命令空間,如建立、上傳、刪除檔案等操作。
$ bin/hdfs dfsadmin –safemode
enter
leave
get
yarn架構詳解
Yarn的架構
在Yarn中,JobTracker被分為兩部分:ResouceManager(RM)和ApplicationMaster(AM)
MR v1主要由三部分組成:程式設計模型(API)、資料處理引擎(MapTask和ReduceTask)和執行環境(JobTracker和TaskTracker);
Yarn繼承了MR v1的程式設計模型和處理引擎,改變的只是執行環境,所以對程式設計沒有什麼影響。
RM負責資源排程,AM負責任務排程:RM負責整個叢集的資源管理與排程;NodeManager(NM)負責單個節點的資源管理與排程;NM定時的通過心跳的形式與RM進行通訊,報告節點的健康狀態與記憶體使用情況;AM通過與RM互動獲取資源,然後通過與NM互動,啟動計算任務。
在Yarn的框架管理中,無論是AM從RM申請資源,還是NM管理自己所在節點的資源,都是通過Container進行的。Container是Yarn的資源抽象,此處的資源包含記憶體和CPU等。
- ResourceManager:全域性資源管理器,整個叢集只有一個,負責叢集資源的統一管理和排程分配。
功能:處理客戶端請求;啟動/監控ApplicationMaster;監控NodeManager;資源分配和排程。 - NodeManager:整個叢集有多個,負責單節點資源管理和使用。
功能:單個節點上的資源管理和任務管理;處理來自ResourceManager的命令;處理來自ApplicationMaster的命令。
NodeManger管理抽象容器,這些容器代表著可供一個特定應用程式使用的針對每個節點的資源。定時的想RM彙報本節點上資源使用情況和各個Container的執行狀態。 - ApplicationMaster:管理一個在Yarn內執行的應用程式的每個例項。
功能:資料切分;為應用程式申請資源,並進一步分配給內部任務;任務監控和容錯。負責協調來自ResourceManager的資源,開通過NodeManager監視容器的執行和資源使用(CPU、記憶體等資源的分配) - Container:Yarn中的資源抽象,封裝某個節點上多維度資源,如記憶體、CPU、磁碟網路等,當AM向RM申請資源時,RM向AM返回的資源便是用Container表示的。Yarn會為每個任務分配一個Container,且該任務只能使用該Container中描述的資源。
功能:對任務執行環境的抽象;描述一系列資訊;任務執行資源(節點、記憶體、CPU);任務啟動命令;任務執行環境
Yarn資源管理和排程
資源排程和資源隔離是Yarn作為一個資源管理系統,最重要和最基礎的兩個功能,資源排程由ResourceManager完成,而資源隔離由各個NodeManager實現。
ResourceManager將某個NodeManager上資源分配給任務(這就是所謂的“資源排程”)後,NodeManager需按照要求為任務提供相應的資源,甚至保證這些資源應具有獨佔性,為任務執行提供基礎的保證,這就是所謂的資源隔離。
當談及到資源時,我們通常指記憶體、CPU和IO三種資源。Hadoop YARN同時支援記憶體和CPU兩種資源的排程。記憶體資源的多少會決定任務的生死,如果記憶體不夠,任務可能會執行失敗,相比之下,CPU資源則不同,它只會決定任務執行的快慢,不會對生死產生影響。
Yarn執行使用者配置每個節點上可用的實體記憶體資源,注意:這裡是“可用的”,因為一個節點上的記憶體會被若干個服務共享,比如一部分給yarn,一部分給hdfs,一部分給hbase等等。
Yarn的執行流程:
- 使用者向Yarn中提交應用程式,其中包括AM程式,啟動AM的命令,使用者程式等。
- RM為該程式分配第一個Container,並與對應的NM通訊,要求它在這個Container中啟動應用程式AM。
- AM首先向RM註冊,這樣使用者可以直接通過RM檢視應用程式的執行狀態,然後將為各個任務申請資源,並監控它的執行狀態,直到執行結束。重複4-7步驟。
- AM採用輪詢的方式通過RPC協議向RM申請和領取資源
- 一旦AM申請到資源後,便與對應的NM通訊,要求它啟動任務
- NM為任務設定好執行環境(包括環境變數,JAR包、二進位制程式等)後,將任務啟動命令寫到一個指令碼中,並通過執行該指令碼啟動任務。
- 應用程式執行完成後,AM向RM登出並關閉自己。
配置CPU和記憶體的大小:
<property>
<name>yarn.nodemanager.resource.memory-mb </name>
<value>4096</value> 4G記憶體 預設8G
</property>
<property>
<name> yarn.nodemanager.resource.cpu-vcores</name>
<value>4 </value> 4核 預設8核
</property>
MapReduce程式設計模型
一種分散式計算模型,解決海量資料的計算問題。MapReduce將整個平行計算過程抽象到兩個函式:
1、Map:對一些獨立元素組成的列表的每一個元素進行指定的操作,可以高度並行。
1.1 讀取輸入檔案內容,解析成key、value對。對輸入檔案的每一行,解析成key、value對。每一個鍵值對呼叫一次map函式。
1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
1.3 對輸出的key、value進行分割槽。
1.4 對不同分割槽的資料,按照key進行排序、分組。相同key的value放到一個集合中。
1.5 (可選)分組後的資料進行歸約。
2、Reduce:對一個列表的元素進行合併。
2.1 對多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點。
2.2 對多個map任務的輸出進行合併、排序。寫reduce函式自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
2.3 把reduce的輸出儲存到檔案中。
一個簡單的MapReduce程式只需要指定map(),reduce()、input、output,剩下的事由框架完成。
MapReduce程式設計例項(wordcount)
public class WordCountApp extends Configured implements Tool{
static final String INPUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/hello";
static final String OUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/out";
public static void main(String[] args) throws Exception {
ToolRunner.run(new WordCountApp(), args);
}
public static void run(String[] args) throws Exception {
INPUT_PATH = args[0];
OUT_PATH = args[1];
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUT_PATH);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
final Job job = new Job(conf, WordCountApp.class.getSimpleName());
//打成jar包執行
job.setJarByClass(WordCountApp.class);
// 1.1指定讀取的檔案位於哪裡
FileInputFormat.setInputPaths(job, INPUT_PATH);
// 指定如何對輸入檔案進行格式化,把輸入檔案每一行解析成鍵值對
job.setInputFormatClass(TextInputFormat.class);
// 1.2 指定自定義的map類
job.setMapperClass(MyMapper.class);
// map輸出的<k,v>型別。如果<k3,v3>的型別與<k2,v2>型別一致,則可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 1.3 分割槽
//job.setPartitionerClass(HashPartitioner.class);
// 設定reduce任務數,有一個reduce任務執行
job.setNumReduceTasks(1);
// 1.4 TODO 排序、分組
// 1.5 TODO 規約
// 2.2 指定自定義reduce類
job.setReducerClass(MyReducer.class);
// 指定reduce的輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 2.3 指定寫出到哪裡
FileOutputFormat.setOutputPath(job, outPath);
// 指定輸出檔案的格式化類
job.setOutputFormatClass(TextOutputFormat.class);
//把job提交執行 true表示列印結果資訊
job.waitForCompletion(true);
}
/**
* KEYIN 即k1:表示行的偏移量
* VALUEIN 即v1:表示行文字內容
* KEYOUT 即k2:表示行中出現的單詞
* VALUEOUT 即v:2表示行中出現的單詞的次數,固定值1
*/
static class MyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private Text word = new Text();
private LongWritable one = new LongWritable(1);
protected void map(LongWritable k1, Text v1, Context ctx)
throws java.io.IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(v1, "\t");
while(st.hasMoreTokens()){
word.set(st.nextToken());
ctx.write(word, one);
}
};
}
/**
* KEYIN 即k2:表示行中出現的單詞
* VALUEIN 即v2:表示行中出現的單詞的次數
* KEYOUT 即k3:表示文字中出現的不同單詞
* VALUEOUT 即v3:表示文字中出現的不同單詞的總次數
*/
static class MyReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,Context ctx)
throws java.io.IOException, InterruptedException {
long times = 0L;
for (LongWritable count : v2s) {
times += count.get();
}
ctx.write(k2, new LongWritable(times));
};
}
}
自定義資料型別實現KPI
自定義的型別必須實現Writable,如果需要排序還必須實現 Comparable
public class KpiApp extends Configured implements Tool{
static final String INPUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/wlan";
static final String OUT_PATH = "hdfs://hadoop-senior.ibeifeng.com:8020/out";
public static void main(String[] args) throws Exception {
ToolRunner.run(new KpiApp(), args);
}
public static void run(String[] args) throws Exception {
INPUT_PATH = args[0];
OUT_PATH = args[1];
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUT_PATH);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
final Job job = new Job(conf, KpiApp.class.getSimpleName());
//打成jar包執行
job.setJarByClass(KpiApp.class);
// 1.1指定讀取的檔案位於哪裡
FileInputFormat.setInputPaths(job, INPUT_PATH);
// 指定如何對輸入檔案進行格式化,把輸入檔案每一行解析成鍵值對
job.setInputFormatClass(TextInputFormat.class);
// 1.2 指定自定義的map類
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
// 1.3 分割槽
//job.setPartitionerClass(HashPartitioner.class);
// 設定reduce任務數,有一個reduce任務執行
//job.setNumReduceTasks(1);
// 1.4 TODO 排序、分組
// 1.5 TODO 合併、規約
// 2.2 指定自定義reduce類
job.setReducerClass(MyReducer.class);
// 指定reduce的輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(KpiWritable.class);
// 2.3 指定寫出到哪裡
FileOutputFormat.setOutputPath(job, outPath);
// 指定輸出檔案的格式化類
job.setOutputFormatClass(TextOutputFormat.class);
//把job提交執行
job.waitForCompletion(true);
}
static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
@Override
protected void map(LongWritable key,Text value,Context context)
throws IOException, InterruptedException {
String[] splited = value.toString().split("\t");
String msisdn = splited[1];
Text k2 = new Text(msisdn);
KpiWritable v2 = new KpiWritable(splited[6], splited[7],splited[8], splited[9]);
context.write(k2, v2);
}
}
static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
@Override
protected void reduce(Text k2,Iterable<KpiWritable> v2s,Context context)
throws IOException, InterruptedException {
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
for (KpiWritable kpiWritable : v2s) {
upPackNum += kpiWritable.upPackNum;
downPackNum += kpiWritable.downPackNum;
upPayLoad += kpiWritable.upPayLoad;
downPayLoad += kpiWritable.downPayLoad;
}
KpiWritable v3 = new KpiWritable(upPackNum + "", downPackNum + "",upPayLoad + "", downPayLoad + "");
context.write(k2, v3);
}
}
}
class KpiWritable implements Writable {
long upPackNum;
long downPackNum;
long upPayLoad;
long downPayLoad;
public KpiWritable() {
}
public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,String downPayLoad) {
this.upPackNum = Long.parseLong(upPackNum);
this.downPackNum = Long.parseLong(downPackNum);
this.upPayLoad = Long.parseLong(upPayLoad);
this.downPayLoad = Long.parseLong(downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upPackNum = in.readLong();
this.downPackNum = in.readLong();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upPackNum);
out.writeLong(downPackNum);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
}
@Override
public String toString() {
return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t"+ downPayLoad;
}
}
Combiners程式設計
/**
* 問:為什麼使用Combiner?
* 答:Combiner發生在Map端,對資料進行規約處理,資料量變小了,傳送到reduce端的資料量變小了,傳輸時間變短,作業的整體時間變短。
*
* 問:為什麼Combiner不作為MR執行的標配,而是可選步驟哪?
* 答:因為不是所有的演算法都適合使用Combiner處理,例如求平均數。
*
* 問:Combiner本身已經執行了reduce操作,為什麼在Reducer階段還要執行reduce操作哪?
* 答:combiner操作發生在map端的,處理一個任務所接收的檔案中的資料,不能跨map任務執行;只有reduce可以接收多個map任務處理的資料。
*
*/
//1.5 TODO 規約
job.setCombinerClass(MyCombiner.class);
static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx)
throws java.io.IOException ,InterruptedException {
long times = 0L;
for (LongWritable count : v2s) {
times += count.get();
}
ctx.write(k2, new LongWritable(times));
}
}
Partitioner程式設計
/**
* 分割槽的例子必須打成jar執行
* 用處: 1.根據業務需要,產生多個輸出檔案
* 2.多個reduce任務在執行,提高整體job的執行效率
*/
//1.3 指定分割槽類
job.setPartitionerClass(KpiPartitioner.class);
job.setNumReduceTasks(2);
static class KpiPartitioner extends HashPartitioner<Text, LongWritable>{
@Override
public int getPartition(Text key, LongWritable value, int numReduceTasks) {
return (key.toString().length()==11)?0:1;
}
}
Shuffle執行流程詳解
MR執行過程
Step1:
Input輸入
InputFormat讀取資料轉換成<key,value>
FileInputFormat
TextInputFormat
DBInputFormat
Step2:
Map階段
Map(KEYIN,VALUEIN,KEYOUT,VALUEOUT)
預設情況下:
KEYIN:偏移量 LongWritable
VALUE:內容 TEXT
Step3:
Shuffle階段
Map端輸出,output<key,value>:
output輸出首先是到記憶體,後面spill(溢寫到磁碟),可能有很多檔案。
分割槽 partition (指定哪些map給哪些reduce處理)
排序sort
很多小檔案,spill
合併merge
排序sort
最後變成大檔案 –>Map Task執行機器的本地磁碟
Reduce端輸入
Reduce Task會到Map Task執行機器上拷貝要處理的資料。
合併merge
排序sort
分組group :將相同key的value放在一起
總的來說:
分割槽:partitioner
排序:sort
拷貝:copy
分組:group
壓縮:compress(map階段大檔案)
合併:combiner(Map任務端的reduce,不是所有情況都適用)
Step4:
Reduce階段
Reduce(KEYIN,VALUEIN,KEYOUT,VALUEOUT)
Step5:
Output輸出
OutputFormat
FileOutputFormat
TextOutputFormat:預設情況每個<key,value>輸出一行,key、value的中間分隔符為\t,
預設呼叫key和value的toString()方法。
DBOutputFormat
MapReduce調優
1、Shuffle過程
compress壓縮調優 (Map端輸出壓縮)Combiner調優 (Map端輸出組合)
2、Map階段
Map階段排序小檔案合併因子配置
<property>
<name>mapreduce.task.io.sort.factor </name>
<value>10</value>
</property>
Map階段輸出到記憶體大小設定
<property>
<name>mapreduce.task.io.sort.mb </name>
<value>10</value>
</property>
達到記憶體多少開始spill到磁碟
<property>
<name>mapreduce.map.sort.spill.percent </name>
<value>0.8</value>
</property>
在沒有reduce的情況下(job.setNumReduceTask(0)),直接寫入硬碟,把資料設定很小如果設定reducer任務數為0,map端不會執行combiner,sort,merge操作,會直接輸出無序結果(讀一行,輸一行),
輸出的檔案數量,與map task的數量匹配(一個Input split對應一個map task)。
3、reduce階段
設定reduce任務
通常情況一個block就對應一個map任務進行處理,reduce任務如果人工不去設定的話就一個。reduceReduce任務個數該如何設定,程式中通過job.setNumReduceTask(2)2個。Reduce任務的數量是根據程式慢慢調。