1. 程式人生 > >Hdfs詳解

Hdfs詳解

合並 數據 com map 統計文件 leg reduce ilo 隨機

一.Hdfs簡介

  hdfs是一個文件系統,用於存儲文件,通過統一的命名空間——目錄樹來定位文件,並且是分布式的,由很多服務器聯合起來實現其功能,集群中的服務器各自負責角色;

  重要特征:

  1.HDFS中的文件在物理上是分塊存儲(block),塊的大小可以通過配置參數( dfs.blocksize)來規定,默認大小在hadoop2.x版本中是128M,老版本中是64M

  2.HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端通過路徑來訪問文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data

  3.目錄結構及文件分塊信息(元數據)的管理由namenode節點承擔——namenode是HDFS集群主節點,負責維護整個hdfs文件系統的目錄樹,以及每一個路徑(文件)所對應的block塊信息(block的id,及所在的datanode服務器)

4.文件的各個block的存儲管理由datanode節點承擔---- datanode是HDFS集群從節點,每一個block都可以在多個datanode上存儲多個副本(副本數量也可以通過參數設置dfs.replication)

5.HDFS是設計成適應一次寫入,多次讀出的場景,且不支持文件的修改

二.Hdfs的shell(命令行客戶端)操作

  Hdfs提供shell命令行客戶端,使用方法如下:

技術分享

常用命令參數介紹:

-help

功能:輸出這個命令參數手冊

-ls

功能:顯示目錄信息

示例: hadoop fs -ls hdfs://hadoop-server01:9000/

備註:這些參數中,所有的hdfs路徑都可以簡寫

-->hadoop fs -ls / 等同於上一條命令的效果

-mkdir

功能:在hdfs上創建目錄

示例:hadoop fs -mkdir -p /aaa/bbb/cc/dd

-moveFromLocal

功能:從本地剪切粘貼到hdfs

示例:hadoop fs - moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd

-moveToLocal

功能:從hdfs剪切粘貼到本地

示例:hadoop fs - moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt

--appendToFile

功能:追加一個文件到已經存在的文件末尾

示例:hadoop fs -appendToFile ./hello.txt hdfs://hadoop-server01:9000/hello.txt

可以簡寫為:

Hadoop fs -appendToFile ./hello.txt /hello.txt

-cat

功能:顯示文件內容

示例:hadoop fs -cat /hello.txt

-tail

功能:顯示一個文件的末尾

示例:hadoop fs -tail /weblog/access_log.1

-text

功能:以字符形式打印一個文件的內容

示例:hadoop fs -text /weblog/access_log.1

-chgrp

-chmod

-chown

功能:linux文件系統中的用法一樣,對文件所屬權限

示例:

hadoop fs -chmod 666 /hello.txt

hadoop fs -chown someuser:somegrp /hello.txt

-copyFromLocal

功能:從本地文件系統中拷貝文件到hdfs路徑去

示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/

-copyToLocal

功能:從hdfs拷貝到本地

示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz

-cp

功能:從hdfs的一個路徑拷貝hdfs的另一個路徑

示例: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2

-mv

功能:在hdfs目錄中移動文件

示例: hadoop fs -mv /aaa/jdk.tar.gz /

-get

功能:等同於copyToLocal,就是從hdfs下載文件到本地

示例:hadoop fs -get /aaa/jdk.tar.gz

-getmerge

功能:合並下載多個文件

示例:比如hdfs的目錄 /aaa/下有多個文件:log.1, log.2,log.3,...

hadoop fs -getmerge /aaa/log.* ./log.sum

-put

功能:等同於copyFromLocal

示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2

-rm

功能:刪除文件或文件夾

示例:hadoop fs -rm -r /aaa/bbb/

-rmdir

功能:刪除空目錄

示例:hadoop fs -rmdir /aaa/bbb/ccc

-df

功能:統計文件系統的可用空間信息

示例:hadoop fs -df -h /

-du

功能:統計文件夾的大小信息

示例:

