Hdfs詳解
一.Hdfs簡介
hdfs是一個文件系統,用於存儲文件,通過統一的命名空間——目錄樹來定位文件,並且是分布式的,由很多服務器聯合起來實現其功能,集群中的服務器各自負責角色;
重要特征:
1.HDFS中的文件在物理上是分塊存儲(block),塊的大小可以通過配置參數( dfs.blocksize)來規定,默認大小在hadoop2.x版本中是128M,老版本中是64M
2.HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端通過路徑來訪問文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
3.目錄結構及文件分塊信息(元數據)的管理由namenode節點承擔——namenode是HDFS集群主節點,負責維護整個hdfs文件系統的目錄樹,以及每一個路徑(文件)所對應的block塊信息(block的id,及所在的datanode服務器)
4.文件的各個block的存儲管理由datanode節點承擔---- datanode是HDFS集群從節點,每一個block都可以在多個datanode上存儲多個副本(副本數量也可以通過參數設置dfs.replication)
5.HDFS是設計成適應一次寫入,多次讀出的場景,且不支持文件的修改
二.Hdfs的shell(命令行客戶端)操作
Hdfs提供shell命令行客戶端,使用方法如下:
常用命令參數介紹:
-help 功能:輸出這個命令參數手冊 |
-ls 功能:顯示目錄信息 示例: hadoop fs -ls hdfs://hadoop-server01:9000/ 備註:這些參數中,所有的hdfs路徑都可以簡寫 -->hadoop fs -ls / 等同於上一條命令的效果 |
-mkdir 功能:在hdfs上創建目錄 示例:hadoop fs -mkdir -p /aaa/bbb/cc/dd |
-moveFromLocal 功能:從本地剪切粘貼到hdfs 示例:hadoop fs - moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd -moveToLocal 功能:從hdfs剪切粘貼到本地 示例:hadoop fs - moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt |
--appendToFile 功能:追加一個文件到已經存在的文件末尾 示例:hadoop fs -appendToFile ./hello.txt hdfs://hadoop-server01:9000/hello.txt 可以簡寫為: Hadoop fs -appendToFile ./hello.txt /hello.txt
|
-cat 功能:顯示文件內容 示例:hadoop fs -cat /hello.txt
-tail 功能:顯示一個文件的末尾 示例:hadoop fs -tail /weblog/access_log.1 -text 功能:以字符形式打印一個文件的內容 示例:hadoop fs -text /weblog/access_log.1 |
-chgrp -chmod -chown 功能:linux文件系統中的用法一樣,對文件所屬權限 示例: hadoop fs -chmod 666 /hello.txt hadoop fs -chown someuser:somegrp /hello.txt |
-copyFromLocal 功能:從本地文件系統中拷貝文件到hdfs路徑去 示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/ -copyToLocal 功能:從hdfs拷貝到本地 示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz |
-cp 功能:從hdfs的一個路徑拷貝hdfs的另一個路徑 示例: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
-mv 功能:在hdfs目錄中移動文件 示例: hadoop fs -mv /aaa/jdk.tar.gz / |
-get 功能:等同於copyToLocal,就是從hdfs下載文件到本地 示例:hadoop fs -get /aaa/jdk.tar.gz -getmerge 功能:合並下載多個文件 示例:比如hdfs的目錄 /aaa/下有多個文件:log.1, log.2,log.3,... hadoop fs -getmerge /aaa/log.* ./log.sum |
-put 功能:等同於copyFromLocal 示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
|
-rm 功能:刪除文件或文件夾 示例:hadoop fs -rm -r /aaa/bbb/
-rmdir 功能:刪除空目錄 示例:hadoop fs -rmdir /aaa/bbb/ccc |
-df 功能:統計文件系統的可用空間信息 示例:hadoop fs -df -h /
-du 功能:統計文件夾的大小信息 示例: hadoop fs -du -s -h /aaa/*
|
-count 功能:統計一個指定目錄下的文件節點數量 示例:hadoop fs -count /aaa/
|
-setrep 功能:設置hdfs中文件的副本數量 示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz
|
三.Hdfs的工作機制
1.概述
(1)HDFS集群分為兩大角色:NameNode、DataNode
(2)NameNode負責管理整個文件系統的元數據
(3)DataNode 負責管理用戶的文件數據塊
(4)文件會按照固定的大小(blocksize)切成若幹塊後分布式存儲在若幹臺datanode上
(5)每一個文件塊可以有多個副本,並存放在不同的datanode上
(6)Datanode會定期向Namenode匯報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量
(7)HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行
2.HDFS寫數據流程
(1)客戶端要向HDFS寫數據,首先要跟namenode通信以確認可以寫文件並獲得接收文件block的datanode,然後,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到block的datanode負責向其他datanode復制block的副本
(2)詳細步驟解析
1.根namenode通信請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在
2.namenode返回是否可以上傳
3.client請求第一個 block該傳輸到哪些datanode服務器上
4.namenode返回3個datanode服務器ABC
5.client請求3臺dn中的一臺A上傳數據(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然後B調用C,將真個pipeline建立完成,逐級返回客戶端
6.client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
7.當一個block傳輸完成之後,client再次請求namenode上傳第二個block的服務器。
3.HDFS讀數據流程
(1)客戶端將要讀取的文件路徑發送給namenode,namenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合並從而獲得整個文件
(2)詳細步驟解析
1.跟namenode通信查詢元數據,找到文件塊所在的datanode服務器
2.挑選一臺datanode(就近原則,然後隨機)服務器,請求建立socket流
3.datanode開始發送數據(從磁盤裏面讀取數據放入流,以packet為單位來做校驗)
4.客戶端以packet為單位接收,現在本地緩存,然後寫入目標文件
四.Namenode工作機制
1.Namenode工作職責:負責客戶端請求的響應,元數據的管理(查詢,修改)
2.元數據管理 : namenode對數據的管理采用了三種存儲形式:內存元數據(NameSystem) 磁盤元數據鏡像文件數據操作日誌文件(可通過日誌運算出元數據)
3.元數據存儲機制:
A、內存中有一份完整的元數據(內存meta data)
B、磁盤有一個“準完整”的元數據鏡像(fsimage)文件(在namenode的工作目錄中)
C、用於銜接內存metadata和持久化元數據鏡像fsimage之間的操作日誌(edits文件)註:當客戶端對hdfs中的文件進行新增或者修改操作,操作記錄首先被記入edits日誌文件中,當客戶端操作成功後,相應的元數據會更新到內存meta.data中
4.元數據的checkpoint:每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱為checkpoint)
checkpoint操作的觸發條件配置參數:
dfs.namenode.checkpoint.check.period=60 #檢查觸發條件是否滿足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個參數做checkpoint操作時,secondary namenode的本地工作目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries=3 #最大重試次數
dfs.namenode.checkpoint.period=3600 #兩次checkpoint之間的時間間隔3600秒
dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操作記錄
checkpoint的附帶作用
namenode和secondary namenode的工作目錄存儲結構完全相同,所以,當namenode故障退出需要重新恢復時,可以從secondary namenode的工作目錄中將fsimage拷貝到namenode的工作目錄,以恢復namenode的元數據
五.Datanode工作機制
1.Datanode工作職責:存儲管理用戶的文件塊數據,定期向namenode匯報自身所持有的block信息(通過心跳信息上報)
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>3600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>
2.Datanode掉線判斷時限參數
datanode進程死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作超時時長。HDFS默認的超時時長為10分鐘+30秒。如果定義超時時間為timeout,則超時時長的計算公式為:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。 而默認的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval默認為3秒。
需要註意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的單位為毫秒,dfs.heartbeat.interval的單位為秒。所以,舉個例子,如果heartbeat.recheck.interval設置為5000(毫秒),dfs.heartbeat.interval設置為3(秒,默認),則總的超 時時間為40秒。
<property>
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
六. Hdfs的java操作
1.搭建開發環境
(1)引入依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
(2)window下開發的說明 建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上做客戶端應用開發,需要設置以下環境:
A、在windows的某個目錄下解壓一個hadoop的安裝包
B、將安裝包下的lib和bin目錄用對應windows版本平臺編譯的本地庫替換
C、在window系統中配置HADOOP_HOME指向你解壓的安裝包
D、在windows系統的path變量中加入hadoop的bin目錄
2.HDFS客戶端操作數據代碼示例:
(1) 文件的增刪改查
public class HdfsClient {
FileSystem fs = null;
@Before public void init() throws Exception {
// 構造一個配置參數對象,設置一個參數:我們要訪問的hdfs的URI // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址 // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml // 然後再加載classpath下的hdfs-site.xml Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hdp-node01:9000"); /** * 參數優先級: 1、客戶端代碼中設置的值 2、classpath下的用戶自定義配置文件 3、然後是服務器的默認配置 */ conf.set("dfs.replication", "3");
// 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例 // fs = FileSystem.get(conf);
// 如果這樣去獲取,那conf裏面就可以不要配"fs.defaultFS"參數,而且,這個客戶端的身份標識已經是hadoop用戶 fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
/** * 往hdfs上傳文件 * * @throws Exception */ @Test public void testAddFileToHdfs() throws Exception {
// 要上傳的文件所在的本地路徑 Path src = new Path("g:/redis-recommend.zip"); // 要上傳到hdfs的目標路徑 Path dst = new Path("/aaa"); fs.copyFromLocalFile(src, dst); fs.close(); }
/** * 從hdfs中復制文件到本地文件系統 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testDownloadFileToLocal() throws IllegalArgumentException, IOException { fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/")); fs.close(); }
@Test public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 創建目錄 fs.mkdirs(new Path("/a1/b1/c1"));
// 刪除文件夾 ,如果是非空文件夾,參數2必須給值true fs.delete(new Path("/aaa"), true);
// 重命名文件或文件夾 fs.rename(new Path("/a1"), new Path("/a2"));
}
/** * 查看目錄信息,只顯示文件 * * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException */ @Test public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:為什麽返回叠代器,而不是List之類的容器 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) { LocatedFileStatus fileStatus = listFiles.next(); System.out.println(fileStatus.getPath().getName()); System.out.println(fileStatus.getBlockSize()); System.out.println(fileStatus.getPermission()); System.out.println(fileStatus.getLen()); BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for (BlockLocation bl : blockLocations) { System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset()); String[] hosts = bl.getHosts(); for (String host : hosts) { System.out.println(host); } } System.out.println("--------------為angelababy打印的分割線--------------"); } }
/** * 查看文件及文件夾信息 * * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException */ @Test public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- "; for (FileStatus fstatus : listStatus) { if (fstatus.isFile()) flag = "f-- "; System.out.println(flag + fstatus.getPath().getName()); } } } |
(2)通過流的方式訪問hdfs
/** * 相對那些封裝好的方法而言的更底層一些的操作方式 * 上層那些mapreduce spark等運算框架,去hdfs中獲取數據的時候,就是調的這種底層的api * @author * */ public class StreamAccess {
FileSystem fs = null;
@Before public void init() throws Exception {
Configuration conf = new Configuration(); fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
@Test public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先獲取一個文件的輸入流----針對hdfs上的 FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再構造一個文件的輸出流----針對本地的 FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再將輸入流中數據傳輸到輸出流 IOUtils.copyBytes(in, out, 4096);
}
/** * hdfs支持隨機定位進行文件讀取,而且可以方便地讀取指定長度 * 用於上層分布式運算框架並發處理數據 * @throws IllegalArgumentException * @throws IOException */ @Test public void testRandomAccess() throws IllegalArgumentException, IOException{ //先獲取一個文件的輸入流----針對hdfs上的 FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以將流的起始偏移量進行自定義 in.seek(22);
//再構造一個文件的輸出流----針對本地的 FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/** * 顯示hdfs上文件的內容 * @throws IOException * @throws IllegalArgumentException */ @Test public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024); } } |
Hdfs詳解