使用javaAPI操作hdfs
阿新 • • 發佈:2017-10-05
文件系統 ole 文件 緩沖區 api println 不存在 ogg 就會
歡迎到https://github.com/huabingood/everyDayLanguagePractise查看源碼。
一.構建環境
在hadoop的安裝包中的share目錄中有hadoop所有你能想象到的內容。
進入安裝包下的share文件夾你會看到doc和hadoop文件夾。其中doc中是hadoop的整個document。而hadoop文件夾中則存放著所有開發hadoop所有用到的jar包,其依賴放到相應的lib文件夾中。
我們這次用到的是hadoop文件夾中的common以及hdfs文件夾中的內容。在創建工程時,繼續將這兩個文件夾中的jar包添加到相應的工程中,然後將文件夾中的lib文件夾的所有jar包作為依賴也添加到工程文件中(common和hdfs文件的lib文件中的jar包可能存在重復,註意去重。);其中source文件夾是該模塊的源碼包,如果想在IDEA中看到源碼,需要將這些內容也添加到工程中。
二.操作HDFS流程
- 獲取HDFS的配置,根據HDFS的配置獲取整個HDFS操作系統的內容
- 打開HDFS操作系統,進行操作
- 文件夾的操作:增刪改查
- 文件的上傳下載
- 文件的IO操作——hdfs之間的復制
三.具體的操作命令
- 根據配置獲取HDFS文件操作系統(共有三種方式)
- 方法一:直接獲取配置文件方法
通常情況下該方法用於本地有hadoop系統,可以直接進行訪問。此時僅需在配置文件中指定要操作的文件系統為hdfs即可。這裏的conf的配置文件可以設置hdfs的各種參數,並且優先級比配置文件要搞 - 方法二:指定URI路徑,進而獲取配置文件創建操作系統
通常該方法用於本地沒有hadoop系統,但是可以通過URI的方式進行訪問。此時要給給定hadoop的NN節點的訪問路徑,hadoop的用戶名,以及配置文件信息(此時會自動訪問遠程hadoop的配置文件)
- 方法一:直接獲取配置文件方法
/** * 根據配置文件獲取HDFS操作對象 * 有兩種方法: * 1.使用conf直接從本地獲取配置文件創建HDFS對象 * 2.多用於本地沒有hadoop系統,但是可以遠程訪問。使用給定的URI和用戶名,訪問遠程的配置文件,然後創建HDFS對象。 * @return FileSystem */ public FileSystem getHadoopFileSystem() { FileSystem fs= null; Configuration conf = null; // 方法一,本地有配置文件,直接獲取配置文件(core-site.xml,hdfs-site.xml) // 根據配置文件創建HDFS對象 // 此時必須指定hdsf的訪問路徑。 conf = new Configuration(); // 文件系統為必須設置的內容。其他配置參數可以自行設置,且優先級最高 conf.set("fs.defaultFS", "hdfs://huabingood01:9000"); try { // 根據配置文件創建HDFS對象 fs = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); logger.error("",e); } // 方法二:本地沒有hadoop系統,但是可以遠程訪問。根據給定的URI和用戶名,訪問hdfs的配置參數 // 此時的conf不需任何設置,只需讀取遠程的配置文件即可。 /*conf = new Configuration(); // Hadoop的用戶名 String hdfsUserName = "huabingood"; URI hdfsUri = null; try { // HDFS的訪問路徑 hdfsUri = new URI("hdfs://huabingood01:9000"); } catch (URISyntaxException e) { e.printStackTrace(); logger.error(e); } try { // 根據遠程的NN節點,獲取配置信息,創建HDFS對象 fs = FileSystem.get(hdfsUri,conf,hdfsUserName); } catch (IOException e) { e.printStackTrace(); logger.error(e); } catch (InterruptedException e) { e.printStackTrace(); logger.error(e); }*/ // 方法三,反正我們沒有搞懂。 /*conf = new Configuration(); conf.addResource("/opt/huabingood/pseudoDistributeHadoop/hadoop-2.6.0-cdh5.10.0/etc/hadoop/core-site.xml"); conf.addResource("/opt/huabingood/pseudoDistributeHadoop/hadoop-2.6.0-cdh5.10.0/etc/hadoop/hdfs-site.xml"); try { fs = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); logger.error(e); }*/ return fs; }
2.添加文件夾
/** * 這裏的創建文件夾同shell中的mkdir -p 語序前面的文件夾不存在 * 跟java中的IO操作一樣,也只能對path對象做操作;但是這裏的Path對象是hdfs中的 * @param fs * @return */ public boolean myCreatePath(FileSystem fs){ boolean b = false; Path path = new Path("/hyw/test/huabingood/hyw"); try { // even the path exist,it can also create the path. b = fs.mkdirs(path); } catch (IOException e) { e.printStackTrace(); logger.error(e); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); logger.error(e); } } return b; }
3.刪除文件夾
/** * 刪除文件,實際上刪除的是給定path路徑的最後一個 * 跟java中一樣,也需要path對象,不過是hadoop.fs包中的。 * 實際上delete(Path p)已經過時了,更多使用delete(Path p,boolean recursive) * 後面的布爾值實際上是對文件的刪除,相當於rm -r * @param fs * @return */ public boolean myDropHdfsPath(FileSystem fs){ boolean b = false; // drop the last path Path path = new Path("/huabingood/hadoop.tar.gz"); try { b = fs.delete(path,true); } catch (IOException e) { e.printStackTrace(); logger.error(e); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); logger.error(e); } } return b; }
4.重命名
/** * 重命名文件夾 * @param hdfs * @return */ public boolean myRename(FileSystem hdfs){ boolean b = false; Path oldPath = new Path("/hyw/test/huabingood"); Path newPath = new Path("/hyw/test/huabing"); try { b = hdfs.rename(oldPath,newPath); } catch (IOException e) { e.printStackTrace(); logger.error(e); }finally { try { hdfs.close(); } catch (IOException e) { e.printStackTrace(); logger.error(e); } } return b; }
5.文件夾的遞歸遍歷
/** * 遍歷文件夾 * public FileStatus[] listStatus(Path p) * 通常使用HDFS文件系統的listStatus(path)來獲取改定路徑的子路徑。然後逐個判斷 * 值得註意的是: * 1.並不是總有文件夾中有文件,有些文件夾是空的,如果僅僅做是否為文件的判斷會有問題,必須加文件的長度是否為0的判斷 * 2.使用getPath()方法獲取的是FileStatus對象是帶URL路徑的。使用FileStatus.getPath().toUri().getPath()獲取的路徑才是不帶url的路徑 * @param hdfs * @param listPath 傳入的HDFS開始遍歷的路徑 * @return */ public Set<String> recursiveHdfsPath(FileSystem hdfs,Path listPath){ /*FileStatus[] files = null; try { files = hdfs.listStatus(listPath); Path[] paths = FileUtil.stat2Paths(files); for(int i=0;i<files.length;i++){ if(files[i].isFile()){ // set.add(paths[i].toString()); set.add(paths[i].getName()); }else { recursiveHdfsPath(hdfs,paths[i]); } } } catch (IOException e) { e.printStackTrace(); logger.error(e); }*/ FileStatus[] files = null; try { files = hdfs.listStatus(listPath); // 實際上並不是每個文件夾都會有文件的。 if(files.length == 0){ // 如果不使用toUri(),獲取的路徑帶URL。 set.add(listPath.toUri().getPath()); }else { // 判斷是否為文件 for (FileStatus f : files) { if (files.length == 0 || f.isFile()) { set.add(f.getPath().toUri().getPath()); } else { // 是文件夾,且非空,就繼續遍歷 recursiveHdfsPath(hdfs, f.getPath()); } } } } catch (IOException e) { e.printStackTrace(); logger.error(e); } return set; }
6.文件的判斷
/** * 文件簡單的判斷 * 是否存在 * 是否是文件夾 * 是否是文件 * @param fs */ public void myCheck(FileSystem fs){ boolean isExists = false; boolean isDirectorys = false; boolean isFiles = false; Path path = new Path("/hyw/test/huabingood01"); try { isExists = fs.exists(path); isDirectorys = fs.isDirectory(path); isFiles = fs.isFile(path); } catch (IOException e){ e.printStackTrace(); logger.error(e); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); logger.error(e); } } if(!isExists){ System.out.println("lu jing not cun zai."); }else{ System.out.println("lu jing cun zai."); if(isDirectorys){ System.out.println("Directory"); }else if(isFiles){ System.out.println("Files"); } }
7.文件配置信息的查詢
/** * 獲取配置的所有信息 * 首先,我們要知道配置文件是哪一個 * 然後我們將獲取的配置文件用叠代器接收 * 實際上配置中是KV對,我們可以通過java中的Entry來接收 * */ public void showAllConf(){ Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://huabingood01:9000"); Iterator<Map.Entry<String,String>> it = conf.iterator(); while(it.hasNext()){ Map.Entry<String,String> entry = it.next(); System.out.println(entry.getKey()+"=" +entry.getValue()); } }
8.文件的上傳
/** * 文件下載 * 註意下載的路徑的最後一個地址是下載的文件名 * copyToLocalFile(Path local,Path hdfs) * 下載命令中的參數是沒有任何布爾值的,如果添加了布爾是,意味著這是moveToLocalFile() * @param fs */ public void getFileFromHDFS(FileSystem fs){ Path HDFSPath = new Path("/hyw/test/hadoop-2.6.0-cdh5.10.0.tar.gz"); Path localPath = new Path("/home/huabingood"); try { fs.copyToLocalFile(HDFSPath,localPath); } catch (IOException e) { e.printStackTrace(); logger.error("zhe li you cuo wu !" ,e); }finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); logger.error("zhe li you cuo wu !", e); } } }
9.文件的下載
/** * 文件的下載 * 註意事項同文件的上傳 * 註意如果上傳的路徑不存在會自動創建 * 如果存在同名的文件,會覆蓋 * @param fs */ public void myPutFile2HDFS(FileSystem fs){ boolean pathExists = false; // 如果上傳的路徑不存在會創建 // 如果該路徑文件已存在,就會覆蓋 Path localPath = new Path("/home/huabingood/繡春刀.rmbv"); Path hdfsPath = new Path("/hyw/test/huabingood/abc/efg/繡春刀.rmbv"); try { fs.copyFromLocalFile(localPath,hdfsPath); } catch (IOException e) { e.printStackTrace(); }finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } }
10.文件的IO操作——HDFS之間文件的復制
/** * hdfs之間文件的復制 * 使用FSDataInputStream來打開文件open(Path p) * 使用FSDataOutputStream開創建寫到的路徑create(Path p) * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)來進行具體的讀寫 * 說明: * 1.java中使用緩沖區來加速讀取文件,這裏也使用了緩沖區,但是只要指定緩沖區大小即可,不必單獨設置一個新的數組來接受 * 2.最後一個布爾值表示是否使用完後關閉讀寫流。通常是false,如果不手動關會報錯的 * @param hdfs */ public void copyFileBetweenHDFS(FileSystem hdfs){ Path inPath = new Path("/hyw/test/hadoop-2.6.0-cdh5.10.0.tar.gz"); Path outPath = new Path("/huabin/hadoop.tar.gz"); // byte[] ioBuffer = new byte[1024*1024*64]; // int len = 0; FSDataInputStream hdfsIn = null; FSDataOutputStream hdfsOut = null; try { hdfsIn = hdfs.open(inPath); hdfsOut = hdfs.create(outPath); IOUtils.copyBytes(hdfsIn,hdfsOut,1024*1024*64,false); /*while((len=hdfsIn.read(ioBuffer))!= -1){ IOUtils.copyBytes(hdfsIn,hdfsOut,len,true); }*/ } catch (IOException e) { e.printStackTrace(); logger.error(e); }finally { try { hdfsOut.close(); hdfsIn.close(); } catch (IOException e) { e.printStackTrace(); } } }
使用javaAPI操作hdfs