Java實現HadoopHA叢集的hdfs控制
阿新 • • 發佈:2018-12-05
一、HadoopHA的搭建:https://www.cnblogs.com/null-/p/10000309.html
二、pom檔案依賴:
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.4</version></dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
三、控制程式碼:
package com.hdfs.demo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * @author 王傳禮 */ public class HdfsDemo { /** * 根據配置獲取HDFS檔案作業系統 * * @return FileSystem */ public static FileSystem getHadoopFileSystem() { FileSystem fs = null; Configuration conf = null; //方法:本地沒有hadoop系統,但可以遠端訪問。根據給定的URI和使用者名稱,訪問hdfs的配置引數 conf = new Configuration(); //Hadoop的使用者名稱 String hdfsUserNmae = "root"; URI hdfsUri = null; try { hdfsUri = new URI("hdfs://192.168.182.135:8020"); // HDFS的訪問路徑 } catch (URISyntaxException e) { e.printStackTrace(); } try { //根據遠端的NN節點,獲取配置資訊,建立HDFS物件 fs = FileSystem.get(hdfsUri, conf, hdfsUserNmae); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return fs; } /** * 這裡的建立資料夾同shell中的mkdir -p 語序前面的資料夾不存在 * 跟java中的IO操作一樣,也只能對path物件做操作;但是這裡的Path物件是hdfs中的 * * @param fs,filepath * @return */ public static boolean myCreatePath(FileSystem fs,String filepath) { boolean b = false; Path path = new Path(filepath); try { // even the path exist,it can also create the path. b = fs.mkdirs(path); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 刪除檔案,實際上刪除的是給定path路徑的最後一個 * 跟java中一樣,也需要path物件,不過是hadoop.fs包中的。 * 實際上delete(Path p)已經過時了,更多使用delete(Path p,boolean recursive) * 後面的布林值實際上是對檔案的刪除,相當於rm -r * * @param fs * @return */ public static boolean myDropHdfsPath(FileSystem fs, String filepath) { boolean b = false; // drop the last path Path path = new Path(filepath); try { b = fs.delete(path, true); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 重新命名資料夾 * * @param hdfs * @return */ public static boolean myRename(FileSystem hdfs, String oldname, String newname) { boolean b = false; Path oldPath = new Path(oldname); Path newPath = new Path(newname); try { b = hdfs.rename(oldPath, newPath); } catch (IOException e) { e.printStackTrace(); } finally { try { hdfs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 遍歷資料夾 * public FileStatus[] listStatus(Path p) * 通常使用HDFS檔案系統的listStatus(path)來獲取改定路徑的子路徑。然後逐個判斷 * 值得注意的是: * 1.並不是總有資料夾中有檔案,有些資料夾是空的,如果僅僅做是否為檔案的判斷會有問題,必須加檔案的長度是否為0的判斷 * 2.使用getPath()方法獲取的是FileStatus物件是帶URL路徑的。使用FileStatus.getPath().toUri().getPath()獲取的路徑才是不帶url的路徑 * * @param hdfs * @param listPath 傳入的HDFS開始遍歷的路徑 * @return */ public static Set<String> recursiveHdfsPath(FileSystem hdfs, Path listPath) { /*FileStatus[] files = null; try { files = hdfs.listStatus(listPath); Path[] paths = FileUtil.stat2Paths(files); for(int i=0;i<files.length;i++){ if(files[i].isFile()){ // set.add(paths[i].toString()); set.add(paths[i].getName()); }else { recursiveHdfsPath(hdfs,paths[i]); } } } catch (IOException e) { e.printStackTrace(); logger.error(e); }*/ FileStatus[] files = null; Set<String> set = null; try { files = hdfs.listStatus(listPath); // 實際上並不是每個資料夾都會有檔案的。 if (files.length == 0) { // 如果不使用toUri(),獲取的路徑帶URL。 set.add(listPath.toUri().getPath()); } else { // 判斷是否為檔案 for (FileStatus f : files) { if (files.length == 0 || f.isFile()) { set.add(f.getPath().toUri().getPath()); } else { // 是資料夾,且非空,就繼續遍歷 recursiveHdfsPath(hdfs, f.getPath()); } } } } catch (IOException e) { e.printStackTrace(); } return set; } /** * 檔案簡單的判斷 * 是否存在 * 是否是資料夾 * 是否是檔案 * * @param fs */ public static void myCheck(FileSystem fs, String filepath) { boolean isExists = false; boolean isDirectorys = false; boolean isFiles = false; Path path = new Path(filepath); try { isExists = fs.exists(path); isDirectorys = fs.isDirectory(path); isFiles = fs.isFile(path); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } if (!isExists) { System.out.println("The path is not exist."); } else { System.out.println("The path is exist."); if (isDirectorys) { System.out.println("This is a Directory"); } else if (isFiles) { System.out.println("This is Files"); } } } /** * 獲取配置的所有資訊 * 首先,我們要知道配置檔案是哪一個 * 然後我們將獲取的配置檔案用迭代器接收 * 實際上配置中是KV對,我們可以通過java中的Entry來接收 */ public static void showAllConf() { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://node1:8020"); Iterator<Map.Entry<String, String>> it = conf.iterator(); while (it.hasNext()) { Map.Entry<String, String> entry = it.next(); System.out.println(entry.getKey() + "=" + entry.getValue()); } } /** * 檔案下載 * 注意下載的路徑的最後一個地址是下載的檔名 * copyToLocalFile(Path local,Path hdfs) * 下載命令中的引數是沒有任何布林值的,如果添加了布林是,意味著這是moveToLocalFile() *檔案下載有許可權要求 要有寫的許可權 * @param fs */ public static void getFileFromHDFS(FileSystem fs, String dfsFile, String locPath) { Path HDFSPath = new Path(dfsFile); Path localPath = new Path(locPath); try { fs.copyToLocalFile(HDFSPath, localPath); System.out.println("File download."); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 檔案的上傳 * 注意事項同文件的上傳 * 注意如果上傳的路徑不存在會自動建立 * 如果存在同名的檔案,會覆蓋 * * @param fs */ public static void myPutFile2HDFS(FileSystem fs, String localFile, String dfsPath) { boolean pathExists = false; // 如果上傳的路徑不存在會建立 // 如果該路徑檔案已存在,就會覆蓋 Path localPath = new Path(localFile); Path hdfsPath = new Path(dfsPath); try { fs.copyFromLocalFile(localPath, hdfsPath); System.out.println("File upload."); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * hdfs之間檔案的複製 * 使用FSDataInputStream來開啟檔案open(Path p) * 使用FSDataOutputStream開建立寫到的路徑create(Path p) * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)來進行具體的讀寫 * 說明: * 1.java中使用緩衝區來加速讀取檔案,這裡也使用了緩衝區,但是隻要指定緩衝區大小即可,不必單獨設定一個新的陣列來接受 * 2.最後一個布林值表示是否使用完後關閉讀寫流。通常是false,如果不手動關會報錯的 * * @param hdfs */ public static void copyFileBetweenHDFS(FileSystem hdfs, String in, String out) { Path inPath = new Path(in); Path outPath = new Path(out); // byte[] ioBuffer = new byte[1024*1024*64]; // int len = 0; FSDataInputStream hdfsIn = null; FSDataOutputStream hdfsOut = null; try { hdfsIn = hdfs.open(inPath); hdfsOut = hdfs.create(outPath); IOUtils.copyBytes(hdfsIn, hdfsOut, 1024 * 1024 * 64, false); } catch (IOException e) { e.printStackTrace(); } finally { try { hdfsOut.close(); hdfsIn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
四、測試程式碼
package com.hdfs.demo; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; import java.util.Scanner; import java.util.Set; import static org.junit.Assert.*; public class HdfsTest { Scanner sc = new Scanner(System.in); FileSystem fs = HdfsDemo.getHadoopFileSystem(); @Test public void myCreatePath() { //目錄建立測試 String path = "/usr/test/input"; System.out.println(HdfsDemo.myCreatePath(fs,path)); } @Test public void myDropHdfsPath() { // 目錄刪除 String path = "/usr/test/output"; System.out.println(HdfsDemo.myDropHdfsPath(fs,path)); } @Test public void myRename() { //檔案重新命名 String oldName = "/usr/test/input"; String newName = "/usr/test/renameInput"; System.out.println(HdfsDemo.myRename(fs,oldName,newName)); } @Test public void recursiveHdfsPath() { //遍歷資料夾 Path path = new Path("/usr/test/"); Set<String> set = HdfsDemo.recursiveHdfsPath(fs, path); for (String str : set) { System.out.println(str); } } @Test public void myCheck() { //檔案簡單的判斷 是否存在 是否是資料夾 是否是檔案 String path = "/usr/test/input/file.txt"; HdfsDemo.myCheck(fs,path); } @Test public void showAllConf() { //獲取配置的所有資訊 HdfsDemo.showAllConf(); } @Test public void getFileFromHDFS() { //檔案下載 檔案下載有許可權要求 要有寫的許可權 0644error String dfsFile = "/usr/test/input/file.txt"; String locPath = "e://temp/data/"; HdfsDemo.getFileFromHDFS(fs,dfsFile,locPath); } @Test public void myPutFile2HDFS() { //檔案的上傳 String localFile = "e://temp/file.txt"; String dfsPath = "/usr/test/input"; HdfsDemo.myPutFile2HDFS(fs,localFile,dfsPath); } @Test public void copyFileBetweenHDFS() { //hdfs之間檔案的複製 String in = "/usr/test/output"; String out = "/usr/test/input"; HdfsDemo.copyFileBetweenHDFS(fs,in,out); } }