hdfs api 實戰
阿新 • • 發佈:2018-12-07
直接上程式碼:
package com.linewell.hdfs; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Progressable; public class hdfsTest { private static String HDFSUri = "hdfs://192.168.72.129:9000"; public static void main(String[] args) { //mkdir("/local/user"); //在hdfs上建立目錄 //System.out.println(existDir("/la", false)); //判斷hdfs上是否存在該目錄或檔案並列印 //getFile("/local/abc.txt", "D://aaa.txt");// 從hdfs下載檔案到本地系統 try { copyFileToHDFS("D://abc.txt","/local/user/bbb.txt" );//從本地系統上傳檔案到hdfs上 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 刪除檔案或者檔案目錄 * * @param path * */ public static void rmdir(String path){ FileSystem fs = getFileSystem(); path = HDFSUri + path; try { fs.delete(new Path(path), true); } catch (IllegalArgumentException | IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally { try { fs.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 從 HDFS 下載檔案到本地 * * @param srcFile HDFS檔案路徑 * @param destPath 本地路徑 */ public static void getFile(String srcFile,String dsetFile){ Configuration conf =new Configuration(); //構建filesystem try { FileSystem fs = FileSystem.get(URI.create(HDFSUri), conf); InputStream is = fs.open(new Path(srcFile)); IOUtils.copyBytes(is, new FileOutputStream(new File(dsetFile)), conf,true); fs.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 本地檔案上傳至 HDFS * * @param srcFile 原始檔 路徑 * @param destPath hdfs路徑 */ public static void copyFileToHDFS(String srcFile,String destPath)throws Exception{ FileInputStream fis=new FileInputStream(new File(srcFile));//讀取本地檔案 Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(URI.create(HDFSUri), conf); OutputStream os=fs.create(new Path(destPath)); //copy IOUtils.copyBytes(fis, os, 4096, true); //ture 結束後關閉fis和os System.out.println("拷貝完成..."); fs.close(); } /* * 判斷目錄是否存在 * @param filepath 目錄路徑 * @param Create 判斷是否建立目錄 * * */ public static boolean existDir(String filePath,boolean Create){ boolean flag =false; if(StringUtils.isEmpty(filePath)){ return flag; } try { Path path =new Path(filePath); FileSystem fs =getFileSystem(); if(Create){ if(!fs.exists(path)){ fs.mkdirs(path); } } if(fs.isDirectory(path)){ flag=true; } }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } return flag; } /** * 建立檔案目錄 * * @param path 檔案路徑 */ public static void mkdir(String path) { try { Configuration conf =new Configuration(); FileSystem fs =FileSystem.get(URI.create(HDFSUri),conf); System.out.println("filepath="+path); fs.mkdirs(new Path(path)); fs.close(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 獲取檔案系統 * * @return FileSystem 檔案系統 */ public static FileSystem getFileSystem() { //讀取配置檔案 Configuration conf = new Configuration(); // 檔案系統 FileSystem fs = null; String hdfsUri = HDFSUri; if(StringUtils.isBlank(hdfsUri)){ // 返回預設檔案系統 如果在 Hadoop叢集下執行,使用此種方法可直接獲取預設檔案系統 try { fs = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } }else{ // 返回指定的檔案系統,如果在本地測試,需要使用此種方法獲取檔案系統 try { URI uri = new URI(hdfsUri.trim()); fs = FileSystem.get(uri,conf); } catch (Exception e) { e.printStackTrace(); } } return fs; } }