十三、根據HDFS提供的API,實現檔案上傳、下載、刪除、重新命名、移動
阿新 • • 發佈:2019-01-06
一、
根據HDFS提供的API,實現以下功能:
針對檔案: 上傳、下載、刪除、重新命名、移動
package HdfsApi; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HdfsApi { //獲取FS public static FileSystem getFs() throws IOException{ //獲取配置檔案 Configuration conf = new Configuration(); //獲取檔案系統 FileSystem fs = FileSystem.get(conf); return fs; } //讀取檔案 public static void readFile(String src) throws IOException{ // 獲取 FileSystem FileSystem fs = getFs(); //讀的路徑 Path readPath = new Path(src); FSDataInputStream inStream = null; try { //開啟輸入流 inStream = fs.open(readPath); IOUtils.copyBytes(inStream, System.out, 4096, false); } catch (Exception e) { e.printStackTrace(); }finally{ IOUtils.closeStream(inStream); } } public static void putFlie() throws IOException { //獲取filesystem FileSystem fs=getFs(); //上傳到路徑 File readpath=new File("/opt/app/hadoop/wc.input"); //本地路徑 Path putpath=new Path("/data/put-wc.input"); FileInputStream in=new FileInputStream(readpath); FSDataOutputStream out =fs.create(putpath); try { IOUtils.copyBytes(in, out, 4096, false); } catch (Exception e) { e.printStackTrace(); }finally{ IOUtils.closeStream(in); IOUtils.closeStream(out); } } //上傳檔案 public static void writeFile(String src,String dst) throws IOException{ //獲取filesystem FileSystem fs = getFs(); //本地路徑 File inFile = new File(src); //目標路徑 Path outFile = new Path(dst); FileInputStream inStream = null; FSDataOutputStream outStream = null; try { //開啟輸入流 inStream = new FileInputStream(inFile); //開啟輸出流 outStream = fs.create(outFile); IOUtils.copyBytes(inStream, outStream,4096, false); } catch (Exception e) { // TODO: handle exception }finally{ IOUtils.closeStream(inStream); IOUtils.closeStream(outStream); } } //下載檔案 public static void downLoad(String src,String dst) throws IOException{ FileSystem fs = getFs(); //設定下載地址和目標地址 fs.copyToLocalFile(new Path(src), new Path(dst)); fs.close(); } //重新命名和移動 public static void renameMV(String src,String dst) throws IOException{ FileSystem fs = getFs(); fs.rename(new Path(src), new Path(dst)); fs.close(); } //刪除檔案 public static void delete(String fileName) throws IOException{ FileSystem fs = getFs(); fs.deleteOnExit(new Path(fileName)); } //獲取檔案列表 public static void listFile(String dirName) throws IOException{ FileSystem fs = getFs(); FileStatus[] fileStatuses = fs.listStatus(new Path(dirName)); for(FileStatus fileName:fileStatuses){ System.out.println(fileName.getPath().getName()); } } //建立目錄 public static void mkdir() throws IOException { FileSystem fs = getFs(); fs.mkdirs(new Path("/data/bb")); fs.close(); } //建立目錄 public static void deletedir() throws IOException { FileSystem fs = getFs(); fs.delete(new Path("/data/bb"), true); fs.close(); } public static void main(String[] args) throws IOException { } }
二、實現把本地某個目錄下面所有小檔案合併上傳到HDFS檔案系統
package HdfsApi; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class PutMerge { public static void main(String[] args) throws IOException { //step 1 : 獲取 FileSystem Configuration conf = new Configuration(); LocalFileSystem localFs = FileSystem.getLocal(conf); FileSystem dfs = FileSystem.get(conf); //step 2 : 設定輸入輸出 in/out Path Path inPath = new Path("/opt/app/hadoop-2.5.0/etc/hadoop"); Path outPath = new Path("/data2/putmerge.xml"); FileStatus[] fileStatuses = localFs.listStatus(inPath); OutputStream outStream = null; InputStream inStream = null; try { //step 4 : 開啟輸出流 outStream = dfs.create(outPath); for(FileStatus fileName:fileStatuses){ //step 3 : 開啟輸入流 inStream = localFs.open(fileName.getPath()); IOUtils.copyBytes(inStream, outStream, 4096, false); IOUtils.closeStream(inStream); System.out.println(fileName.getPath()); } } catch (Exception e) { e.printStackTrace(); }finally{ IOUtils.closeStream(outStream); System.out.println("PutMerge Success"); } } }