1. 程式人生 > >hadoop實戰(三) 使用HDFS操作檔案

hadoop實戰(三) 使用HDFS操作檔案

一、hadoop簡介

   HADOOP叢集具體來說包含兩個叢集:HDFS叢集和YARN叢集,兩者邏輯上分離,但物理上常在一起
   HDFS叢集:負責海量資料的儲存,叢集中的角色主要有 NameNode / DataNode
   YARN叢集:負責海量資料運算時的資源排程,叢集中的角色主要有 ResourceManager /NodeManager

二、hdfs的工作機制

(一)、概述

   1. HDFS叢集分為兩大角色:NameNode、DataNode
   2. NameNode負責管理整個檔案系統的元資料
   3. DataNode 負責管理使用者的檔案資料塊


   4. 檔案會按照固定的大小(blocksize)切成若干塊後分布式儲存在若干臺datanode上
   5. 每一個檔案塊可以有多個副本,並存放在不同的datanode上
   6. Datanode會定期向Namenode彙報自身所儲存的檔案block資訊,而namenode則會負責保持檔案的副本數量
   7. HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行

(二)、HDFS寫資料流程

1、概述

   客戶端要向HDFS寫資料,首先要跟namenode通訊以確認可以寫檔案並獲得接收檔案block的datanode,然後,客戶端按順序將檔案逐個block傳遞給相應datanode,並由接收到block的datanode負責向其他datanode複製block的副本

2、詳細步驟解析

   1、根namenode通訊請求上傳檔案,namenode檢查目標檔案是否已存在,父目錄是否存在

   2、namenode返回是否可以上傳
   3、client請求第一個 block該傳輸到哪些datanode伺服器上
   4、namenode返回3個datanode伺服器ABC
   5、client請求3臺dn中的一臺A上傳資料(本質上是一個RPC呼叫,建立pipeline),A收到請求會繼續呼叫B,然後B呼叫C,將整個pipeline建立完成,逐級返回客戶端
   6、client開始往A上傳第一個block(先從磁碟讀取資料放到一個本地記憶體快取),以packet為單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答佇列等待應答


   7、當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。

(三)、HDFS讀資料流程

1、概述

   客戶端將要讀取的檔案路徑傳送給namenode,namenode獲取檔案的元資訊(主要是block的存放位置資訊)返回給客戶端,客戶端根據返回的資訊找到相應datanode逐個獲取檔案的block並在客戶端本地進行資料追加合併從而獲得整個檔案

2、詳細步驟解析

   1、跟namenode通訊查詢元資料,找到檔案塊所在的datanode伺服器

   2、挑選一臺datanode(就近原則,然後隨機)伺服器,請求建立socket流
   3、datanode開始傳送資料(從磁盤裡面讀取資料放入流,以packet為單位來做校驗)
   4、客戶端以packet為單位接收,現在本地快取,然後寫入目標檔案

(四)、NAMENODE工作機制

1、NAMENODE職責

   負責客戶端請求的響應
   元資料的管理(查詢,修改)

2、元資料管理

   namenode對資料的管理採用了三種儲存形式:

   (1)記憶體元資料(NameSystem)
   (2)磁碟元資料映象檔案
   (3)資料操作日誌檔案(可通過日誌運算出元資料)

(1)元資料儲存機制

   A、記憶體中有一份完整的元資料(記憶體meta data)    B、磁碟有一個“準完整”的元資料映象(fsimage)檔案(在namenode的工作目錄中)    C、用於銜接記憶體metadata和持久化元資料映象fsimage之間的操作日誌(edits檔案)注:當客戶端對hdfs中的檔案進行新增或者修改操作,操作記錄首先被記入edits日誌檔案中,當客戶端操作成功後,相應的元資料會更新到記憶體meta.data中

(2)元資料手動檢視

   可以通過hdfs的一個工具來檢視edits中的資訊

bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

(3)元資料的checkpoint

   每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並載入到記憶體進行merge(這個過程稱為checkpoint) ##### checkpoint操作的觸發條件配置引數:
dfs.namenode.checkpoint.check.period=60  #檢查觸發條件是否滿足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個引數做checkpoint操作時,secondary namenode的本地工作目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

