掌握HDFS的Java API接口訪問
阿新 • • 發佈:2017-08-08
lock getpath println bsp paths bar 下載 param syntax
HDFS設計的主要目的是對海量數據進行存儲,也就是說在其上能夠存儲很大量文件(可以存儲TB級的文件)。HDFS將這些文件分割之後,存儲在不同的DataNode上, HDFS 提供了兩種訪問接口:Shell接口和Java API 接口,對HDFS裏面的文件進行操作,具體每個Block放在哪臺DataNode上面,對於開發者來說是透明的。
1、獲取文件系統
1 /** 2 * 獲取文件系統 3 * 4 * @return FileSystem 5 */ 6 public static FileSystem getFileSystem() { 7 //讀取配置文件 8 Configuration conf = new Configuration(); 9 // 文件系統 10 FileSystem fs = null; 11 12 String hdfsUri = HDFSUri; 13 if(StringUtils.isBlank(hdfsUri)){ 14 // 返回默認文件系統 如果在 Hadoop集群下運行,使用此種方法可直接獲取默認文件系統 15 try { 16 fs = FileSystem.get(conf); 17 } catch (IOException e) { 18 logger.error("", e); 19 } 20 }else{ 21 // 返回指定的文件系統,如果在本地測試,需要使用此種方法獲取文件系統 22 try { 23 URI uri = new URI(hdfsUri.trim()); 24 fs = FileSystem.get(uri,conf); 25 } catch (URISyntaxException | IOException e) { 26 logger.error("", e); 27 } 28 } 29 30 return fs; 31 }
2、創建文件目錄
1 /** 2 * 創建文件目錄 3 * 4 * @param path 5 */ 6 public static void mkdir(String path) { 7 try { 8 // 獲取文件系統 9 FileSystem fs = getFileSystem(); 10 11 String hdfsUri = HDFSUri; 12 if(StringUtils.isNotBlank(hdfsUri)){ 13 path = hdfsUri + path; 14 } 15 16 // 創建目錄 17 fs.mkdirs(new Path(path)); 18 19 //釋放資源 20 fs.close(); 21 } catch (IllegalArgumentException | IOException e) { 22 logger.error("", e); 23 } 24 }
3、刪除文件或者文件目錄
1 /** 2 * 刪除文件或者文件目錄 3 * 4 * @param path 5 */ 6 public static void rmdir(String path) { 7 try { 8 // 返回FileSystem對象 9 FileSystem fs = getFileSystem(); 10 11 String hdfsUri = HDFSUri; 12 if(StringUtils.isNotBlank(hdfsUri)){ 13 path = hdfsUri + path; 14 } 15 16 // 刪除文件或者文件目錄 delete(Path f) 此方法已經棄用 17 fs.delete(new Path(path),true); 18 19 // 釋放資源 20 fs.close(); 21 } catch (IllegalArgumentException | IOException e) { 22 logger.error("", e); 23 } 24 }
3、根據filter獲取目錄下的文件
1 /** 2 * 根據filter獲取目錄下的文件 3 * 4 * @param path 5 * @param pathFilter 6 * @return String[] 7 */ 8 public static String[] ListFile(String path,PathFilter pathFilter) { 9 String[] files = new String[0]; 10 11 try { 12 // 返回FileSystem對象 13 FileSystem fs = getFileSystem(); 14 15 String hdfsUri = HDFSUri; 16 if(StringUtils.isNotBlank(hdfsUri)){ 17 path = hdfsUri + path; 18 } 19 20 FileStatus[] status; 21 if(pathFilter != null){ 22 // 根據filter列出目錄內容 23 status = fs.listStatus(new Path(path),pathFilter); 24 }else{ 25 // 列出目錄內容 26 status = fs.listStatus(new Path(path)); 27 } 28 29 // 獲取目錄下的所有文件路徑 30 Path[] listedPaths = FileUtil.stat2Paths(status); 31 // 轉換String[] 32 if (listedPaths != null && listedPaths.length > 0){ 33 files = new String[listedPaths.length]; 34 for (int i = 0; i < files.length; i++){ 35 files[i] = listedPaths[i].toString(); 36 } 37 } 38 // 釋放資源 39 fs.close(); 40 } catch (IllegalArgumentException | IOException e) { 41 logger.error("", e); 42 } 43 44 return files; 45 }
4、文件上傳至 HDFS
1 /** 2 * 文件上傳至 HDFS 3 * 4 * @param delSrc 5 * @param overwrite 6 * @param srcFile 7 * @param destPath 8 */ 9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) { 10 // 源文件路徑是Linux下的路徑,如果在 windows 下測試,需要改寫為Windows下的路徑,比如D://hadoop/djt/weibo.txt 11 Path srcPath = new Path(srcFile); 12 13 // 目的路徑 14 String hdfsUri = HDFSUri; 15 if(StringUtils.isNotBlank(hdfsUri)){ 16 destPath = hdfsUri + destPath; 17 } 18 Path dstPath = new Path(destPath); 19 20 // 實現文件上傳 21 try { 22 // 獲取FileSystem對象 23 FileSystem fs = getFileSystem(); 24 fs.copyFromLocalFile(srcPath, dstPath); 25 fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath); 26 //釋放資源 27 fs.close(); 28 } catch (IOException e) { 29 logger.error("", e); 30 } 31 }
5、從 HDFS 下載文件
1 /** 2 * 從 HDFS 下載文件 3 * 4 * @param srcFile 5 * @param destPath 6 */ 7 public static void getFile(String srcFile,String destPath) { 8 // 源文件路徑 9 String hdfsUri = HDFSUri; 10 if(StringUtils.isNotBlank(hdfsUri)){ 11 srcFile = hdfsUri + srcFile; 12 } 13 Path srcPath = new Path(srcFile); 14 15 // 目的路徑是Linux下的路徑,如果在 windows 下測試,需要改寫為Windows下的路徑,比如D://hadoop/djt/ 16 Path dstPath = new Path(destPath); 17 18 try { 19 // 獲取FileSystem對象 20 FileSystem fs = getFileSystem(); 21 // 下載hdfs上的文件 22 fs.copyToLocalFile(srcPath, dstPath); 23 // 釋放資源 24 fs.close(); 25 } catch (IOException e) { 26 logger.error("", e); 27 } 28 }
6、獲取 HDFS 集群節點信息
1 /** 2 * 獲取 HDFS 集群節點信息 3 * 4 * @return DatanodeInfo[] 5 */ 6 public static DatanodeInfo[] getHDFSNodes() { 7 // 獲取所有節點 8 DatanodeInfo[] dataNodeStats = new DatanodeInfo[0]; 9 10 try { 11 // 返回FileSystem對象 12 FileSystem fs = getFileSystem(); 13 14 // 獲取分布式文件系統 15 DistributedFileSystem hdfs = (DistributedFileSystem)fs; 16 17 dataNodeStats = hdfs.getDataNodeStats(); 18 } catch (IOException e) { 19 logger.error("", e); 20 } 21 return dataNodeStats; 22 }
7、查找某個文件在 HDFS集群的位置
1 /** 2 * 查找某個文件在 HDFS集群的位置 3 * 4 * @param filePath 5 * @return BlockLocation[] 6 */ 7 public static BlockLocation[] getFileBlockLocations(String filePath) { 8 // 文件路徑 9 String hdfsUri = HDFSUri; 10 if(StringUtils.isNotBlank(hdfsUri)){ 11 filePath = hdfsUri + filePath; 12 } 13 Path path = new Path(filePath); 14 15 // 文件塊位置列表 16 BlockLocation[] blkLocations = new BlockLocation[0]; 17 try { 18 // 返回FileSystem對象 19 FileSystem fs = getFileSystem(); 20 // 獲取文件目錄 21 FileStatus filestatus = fs.getFileStatus(path); 22 //獲取文件塊位置列表 23 blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen()); 24 } catch (IOException e) { 25 logger.error("", e); 26 } 27 return blkLocations; 28 }
8、文件重命名
1 /** 2 * 文件重命名 3 * 4 * @param srcPath 5 * @param dstPath 6 */ 7 public boolean rename(String srcPath, String dstPath){ 8 boolean flag = false; 9 try { 10 // 返回FileSystem對象 11 FileSystem fs = getFileSystem(); 12 13 String hdfsUri = HDFSUri; 14 if(StringUtils.isNotBlank(hdfsUri)){ 15 srcPath = hdfsUri + srcPath; 16 dstPath = hdfsUri + dstPath; 17 } 18 19 flag = fs.rename(new Path(srcPath), new Path(dstPath)); 20 } catch (IOException e) { 21 logger.error("{} rename to {} error.", srcPath, dstPath); 22 } 23 24 return flag; 25 }
9、判斷目錄是否存在
1 /** 2 * 判斷目錄是否存在 3 * 4 * @param srcPath 5 * @param dstPath 6 */ 7 public boolean existDir(String filePath, boolean create){ 8 boolean flag = false; 9 10 if (StringUtils.isEmpty(filePath)){ 11 return flag; 12 } 13 14 try{ 15 Path path = new Path(filePath); 16 // FileSystem對象 17 FileSystem fs = getFileSystem(); 18 19 if (create){ 20 if (!fs.exists(path)){ 21 fs.mkdirs(path); 22 } 23 } 24 25 if (fs.isDirectory(path)){ 26 flag = true; 27 } 28 }catch (Exception e){ 29 logger.error("", e); 30 } 31 32 return flag; 33 }
10 查看HDFS文件的最後修改時間
- public void testgetModifyTime() throws Exception {
- Configuration conf = new Configuration();
- FileSystem hdfs = FileSystem.get(conf);
- Path dst = new Path(hdfsPath);
- FileStatus files[] = hdfs.listStatus(dst);
- for (FileStatus file : files) {
- System.out.println(file.getPath() + "\t"
- + file.getModificationTime());
- System.out.println(file.getPath() + "\t"
- + new Date(file.getModificationTime()));
- }
- // 查看HDFS文件是否存在
- public void testExists() throws Exception {
- Configuration conf = new Configuration();
- FileSystem hdfs = FileSystem.get(conf);
- Path dst = new Path(hdfsPath + "file01.txt");
- boolean ok = hdfs.exists(dst);
- System.out.println(ok ? "文件存在" : "文件不存在");
- }
- // 獲取HDFS集群上所有節點名稱
- public void testGetHostName() throws Exception {
- Configuration conf = new Configuration();
- DistributedFileSystem hdfs = (DistributedFileSystem) FileSystem
- .get(conf);
- DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
- for (DatanodeInfo dataNode : dataNodeStats) {
- System.out.println(dataNode.getHostName() + "\t"
- + dataNode.getName());
- }
- }
掌握HDFS的Java API接口訪問