1. 程式人生 > >如何恢復未釋放租約的HDFS文件

如何恢復未釋放租約的HDFS文件

too cat converter lean amr table response mean mis

之前有文章介紹過HDFS租約帶來的問題,導致spark應用無法正常讀取文件,只能將異常文件找出並且刪除後,任務才能繼續執行。

但是刪除文件實在是下下策,而且文件本身其實並未損壞,只是因為已經close的客戶端沒有及時的釋放租約導致。

按照Hadoop官網的說法,HDFS會啟動一個單獨的線程,專門處理未及時釋放的租約,自動釋放超過“硬超時”(默認1小時)仍未釋放的租約,但是從問題的現象上來看,這個線程並沒有正常的工作,甚至懷疑這個線程是否沒有啟動,我使用的是CDH集群,可能與相關的設置有關,這一點需要確認。

如果Hadoop沒有自動清理租約,我們有辦法手動的刷新租約嗎?答案是肯定的。

在網上查看資料時,發現HDFS源碼中的DistributedFileSystem類提供了一個叫做recoverLease的方法,可以主動的刷新租約。但是非常奇怪,既然已經為外界提供了這個接口,為什麽不提供shell指令給用戶使用呢?為什麽只能通過代碼的方式調用呢?我使用的是hadoop-2.6.0,也許後期的版本有所更新,這一點也需要求證。

下面看一下這個方法的源碼:

/** 
   * Start the lease recovery of a file
   *
   * @param f a file
   * @return true if the file is already closed
   * @throws IOException if an error occurs
   */
  public boolean recoverLease(final Path f) throws IOException {
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<Boolean>() {
      @Override
      
public Boolean doCall(final Path p) throws IOException, UnresolvedLinkException { return dfs.recoverLease(getPathName(p)); } @Override public Boolean next(final FileSystem fs, final Path p) throws IOException { if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs
= (DistributedFileSystem)fs; return myDfs.recoverLease(p); } throw new UnsupportedOperationException("Cannot recoverLease through" + " a symlink to a non-DistributedFileSystem: " + f + " -> " + p); } }.resolve(this, absF); }

有興趣的朋友可以下載hadoop源碼來仔細推敲一下內部的實現原理,這裏我們只說如何調用,解決我們的問題:

    public static void recoverLease(String path) throws IOException {
        DistributedFileSystem fs = new DistributedFileSystem();
        Configuration conf = new Configuration();
        fs.initialize(URI.create(path), conf);
        fs.recoverLease(new Path(path));
        fs.close();
    }

這是我編寫的一個調用改接口的簡單的封裝方法,需要註意的是,此處傳入的path,必須是包含文件系統以及namenode和端口號的全路徑,比如:

hdfs://namenode1:9000/xxx/xxx.log

如果只需要恢復單個文件,調用上述方法即可,但是通常情況下,我們需要對一個目錄進行遞歸的處理,即恢復指定目錄下所有租約異常的文件。

這個時候,我們需要先找出指定目錄下所有租約異常的文件,形成一個Set或者List,然後再遍歷這個容器,對每個文件進行恢復。

尋找文件列表的方法如下:

public static Set<String> getOpenforwriteFileList(String dir) throws IOException {
        /*拼接URL地址,發送給namenode監聽的dfs.namenode.http-address端口,獲取所需數據*/
        StringBuilder url = new StringBuilder();
        url.append("/fsck?ugi=").append("dev");
        url.append("&openforwrite=1");

        /*獲得namenode的主機名以及dfs.namenode.http-address監聽端口,例如:http://hadoopnode1:50070*/
        Path dirpath;
        URI namenodeAddress;
        dirpath = HDFSUtil.getResolvedPath(dir);
        namenodeAddress = HDFSUtil.getDFSHttpAddress(dirpath);

        url.insert(0, namenodeAddress);
        try {
            url.append("&path=").append(URLEncoder.encode(
                    Path.getPathWithoutSchemeAndAuthority(new Path(dir)).toString(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        Configuration conf = new Configuration();
        URLConnectionFactory connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf);
        URL path = null;
        try {
            path = new URL(url.toString());
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }

        URLConnection connection;
        BufferedReader input = null;
        try {
            connection = connectionFactory.openConnection(path, UserGroupInformation.isSecurityEnabled());
            InputStream stream = connection.getInputStream();
            input = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
        } catch (IOException | AuthenticationException e) {
            e.printStackTrace();
        }

        if (input == null) {
            System.err.println("Cannot get response from namenode, url = " + url);
            return null;
        }

        String line;
        Set<String> resultSet = new HashSet<>();
        try {
            while ((line = input.readLine()) != null) {
                if (line.contains("MISSING") || line.contains("OPENFORWRITE")) {
                    String regEx = "/[^ ]*";
                    Pattern pattern = Pattern.compile(regEx);
                    Matcher matcher = pattern.matcher(line);
                    while (matcher.find()) {
                        resultSet.add(matcher.group().replaceAll(":", ""));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            input.close();
        }

        return resultSet;

    }

其實獲取租約異常列表的方法是我從HDFS源碼的org.apache.hadoop.hdfs.tools.DFSck中仿照而來的,通過向NameNode的dfs.namenode.http-address端口通信,獲取openforwrite狀態的文件列表,然後通過正則匹配以及字符串切割,獲取所需的內容。

順便提一句,由於此代碼是Java代碼,並且返回的Set類型為java.util.Set,如果在Scala代碼中調用,則需要將Set類型轉化為scala.collection.immutable.Set,具體方法如下:

    /*獲取需要被恢復租約的文件列表,返回類型為java.util.Set*/
    val javaFilesSet = HDFSUtil.getOpenforwriteFileList(hdfsPrefix + recoverDirPath)
    if (null == javaFilesSet || javaFilesSet.isEmpty) {
      println("No files need to recover lease : " + hdfsPrefix + recoverDirPath)
      return
    }

    /*將java.util.Set轉換成scala.collection.immutable.Set*/
    import scala.collection.JavaConverters._
    val filesSet = javaFilesSet.asScala.toSet

至此,利用以上兩個方法,即可獲取指定目錄下的所有租約異常的文件列表,然後遍歷調用租約恢復接口,即可實現批量恢復。

如何恢復未釋放租約的HDFS文件