dfs.namenode.checkpoint.max-retries=3  #最大重試次數
dfs.namenode.checkpoint.period=3600  #兩次checkpoint之間的時間間隔3600秒
dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操作記錄
checkpoint的附帶作用:
   namenode和secondary namenode的工作目錄儲存結構完全相同,所以,當namenode故障退出需要重新恢復時,可以從secondary namenode的工作目錄中將fsimage拷貝到namenode的工作目錄,以恢復namenode的元資料

(五)、DATANODE的工作機制

1、概述

(1)Datanode工作職責:

   儲存管理使用者的檔案塊資料
   定期向namenode彙報自身所持有的block資訊(通過心跳資訊上報)(這點很重要,因為,當叢集中發生某些block副本失效時,叢集如何恢復block初始副本數量的問題)

<property>
	<name>dfs.blockreport.intervalMsec</name>
	<value>3600000</value>
	<description>Determines block reporting interval in milliseconds.</description>
</property>

(2)Datanode掉線判斷時限引數:

   datanode程序死亡或者網路故障造成datanode無法與namenode通訊,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作超時時長。HDFS預設的超時時長為10分鐘+30秒。如果定義超時時間為timeout,則超時時長的計算公式為:
timeout  = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而預設的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval預設為3秒。
   需要注意的是hdfs-site.xml 配置檔案中的heartbeat.recheck.interval的單位為毫秒,dfs.heartbeat.interval的單位為秒。所以,舉個例子,如果heartbeat.recheck.interval設定為5000(毫秒),dfs.heartbeat.interval設定為3(秒,預設),則總的超時時間為40秒。
<property>
        <name>heartbeat.recheck.interval</name>
        <value>2000</value>
</property>
<property>
        <name>dfs.heartbeat.interval</name>
        <value>1</value>
</property>

(六)、HDFS的java操作

   hdfs在生產應用中主要是客戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的訪問客戶端物件,然後通過該客戶端物件操作(增刪改查)HDFS上的檔案
   首先是依賴:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.1</version>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.springframework.boot</groupId>-->
            <!--<artifactId>spring-boot-starter-test</artifactId>-->
            <!--<scope>test</scope>-->
        <!--</dependency>-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
   常見檔案操作:
package com.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

public class HdfsClient {

FileSystem fs = null;

@Before
public void init() throws Exception {
    // 構造一個配置引數物件,設定一個引數:我們要訪問的hdfs的URI
    // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs檔案系統的客戶端,以及hdfs的訪問地址
    // new Configuration();的時候,它就會去載入jar包中的hdfs-default.xml
    // 然後再載入classpath下的hdfs-site.xml
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://mini1:9000");
    /**
     * 引數優先順序: 1、客戶端程式碼中設定的值 2、classpath下的使用者自定義配置檔案 3、然後是伺服器的預設配置
     */
    conf.set("dfs.replication", "3");
    // 獲取一個hdfs的訪問客戶端,根據引數,這個例項應該是DistributedFileSystem的例項
    // fs = FileSystem.get(conf);

    // 如果這樣去獲取,那conf裡面就可以不要配"fs.defaultFS"引數,而且,這個客戶端的身份標識已經是hadoop使用者
    fs = FileSystem.get(new URI("hdfs://mini1:9000"), conf, "root");


}
/**
 * 往hdfs上傳檔案
 *
 * @throws Exception
 */
@Test
public void testAddFileToHdfs() throws Exception {

    // 要上傳的檔案所在的本地路徑
    Path src = new Path("/home/youjin/ruanjian/apache-tomcat-8.0.53.tar.gz");
    // 要上傳到hdfs的目標路徑
    Path dst = new Path("/apache-tomcat-8.0.53.tar.gz");
    fs.copyFromLocalFile(src, dst);
    fs.close();
}
/**
 * 從hdfs中複製檔案到本地檔案系統
 *
 * @throws IOException
 * @throws IllegalArgumentException
 */
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
    fs.copyToLocalFile(new Path("/aaa"), new Path("/home/youjin/test"));
    fs.close();
}

@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {

    // 建立目錄
    fs.mkdirs(new Path("/a1/b1/c1"));

    // 刪除資料夾 ,如果是非空資料夾,引數2必須給值true
    fs.delete(new Path("/aaa"), true);

    // 重新命名檔案或資料夾
    fs.rename(new Path("/a1"), new Path("/a2"));

}

