如何恢復未釋放租約的HDFS文件
之前有文章介紹過HDFS租約帶來的問題,導致spark應用無法正常讀取文件,只能將異常文件找出並且刪除後,任務才能繼續執行。
但是刪除文件實在是下下策,而且文件本身其實並未損壞,只是因為已經close的客戶端沒有及時的釋放租約導致。
按照Hadoop官網的說法,HDFS會啟動一個單獨的線程,專門處理未及時釋放的租約,自動釋放超過“硬超時”(默認1小時)仍未釋放的租約,但是從問題的現象上來看,這個線程並沒有正常的工作,甚至懷疑這個線程是否沒有啟動,我使用的是CDH集群,可能與相關的設置有關,這一點需要確認。
如果Hadoop沒有自動清理租約,我們有辦法手動的刷新租約嗎?答案是肯定的。
在網上查看資料時,發現HDFS源碼中的DistributedFileSystem類提供了一個叫做recoverLease的方法,可以主動的刷新租約。但是非常奇怪,既然已經為外界提供了這個接口,為什麽不提供shell指令給用戶使用呢?為什麽只能通過代碼的方式調用呢?我使用的是hadoop-2.6.0,也許後期的版本有所更新,這一點也需要求證。
下面看一下這個方法的源碼:
/** * Start the lease recovery of a file * * @param f a file * @return true if the file is already closed * @throws IOException if an error occurs */ public boolean recoverLease(final Path f) throws IOException { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<Boolean>() { @Overridepublic Boolean doCall(final Path p) throws IOException, UnresolvedLinkException { return dfs.recoverLease(getPathName(p)); } @Override public Boolean next(final FileSystem fs, final Path p) throws IOException { if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs= (DistributedFileSystem)fs; return myDfs.recoverLease(p); } throw new UnsupportedOperationException("Cannot recoverLease through" + " a symlink to a non-DistributedFileSystem: " + f + " -> " + p); } }.resolve(this, absF); }
有興趣的朋友可以下載hadoop源碼來仔細推敲一下內部的實現原理,這裏我們只說如何調用,解決我們的問題:
public static void recoverLease(String path) throws IOException { DistributedFileSystem fs = new DistributedFileSystem(); Configuration conf = new Configuration(); fs.initialize(URI.create(path), conf); fs.recoverLease(new Path(path)); fs.close(); }
這是我編寫的一個調用改接口的簡單的封裝方法,需要註意的是,此處傳入的path,必須是包含文件系統以及namenode和端口號的全路徑,比如:
hdfs://namenode1:9000/xxx/xxx.log
如果只需要恢復單個文件,調用上述方法即可,但是通常情況下,我們需要對一個目錄進行遞歸的處理,即恢復指定目錄下所有租約異常的文件。
這個時候,我們需要先找出指定目錄下所有租約異常的文件,形成一個Set或者List,然後再遍歷這個容器,對每個文件進行恢復。
尋找文件列表的方法如下:
public static Set<String> getOpenforwriteFileList(String dir) throws IOException { /*拼接URL地址,發送給namenode監聽的dfs.namenode.http-address端口,獲取所需數據*/ StringBuilder url = new StringBuilder(); url.append("/fsck?ugi=").append("dev"); url.append("&openforwrite=1"); /*獲得namenode的主機名以及dfs.namenode.http-address監聽端口,例如:http://hadoopnode1:50070*/ Path dirpath; URI namenodeAddress; dirpath = HDFSUtil.getResolvedPath(dir); namenodeAddress = HDFSUtil.getDFSHttpAddress(dirpath); url.insert(0, namenodeAddress); try { url.append("&path=").append(URLEncoder.encode( Path.getPathWithoutSchemeAndAuthority(new Path(dir)).toString(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } Configuration conf = new Configuration(); URLConnectionFactory connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf); URL path = null; try { path = new URL(url.toString()); } catch (MalformedURLException e) { e.printStackTrace(); } URLConnection connection; BufferedReader input = null; try { connection = connectionFactory.openConnection(path, UserGroupInformation.isSecurityEnabled()); InputStream stream = connection.getInputStream(); input = new BufferedReader(new InputStreamReader(stream, "UTF-8")); } catch (IOException | AuthenticationException e) { e.printStackTrace(); } if (input == null) { System.err.println("Cannot get response from namenode, url = " + url); return null; } String line; Set<String> resultSet = new HashSet<>(); try { while ((line = input.readLine()) != null) { if (line.contains("MISSING") || line.contains("OPENFORWRITE")) { String regEx = "/[^ ]*"; Pattern pattern = Pattern.compile(regEx); Matcher matcher = pattern.matcher(line); while (matcher.find()) { resultSet.add(matcher.group().replaceAll(":", "")); } } } } catch (IOException e) { e.printStackTrace(); } finally { input.close(); } return resultSet; }
其實獲取租約異常列表的方法是我從HDFS源碼的org.apache.hadoop.hdfs.tools.DFSck中仿照而來的,通過向NameNode的dfs.namenode.http-address端口通信,獲取openforwrite狀態的文件列表,然後通過正則匹配以及字符串切割,獲取所需的內容。
順便提一句,由於此代碼是Java代碼,並且返回的Set類型為java.util.Set,如果在Scala代碼中調用,則需要將Set類型轉化為scala.collection.immutable.Set,具體方法如下:
/*獲取需要被恢復租約的文件列表,返回類型為java.util.Set*/ val javaFilesSet = HDFSUtil.getOpenforwriteFileList(hdfsPrefix + recoverDirPath) if (null == javaFilesSet || javaFilesSet.isEmpty) { println("No files need to recover lease : " + hdfsPrefix + recoverDirPath) return } /*將java.util.Set轉換成scala.collection.immutable.Set*/ import scala.collection.JavaConverters._ val filesSet = javaFilesSet.asScala.toSet
至此,利用以上兩個方法,即可獲取指定目錄下的所有租約異常的文件列表,然後遍歷調用租約恢復接口,即可實現批量恢復。
如何恢復未釋放租約的HDFS文件