1. 程式人生 > >讀HDFS書筆記---5.1 DFSClient實現

讀HDFS書筆記---5.1 DFSClient實現

5.1   DFSClient實現

        5.1.1 構造方法

        5.1.2 關閉方法

        5.1.3 檔案系統管理與配置方法

        5.1.4 HDFS檔案與目錄操作方法

        5.1.5 HDFS檔案讀寫方法

 

5.1 DFSClient實現

HDFS目前提供了三個客戶端介面,這三個介面分別是DistributedFileSystem、FsShell和DFSAdmin。

DistributedFileSystem:用來給使用者開發基於HDFS的應用程式提供API

FsShell:使用者通過shell命令執行常見的檔案系統操作,呼叫的就是該介面,例如建立檔案、刪除檔案、建立目錄等

DFSAdmin:系統管理員管理HDFS的工具,例如執行升級、管理安全模式等操作。

DFSClient類實現了分散式系統客戶端功能,是使用者使用HDFS各項功能的起點。DFSClient會連線到HDFS,對外提供管理檔案/目錄、讀寫檔案以及管理與配置HDFS系統等功能。

對於管理檔案/目錄以及管理與配置HDFS系統這兩個功能,DFSClient不需要與Datanode進行互動,而是直接通過遠端介面ClientProtocol呼叫Namenode提供的服務即可。而檔案的讀寫功能,除了呼叫ClientProtocol與Namenode互動外,還需要通過流式介面DataTransferProtocol與Datanode互動傳輸資料。

 

DFSClient提供的介面方法有以下幾類:

(1)、DFSClient的構造方法和關閉方法

(2)、管理與配置檔案系統相關方法

(3)、操作HDFS檔案與目錄方法

(4)、讀寫HDFS檔案方法

 

5.1.1 構造方法

有5個建構函式,其中引數為Configuration的建構函式已經被廢棄,其他4個建構函式都呼叫了最後一個建構函式,如下圖

最後一個建構函式的程式碼如下:

/** 
   * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
   * If HA is enabled and a positive value is set for 
   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
   * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode 
   * must be null.
   */
  @VisibleForTesting
  public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {
    // Copy only the required DFSClient configuration
    this.dfsClientConf = new Conf(conf);
    if (this.dfsClientConf.useLegacyBlockReaderLocal) {
      LOG.debug("Using legacy short-circuit local reads.");
    }
    this.conf = conf;//HDFS配置資訊
    this.stats = stats;//Client狀態統計資訊,包括Client讀、寫位元組數等
    this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
    //當Client讀寫資料時,如果Datanode出現故障,是否進行Datanode替換的策略
    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);

    //獲取當前使用者資訊
    this.ugi = UserGroupInformation.getCurrentUser();
    
    this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
    this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
        DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
    provider = DFSUtil.createKeyProvider(conf);
    if (LOG.isDebugEnabled()) {
      if (provider == null) {
        LOG.debug("No KeyProvider found.");
      } else {
        LOG.debug("Found KeyProvider: " + provider.toString());
      }
    }
    int numResponseToDrop = conf.getInt(
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
    if (numResponseToDrop > 0) {
      // This case is used for testing.
      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
          + " is set to " + numResponseToDrop
          + ", this hacked client will proactively drop responses");
      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
          nameNodeUri, ClientProtocol.class, numResponseToDrop,
          nnFallbackToSimpleAuth);
    }
    
    if (proxyInfo != null) {
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    } else if (rpcNamenode != null) {
      // This case is used for testing.
      Preconditions.checkArgument(nameNodeUri == null);
      this.namenode = rpcNamenode;
      dtService = null;
    } else {
      Preconditions.checkArgument(nameNodeUri != null,
          "null URI");
      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
          ClientProtocol.class, nnFallbackToSimpleAuth);
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    }

    String localInterfaces[] =
      conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
    //本地介面地址
    localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
    if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
      LOG.debug("Using local interfaces [" +
      Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
      Joiner.on(',').join(localInterfaceAddrs) + "]");
    }
    
    //讀取資料後,是否立即從作業系統緩衝區中刪除
    Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
    //預讀取位元組數
    Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
        null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
    //寫資料後,是否立即從作業系統緩衝區中刪除
    Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
    this.defaultReadCachingStrategy =
        new CachingStrategy(readDropBehind, readahead);
    this.defaultWriteCachingStrategy =
        new CachingStrategy(writeDropBehind, readahead);
    this.clientContext = ClientContext.get(
        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
        dfsClientConf);
    
    /*hedgedReadThresholdMillis儲存觸發"hedged read"機制的時長,
當Client發現一個數據塊讀取操作太慢時(讀取時長超過hedgedReadThresholdMillis),
那麼Client會啟動另外一個併發操作讀取資料塊的另外一個副本,
之後Client會返回先完成讀取副本的資料。*/
    this.hedgedReadThresholdMillis = conf.getLong(
        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
    int numThreads = conf.getInt(
        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
    if (numThreads > 0) {
      this.initThreadsNumForHedgedReads(numThreads);
    }
    //簡單驗證安全層 
    this.saslClient = new SaslDataTransferClient(
      conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
  }

建構函式完成兩個功能:

(1)、初始化成員變數

(2)、獲取Namenode的RPCProxy引用,供DFSClient遠端呼叫Namenode的RPC方法

5.1.2 關閉方法

關閉方法為close,程式碼如下:

/**
   * Close the file system, abandoning all of the leases and files being
   * created and close connections to the namenode.
   */
  @Override
  public synchronized void close() throws IOException {
    try {
      if(clientRunning) {
        //關閉所有正在進行寫操作的IO流。
        closeAllFilesBeingWritten(false);
        //設定為false標誌,停止DFSClient對外服務
        clientRunning = false;
        //停止租約管理
        getLeaseRenewer().closeClient(this);
        // close connections to the namenode,關閉與Namenode的RPC連線
        closeConnectionToNamenode();
      }
    } finally {
      if (provider != null) {
        provider.close();
      }
    }
  }

5.1.3 檔案系統管理與配置方法

HDFS管理員通過DFSAdmin工具管理與配置HDFS,DFSAdmin也是通過持有DistributedFileSystem物件的引用,然後進一步呼叫DFSClient類提供的方法執行管理與配置操作的,整個流程比較簡單,我們用DFSClient.rollEdits()方法為例,流程圖如下:

流程圖

DFSClient檔案系統管理與配置方法對應關係圖

DFSClient檔案系統管理與配置方法對應關係圖

DFSClient中還有許多命令是直接建立與Datanode或者Namenode的RPC連線,然後呼叫對應的RPC方法實現的,例如getDatanode、shutdownDatanode()等操作。

5.1.4 HDFS 檔案與目錄操作方法

除了管理與配置HDFS檔案系統外,DFSClient的另外一個重要功能就是操作HDFS檔案與目錄,例如setPermission()、rename()、getFileInfo()、delete()等對檔案/目錄樹的增、刪、改、查等操作。

5.1.5 HDFS檔案讀寫方法

DFSClient對檔案進行讀寫操作,涉及的方法比較多,後面會進行詳細講解。