1. 程式人生 > >Hadoop系列008-HDFS的資料流

Hadoop系列008-HDFS的資料流

本人微信公眾號,歡迎掃碼關注!

HDFS的資料流

1 HDFS寫資料流程

1.1 剖析檔案寫入

1)客戶端向namenode請求上傳檔案,namenode檢查目標檔案是否已存在,父目錄是否存在。

2)namenode返回是否可以上傳。

3)客戶端請求第一個 block上傳到哪幾個datanode伺服器上。

4)namenode返回3個datanode節點,分別為dn1、dn2、dn3。

5)客戶端請求dn1上傳資料,dn1收到請求會繼續呼叫dn2,然後dn2呼叫dn3,將這個通訊管道建立完成

6)dn1、dn2、dn3逐級應答客戶端

7)客戶端開始往dn1上傳第一個block(先從磁碟讀取資料放到一個本地記憶體快取),以packet為單位,dn1收到一個packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答佇列等待應答

8)當一個block傳輸完成之後,客戶端再次請求namenode上傳第二個block的伺服器。(重複執行3-7步)

1.2 網路拓撲概念

在本地網路中,兩個節點被稱為“彼此近鄰”是什麼意思?在海量資料處理中,其主要限制因素是節點之間資料的傳輸速率——頻寬很稀缺。這裡的想法是將兩個節點間的頻寬作為距離的衡量標準。

節點距離:兩個節點到達最近的共同祖先的距離總和。

例如,假設有資料中心d1機架r1中的節點n1。該節點可以表示為/d1/r1/n1。利用這種標記,這裡給出四種距離描述。

Distance(/d1/r1/n1, /d1/r1/n1)=0(同一節點上的程序)
Distance(/d1/r1/n1, /d1/r1/n2)=2(同一機架上的不同節點)
Distance(/d1/r1/n1, /d1/r3/n2)=4(同一資料中心不同機架上的節點)
Distance(/d1/r1/n1, /d2/r4/n2)=6(不同資料中心的節點)

1.3 機架感知(副本節點選擇)

1.3.1 官方地址

http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-common/RackAwareness.html

1.3.2 低版本Hadoop複本節點選擇

  • 第一個複本在client所處的節點上。如果客戶端在叢集外,隨機選一個。
  • 第二個複本和第一個複本位於不相同機架的隨機節點上。
  • 第三個複本和第二個複本位於相同機架,節點隨機。
1.3.3 Hadoop2.7.2副本節點選擇

  • 第一個副本在client所處的節點上。如果客戶端在叢集外,隨機選一個。
  • 第二個副本和第一個副本位於相同機架,隨機節點。
  • 第三個副本位於不同機架,隨機節點。
1.3.4 自定義機架感知
  • (0)環境準備

    • (a)資料節點的量

      [rack1]:hadoop102、hadoop103

      [rack2]:hadoop104、hadoop105

    • (b)增加一個數據節點

      (1)克隆一個節點

      (2)啟動新節點

      (3)修改克隆的ip和主機名

      (4)在hadoop102上ssh到新節點

      (5)修改xsync.sh和xcall.sh檔案

      (6)修改hadoop102 slaves檔案,再分發

  • (1)建立類實現DNSToSwitchMapping介面

    public class MyDNSToSwichMapping implements DNSToSwitchMapping {
      // 傳遞的是客戶端的ip列表,返回機架感知的路徑列表
      public List<String> resolve(List<String> names) {
    
          ArrayList<String> lists = new ArrayList<String>();
          if (names != null && names.size() > 0) {
              for (String name : names) {
                  int ip = 0;
                    // 獲取ip地址
                  if (name.startsWith("hadoop")) {
                      String no = name.substring(6);
                      // hadoop102
                      ip = Integer.parseInt(no);
                  } else if (name.startsWith("192")) {
                      // 192.168.10.102
                      ip = Integer.parseInt(name.substring(name.lastIndexOf(".") + 1));
                  }
    
                    // 定義機架
                  if (ip < 104) {
                      lists.add("/rack1/" + ip);
                  } else {
                      lists.add("/rack2/" + ip);
                  }
              }
          }
    
            // 把ip地址打印出來
          try {
              FileOutputStream fos = new FileOutputStream("/home/atguigu/name.txt");
    
              for (String name : lists) {
                  fos.write((name + "\r\n").getBytes());
              }
              fos.close();
          } catch (Exception e) {
              e.printStackTrace();
          }
          return lists;
      }
      public void reloadCachedMappings() {
      }
      public void reloadCachedMappings(List<String> names) {
      }
    }
  • (2)配置core-site.xml

    • 預設的:

      <!-- Topology Configuration -->
      <property>
        <name>net.topology.node.switch.mapping.impl</name>
        <value>org.apache.hadoop.net.ScriptBasedMapping</value>
      </property>
    • 配置後的

      <!-- Topology Configuration -->
      <property>
        <name>net.topology.node.switch.mapping.impl</name>
        <value>com.atguigu.hdfs.MyDNSToSwichMapping</value>
      </property>
  • (3)分發core-site.xml

    xsync /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml
  • (4)編譯程式,打成jar,分發到所有節點的hadoop的classpath下

    cd /opt/module/hadoop-2.7.2/share/hadoop/common/lib
    xsync MyDNSSwitchToMapping.jar
  • (5)重新啟動叢集

  • (6)在名稱節點hadoop103主機上檢視名稱

  • (7)檢視結果

    • (1)在hadoop105節點上傳檔案到hdfs檔案系統,檢視複本存放位置

    • (2)在hadoop102節點上傳檔案到hdfs檔案系統,檢視複本存放位置

    • (3)結論

      第一個副本在client所處的節點上。如果客戶端在叢集外,隨機選一個。

      第二個副本和第一個副本位於相同機架,隨機節點。

      第三個副本位於不同機架,隨機節點。

2 HDFS讀資料流程

1)客戶端向namenode請求下載檔案,namenode通過查詢元資料,找到檔案塊所在的datanode地址。

2)挑選一臺datanode(就近原則,然後隨機)伺服器,請求讀取資料。

3)datanode開始傳輸資料給客戶端(從磁盤裡面讀取資料放入流,以packet為單位來做校驗)。

4)客戶端以packet為單位接收,先在本地快取,然後寫入目標檔案。

3 一致性模型

3.1 debug除錯如下程式碼

    @Test
    public void writeFile() throws Exception{
        // 1 建立配置資訊物件
        Configuration configuration = new Configuration();
        fs = FileSystem.get(configuration);
        
        // 2 建立檔案輸出流
        Path path = new Path("hdfs://hadoop102:8020/user/atguigu/hello.txt");
        FSDataOutputStream fos = fs.create(path);
        
        // 3 寫資料
        fos.write("hello".getBytes());
//      fos.flush();
        fos.hflush();
//      
//      fos.write("welcome to atguigu".getBytes());
//      fos.hsync();
        
        fos.close();
    }

3.2 總結

  • 寫入資料時,如果希望資料被其他client立即可見,呼叫如下方法
  • FsDataOutputStream.hflus(); //清理客戶端緩衝區資料,被其他client立即可見
  • FsDataOutputStream.hsync(); //清理客戶端緩衝區資料,被其他client不能立即可見