十三、根據HDFS提供的API,實現檔案上傳、下載、刪除、重新命名、移動

十三、根據HDFS提供的API,實現檔案上傳、下載、刪除、重新命名、移動

一、

根據HDFS提供的API,實現以下功能:
針對檔案: 上傳、下載、刪除、重新命名、移動

package HdfsApi;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsApi {
	/**
	 * Builds a {@link FileSystem} from the default Hadoop configuration
	 * (core-site.xml / hdfs-site.xml found on the classpath).
	 *
	 * NOTE: FileSystem.get() returns a process-wide cached instance, so the
	 * methods below deliberately do NOT close it after each operation —
	 * closing it would invalidate the shared instance for other callers.
	 *
	 * @return the configured (typically HDFS) file system
	 * @throws IOException if the file system cannot be created
	 */
	public static FileSystem getFs() throws IOException {
		Configuration conf = new Configuration();
		return FileSystem.get(conf);
	}

	/**
	 * Streams the content of an HDFS file to stdout.
	 *
	 * @param src HDFS path of the file to read
	 * @throws IOException if the file cannot be opened or read
	 */
	public static void readFile(String src) throws IOException {
		FileSystem fs = getFs();
		Path readPath = new Path(src);
		FSDataInputStream inStream = null;
		try {
			inStream = fs.open(readPath);
			// false: keep System.out open after the copy.
			IOUtils.copyBytes(inStream, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(inStream);
		}
	}

	/**
	 * Uploads the sample local file /opt/app/hadoop/wc.input to the HDFS
	 * path /data/put-wc.input. Delegates to {@link #writeFile(String, String)}
	 * so the stream handling lives in one place.
	 *
	 * (The method name keeps the original "putFlie" spelling so existing
	 * callers do not break.)
	 *
	 * @throws IOException if the local file cannot be read or HDFS write fails
	 */
	public static void putFlie() throws IOException {
		// Local source file, HDFS destination (the original comments had
		// these two labels swapped).
		writeFile("/opt/app/hadoop/wc.input", "/data/put-wc.input");
	}

	/**
	 * Uploads a local file to HDFS.
	 *
	 * @param src local file system path of the source file
	 * @param dst destination HDFS path (overwritten if it exists)
	 * @throws IOException if the source cannot be read or the copy fails;
	 *                     the original version silently swallowed all errors
	 */
	public static void writeFile(String src, String dst) throws IOException {
		FileSystem fs = getFs();
		File inFile = new File(src);
		Path outFile = new Path(dst);
		FileInputStream inStream = null;
		FSDataOutputStream outStream = null;
		try {
			// Open inside the try so a failure in fs.create() cannot leak
			// the already-opened input stream.
			inStream = new FileInputStream(inFile);
			outStream = fs.create(outFile);
			IOUtils.copyBytes(inStream, outStream, 4096, false);
		} finally {
			IOUtils.closeStream(inStream);
			IOUtils.closeStream(outStream);
		}
	}

	/**
	 * Downloads an HDFS file to the local file system.
	 *
	 * @param src HDFS source path
	 * @param dst local destination path
	 * @throws IOException if the copy fails
	 */
	public static void downLoad(String src, String dst) throws IOException {
		FileSystem fs = getFs();
		fs.copyToLocalFile(new Path(src), new Path(dst));
	}

	/**
	 * Renames or moves a file/directory within HDFS (rename and move are the
	 * same operation in HDFS).
	 *
	 * @param src current HDFS path
	 * @param dst new HDFS path
	 * @throws IOException if the rename fails
	 */
	public static void renameMV(String src, String dst) throws IOException {
		FileSystem fs = getFs();
		fs.rename(new Path(src), new Path(dst));
	}

	/**
	 * Deletes a file (or empty directory) from HDFS immediately.
	 *
	 * The original used deleteOnExit(), which only deletes when the
	 * FileSystem is closed — and it never was here, so the path was not
	 * actually removed until JVM shutdown.
	 *
	 * @param fileName HDFS path to delete
	 * @throws IOException if the delete fails
	 */
	public static void delete(String fileName) throws IOException {
		FileSystem fs = getFs();
		// false: non-recursive; use deletedir(...) for directory trees.
		fs.delete(new Path(fileName), false);
	}

	/**
	 * Prints the names of the direct children of an HDFS directory.
	 *
	 * @param dirName HDFS directory to list
	 * @throws IOException if the listing fails
	 */
	public static void listFile(String dirName) throws IOException {
		FileSystem fs = getFs();
		FileStatus[] fileStatuses = fs.listStatus(new Path(dirName));
		for (FileStatus fileStatus : fileStatuses) {
			System.out.println(fileStatus.getPath().getName());
		}
	}

	/**
	 * Creates the sample directory /data/bb on HDFS (kept for backward
	 * compatibility; prefer {@link #mkdir(String)}).
	 *
	 * @throws IOException if the directory cannot be created
	 */
	public static void mkdir() throws IOException {
		mkdir("/data/bb");
	}

	/**
	 * Creates a directory (including missing parents) on HDFS.
	 *
	 * @param dirName HDFS directory path to create
	 * @throws IOException if the directory cannot be created
	 */
	public static void mkdir(String dirName) throws IOException {
		FileSystem fs = getFs();
		fs.mkdirs(new Path(dirName));
	}

	/**
	 * Recursively deletes the sample directory /data/bb (kept for backward
	 * compatibility; prefer {@link #deletedir(String)}). The original comment
	 * wrongly said "create directory".
	 *
	 * @throws IOException if the delete fails
	 */
	public static void deletedir() throws IOException {
		deletedir("/data/bb");
	}

	/**
	 * Recursively deletes an HDFS directory and everything under it.
	 *
	 * @param dirName HDFS directory path to delete
	 * @throws IOException if the delete fails
	 */
	public static void deletedir(String dirName) throws IOException {
		FileSystem fs = getFs();
		fs.delete(new Path(dirName), true);
	}

	public static void main(String[] args) throws IOException {
		// Entry point intentionally empty: call the helpers above as needed.
	}
}

二、實現把本地某個目錄下面所有小檔案合併上傳到HDFS檔案系統

package HdfsApi;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class PutMerge {
	/**
	 * Merges every file directly under a local directory into a single HDFS
	 * file (the classic "PutMerge" small-files pattern).
	 *
	 * Fixes over the original: the success message only prints when every
	 * file was copied (it used to print from finally even on failure), each
	 * per-file input stream is closed even if the copy throws, and the catch
	 * is narrowed from Exception to IOException.
	 *
	 * @param args unused
	 * @throws IOException if the local directory cannot be listed
	 */
	public static void main(String[] args) throws IOException {
		// Step 1: obtain both the local and the HDFS file systems.
		Configuration conf = new Configuration();
		LocalFileSystem localFs = FileSystem.getLocal(conf);
		FileSystem dfs = FileSystem.get(conf);

		// Step 2: local source directory and merged HDFS target file.
		Path inPath = new Path("/opt/app/hadoop-2.5.0/etc/hadoop");
		Path outPath = new Path("/data2/putmerge.xml");
		FileStatus[] fileStatuses = localFs.listStatus(inPath);

		OutputStream outStream = null;
		try {
			// Step 3: open the single output stream on HDFS.
			outStream = dfs.create(outPath);
			for (FileStatus fileStatus : fileStatuses) {
				// Step 4: append each local file to the merged output.
				InputStream inStream = null;
				try {
					inStream = localFs.open(fileStatus.getPath());
					IOUtils.copyBytes(inStream, outStream, 4096, false);
				} finally {
					// Close every input even when the copy fails.
					IOUtils.closeStream(inStream);
				}
				System.out.println(fileStatus.getPath());
			}
			// Only report success when every file was copied.
			System.out.println("PutMerge Success");
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(outStream);
		}
	}
}