hadoop fs -du -s -h /aaa/*

-count

功能:統計一個指定目錄下的文件節點數量

示例:hadoop fs -count /aaa/

-setrep

功能:設置hdfs中文件的副本數量

示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz

三.Hdfs的工作機制

  1.概述

    (1)HDFS集群分為兩大角色:NameNode、DataNode

    (2)NameNode負責管理整個文件系統的元數據

    (3)DataNode 負責管理用戶的文件數據塊

    (4)文件會按照固定的大小(blocksize)切成若幹塊後分布式存儲在若幹臺datanode上

    (5)每一個文件塊可以有多個副本,並存放在不同的datanode上

    (6)Datanode會定期向Namenode匯報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量

    (7)HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行

  2.HDFS寫數據流程

    (1)客戶端要向HDFS寫數據,首先要跟namenode通信以確認可以寫文件並獲得接收文件block的datanode,然後,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到block的datanode負責向其他datanode復制block的副本

    技術分享

  (2)詳細步驟解析

    1.根namenode通信請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在

    2.namenode返回是否可以上傳

    3.client請求第一個 block該傳輸到哪些datanode服務器上

    4.namenode返回3個datanode服務器ABC

    5.client請求3臺dn中的一臺A上傳數據(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然後B調用C,將真個pipeline建立完成,逐級返回客戶端

    6.client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答

    7.當一個block傳輸完成之後,client再次請求namenode上傳第二個block的服務器。

   3.HDFS讀數據流程

    (1)客戶端將要讀取的文件路徑發送給namenode,namenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合並從而獲得整個文件

    技術分享

    (2)詳細步驟解析

      1.跟namenode通信查詢元數據,找到文件塊所在的datanode服務器

      2.挑選一臺datanode(就近原則,然後隨機)服務器,請求建立socket流

      3.datanode開始發送數據(從磁盤裏面讀取數據放入流,以packet為單位來做校驗)

      4.客戶端以packet為單位接收,現在本地緩存,然後寫入目標文件

四.Namenode工作機制

  1.Namenode工作職責:負責客戶端請求的響應,元數據的管理(查詢,修改)

  2.元數據管理 : namenode對數據的管理采用了三種存儲形式:內存元數據(NameSystem) 磁盤元數據鏡像文件數據操作日誌文件(可通過日誌運算出元數據)

  3.元數據存儲機制:

          A、內存中有一份完整的元數據(內存meta data)

          B、磁盤有一個“準完整”的元數據鏡像(fsimage)文件(在namenode的工作目錄中)

          C、用於銜接內存metadata和持久化元數據鏡像fsimage之間的操作日誌(edits文件註:當客戶端對hdfs中的文件進行新增或者修改操作,操作記錄首先被記入edits日誌文件中,當客戶端操作成功後,相應的元數據會更新到內存meta.data

  4.元數據的checkpoint:每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱為checkpoint)

    技術分享

    checkpoint操作的觸發條件配置參數:

      dfs.namenode.checkpoint.check.period=60 #檢查觸發條件是否滿足的頻率,60秒

      dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary

      #以上兩個參數做checkpoint操作時,secondary namenode的本地工作目錄

      dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

      dfs.namenode.checkpoint.max-retries=3 #最大重試次數

      dfs.namenode.checkpoint.period=3600 #兩次checkpoint之間的時間間隔3600秒

      dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操作記錄

    checkpoint的附帶作用

      namenode和secondary namenode的工作目錄存儲結構完全相同,所以,當namenode故障退出需要重新恢復時,可以從secondary namenode的工作目錄中將fsimage拷貝到namenode的工作目錄,以恢復namenode的元數據

五.Datanode工作機制

  1.Datanode工作職責:存儲管理用戶的文件塊數據,定期向namenode匯報自身所持有的block信息(通過心跳信息上報)

    <property>

   <name>dfs.blockreport.intervalMsec</name>

   <value>3600000</value>

  <description>Determines block reporting interval in milliseconds.</description>

  </property>

  2.Datanode掉線判斷時限參數

   datanode進程死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作超時時長。HDFS默認的超時時長為10分鐘+30秒。如果定義超時時間為timeout,則超時時長的計算公式為:

timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。 而默認的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval默認為3秒。

需要註意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的單位為毫秒,dfs.heartbeat.interval的單位為秒。所以,舉個例子,如果heartbeat.recheck.interval設置為5000(毫秒),dfs.heartbeat.interval設置為3(秒,默認),則總的超 時時間為40秒。

  <property>

    <name>heartbeat.recheck.interval</name>

   <value>2000</value>

  </property>

  <property>

   <name>dfs.heartbeat.interval</name>

  <value>1</value>

  </property>

六. Hdfs的java操作

  1.搭建開發環境

    (1)引入依賴

      <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-client</artifactId>

      <version>2.6.1</version>

      </dependency>

    (2)window下開發的說明 建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上做客戶端應用開發,需要設置以下環境:

      A、在windows的某個目錄下解壓一個hadoop的安裝包

      B、將安裝包下的lib和bin目錄用對應windows版本平臺編譯的本地庫替換

      C、在window系統中配置HADOOP_HOME指向你解壓的安裝包

      D、在windows系統的path變量中加入hadoop的bin目錄

  2.HDFS客戶端操作數據代碼示例:

    (1) 文件的增刪改查

public class HdfsClient {

FileSystem fs = null;

@Before

public void init() throws Exception {

// 構造一個配置參數對象,設置一個參數:我們要訪問的hdfs的URI

// 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址

// new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml

// 然後再加載classpath下的hdfs-site.xml

Configuration conf = new Configuration();

conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");

/**

* 參數優先級: 1、客戶端代碼中設置的值 2、classpath下的用戶自定義配置文件 3、然後是服務器的默認配置

*/