/**
 * 檢視目錄資訊,只顯示檔案
 *
 * @throws IOException
 * @throws IllegalArgumentException
 * @throws FileNotFoundException
 */
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

    // 思考:為什麼返回迭代器,而不是List之類的容器
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while (listFiles.hasNext()) {
        LocatedFileStatus fileStatus = listFiles.next();
        System.out.println(fileStatus.getPath().getName());
        System.out.println(fileStatus.getBlockSize());
        System.out.println(fileStatus.getPermission());
        System.out.println(fileStatus.getLen());
        BlockLocation[] blockLocations = fileStatus.getBlockLocations();
        for (BlockLocation bl : blockLocations) {
            System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
            String[] hosts = bl.getHosts();
            for (String host : hosts) {
                System.out.println(host);
            }
        }
        System.out.println("--------------為angelababy列印的分割線--------------");
    }
}


/**
 * 檢視檔案及資料夾資訊
 *
 * @throws IOException
 * @throws IllegalArgumentException
 * @throws FileNotFoundException
 */
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {

    FileStatus[] listStatus = fs.listStatus(new Path("/"));

    String flag = "d--             ";
    for (FileStatus fstatus : listStatus) {
        if (fstatus.isFile())  flag = "f--         ";
        System.out.println(flag + fstatus.getPath().getName());
    }
}
}
   通過流的方式訪問hdfs:
package com.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;

public class StreamAccess {
    FileSystem fs = null;

@Before
public void init() throws Exception {

    Configuration conf = new Configuration();
    fs = FileSystem.get(new URI("hdfs://mini1:9000"), conf, "root");

}
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException {

    //先獲取一個檔案的輸入流----針對hdfs上的
    FSDataInputStream in = fs.open(new Path("/apache-tomcat-8.0.53.tar.gz"));

    //再構造一個檔案的輸出流----針對本地的
    FileOutputStream out = new FileOutputStream(new File("/home/youjin/test/apache-tomcat-8.0.53.tar.gz"));

    //再將輸入流中資料傳輸到輸出流
    IOUtils.copyBytes(in, out, 4096);


}

/**
 * hdfs支援隨機定位進行檔案讀取,而且可以方便地讀取指定長度
 * 用於上層分散式運算框架併發處理資料
 * @throws IllegalArgumentException
 * @throws IOException
 */
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
    //先獲取一個檔案的輸入流----針對hdfs上的
    FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));


    //可以將流的起始偏移量進行自定義
    in.seek(22);

    //再構造一個檔案的輸出流----針對本地的
    FileOutputStream out = new FileOutputStream(new File("/home/youjin/test/iloveyou.line.2.txt"));

    IOUtils.copyBytes(in,out,19L,true);

}



/**
 * 顯示hdfs上檔案的內容
 * @throws IOException
 * @throws IllegalArgumentException
 */
@Test
public void testCat() throws IllegalArgumentException, IOException{

    FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));

    IOUtils.copyBytes(in, System.out, 1024);
}

/**
 * 獲取一個檔案的所有block位置資訊,然後讀取指定block中的內容
 * @throws IllegalArgumentException
 * @throws IOException
 */

@Test
public void testCat1() throws IllegalArgumentException, IOException{

    FSDataInputStream in = fs.open(new Path("/wordcount/input/somewords.txt"));
    //拿到檔案資訊
    FileStatus[] listStatus = fs.listStatus(new Path("/wordcount/input/somewords.txt"));
    //獲取這個檔案的所有block的資訊
    BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
    //第一個block的長度
    long length = fileBlockLocations[0].getLength();
    //第一個block的起始偏移量
    long offset = fileBlockLocations[0].getOffset();

    System.out.println(length);
    System.out.println(offset);

    //獲取第一個block寫入輸出流
//		IOUtils.copyBytes(in, System.out, (int)length);
    byte[] b = new byte[4096];

    FileOutputStream os = new FileOutputStream(new File("/home/youjin/test/block0"));
    while(in.read(offset, b, 0, 4096)!=-1){
        os.write(b);
        offset += 4096;
        if(offset>=length) return;
    };
    os.flush();
    os.close();
    in.close();
}
}