conf.set("dfs.replication", "3");

// 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例

// fs = FileSystem.get(conf);

// 如果這樣去獲取,那conf裏面就可以不要配"fs.defaultFS"參數,而且,這個客戶端的身份標識已經是hadoop用戶

fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

}

/**

* 往hdfs上傳文件

*

* @throws Exception

*/

@Test

public void testAddFileToHdfs() throws Exception {

// 要上傳的文件所在的本地路徑

Path src = new Path("g:/redis-recommend.zip");

// 要上傳到hdfs的目標路徑

Path dst = new Path("/aaa");

fs.copyFromLocalFile(src, dst);

fs.close();

}

/**

* 從hdfs中復制文件到本地文件系統

*

* @throws IOException

* @throws IllegalArgumentException

*/

@Test

public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {

fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));

fs.close();

}

@Test

public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {

// 創建目錄

fs.mkdirs(new Path("/a1/b1/c1"));

// 刪除文件夾 ,如果是非空文件夾,參數2必須給值true

fs.delete(new Path("/aaa"), true);

// 重命名文件或文件夾

fs.rename(new Path("/a1"), new Path("/a2"));

}

/**

* 查看目錄信息,只顯示文件

*

* @throws IOException

* @throws IllegalArgumentException

* @throws FileNotFoundException

*/

@Test

public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

// 思考:為什麽返回叠代器,而不是List之類的容器

RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

while (listFiles.hasNext()) {

LocatedFileStatus fileStatus = listFiles.next();

System.out.println(fileStatus.getPath().getName());

System.out.println(fileStatus.getBlockSize());

System.out.println(fileStatus.getPermission());

System.out.println(fileStatus.getLen());

BlockLocation[] blockLocations = fileStatus.getBlockLocations();

for (BlockLocation bl : blockLocations) {

System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());

String[] hosts = bl.getHosts();

for (String host : hosts) {

System.out.println(host);

}

}

System.out.println("--------------為angelababy打印的分割線--------------");

}

}

/**

* 查看文件及文件夾信息

*

* @throws IOException

* @throws IllegalArgumentException

* @throws FileNotFoundException

*/

@Test

public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {

FileStatus[] listStatus = fs.listStatus(new Path("/"));

String flag = "d-- ";

for (FileStatus fstatus : listStatus) {

if (fstatus.isFile()) flag = "f-- ";

System.out.println(flag + fstatus.getPath().getName());

}

}

}

  (2)通過流的方式訪問hdfs

    

/**

* 相對那些封裝好的方法而言的更底層一些的操作方式

* 上層那些mapreduce spark等運算框架,去hdfs中獲取數據的時候,就是調的這種底層的api

* @author

*

*/

public class StreamAccess {

FileSystem fs = null;

@Before

public void init() throws Exception {

Configuration conf = new Configuration();

fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

}

@Test

public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{

//先獲取一個文件的輸入流----針對hdfs上的

FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));

//再構造一個文件的輸出流----針對本地的

FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));

//再將輸入流中數據傳輸到輸出流

IOUtils.copyBytes(in, out, 4096);

}

/**

* hdfs支持隨機定位進行文件讀取,而且可以方便地讀取指定長度

* 用於上層分布式運算框架並發處理數據

* @throws IllegalArgumentException

* @throws IOException

*/

@Test

public void testRandomAccess() throws IllegalArgumentException, IOException{

//先獲取一個文件的輸入流----針對hdfs上的

FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));

//可以將流的起始偏移量進行自定義

in.seek(22);

//再構造一個文件的輸出流----針對本地的

FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));

IOUtils.copyBytes(in,out,19L,true);

}

/**

* 顯示hdfs上文件的內容

* @throws IOException

* @throws IllegalArgumentException

*/

@Test

public void testCat() throws IllegalArgumentException, IOException{

FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));

IOUtils.copyBytes(in, System.out, 1024);

}

}

Hdfs詳解