hadoop hdfs 上傳下載檔案
阿新 • • 發佈:2019-02-14
上傳檔案:
package uploadfile; import java.io.*; import java.net.URI; import java.util.Date; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.conf.Configuration; public class UploadFile { public static void main(String[] args) { try{ long begin = System.currentTimeMillis(); System.out.println("*******************開始 時間 " + new Date() + "************************"); String localSrc = args[0]; //D://cloudra cdh4.txt String dst=args[1]; System.out.println(localSrc); System.out.println(dst); InputStream in = new BufferedInputStream(new FileInputStream(localSrc)); // byte[] a=new byte[1024]; // if(in.read(a)!=-1){ // String as = new String(a); // System.out.println(as); // } Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst),conf); OutputStream out = fs.create(new Path(dst)); IOUtils.copyBytes(in,out,4096,true); long end = System.currentTimeMillis(); long excutetime = end - begin ; System.out.println("*******************結束 時間 " + new Date() + "************************"); System.out.println(); System.out.println("===================="); System.out.println("消耗時間為:" + excutetime + " 毫秒"); System.out.println(" " + excutetime/1000 + "秒"); System.out.println(" " + excutetime/60000 + "分"); System.out.println("===================="); System.out.println(); }catch(Exception e){ e.printStackTrace(); } } }
下載檔案:
package uploadfile; import java.io.BufferedOutputStream; import java.io.FileOutputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; /** * * * * * *Description: 檢視Hadoop檔案系統中的檔案,利用hadoop FileSystem介面中的FSDataInputStream * * FSDataInputStream還具有流定位的能力,可以從檔案的任意位置開始讀取 * * * * @author charles.wang * * @created May 26, 2012 12:28:49 PM * */ public class DownloadFile { /** * @param args */ public static void main(String[] args) throws Exception{ //第一個引數傳遞進來的是Hadoop檔案系統中的某個檔案的URI,以hdfs://ip 的theme開頭 String uri = args[0]; //讀取Hadoop檔案系統的配置 Configuration conf = new Configuration(); conf.set("Hadoop.job.ugi", "hadoop-user,hadoop-user"); //FileSystem是使用者操作HDFS的核心類,它獲得URI對應的HDFS檔案系統 FileSystem fs = FileSystem.get(URI.create(uri),conf); FSDataInputStream in = null; try{ //實驗一:輸出全部檔案內容 System.out.println("實驗一:輸出全部檔案內容"); //讓FileSystem開啟一個uri對應的FSDataInputStream檔案輸入流,讀取這個檔案 in = fs.open( new Path(uri) ); //用Hadoop的IOUtils工具方法來讓這個檔案的指定位元組複製到標準輸出流上 //IOUtils.copyBytes(in, System.out,50,false); //輸出到控制檯上 OutputStream out = new BufferedOutputStream(new FileOutputStream("D:\\aassdcc.txt")); IOUtils.copyBytes(in, out,4096,true); System.out.println("Download success!!!"); // //實驗二:展示FSDataInputStream檔案輸入流的流定位能力,用seek進行定位 // System.out.println("實驗二:展示FSDataInputStream檔案輸入流的流定位能力,用seek進行定位"); // // //假如我們要吧檔案輸出3次 // //第一次輸入全部內容,第二次輸入從第20個字元開始的內容,第3次輸出從第40個字元開始的內容 // for (int i=1;i<=3;i++){ // in.seek(0+20*(i-1)); // System.out.println("流定位第 "+i+" 次:" ); // IOUtils.copyBytes(in, System.out,4096,false); // // } }catch(Exception e){ e.printStackTrace(); }finally{ IOUtils.closeStream(in); } } }
總結:
這裡採用的方法是通過 FsUrlStreamHandlerFactory 例項呼叫URL 中的setURLStreamHandlerFactory 方法。由於JAVA 虛擬機器只能用一次上述方法,因此 通常在靜態方法中呼叫上述方法。這個限制意味首如果程式的其他元件--如不受你控制的第三方元件--已經聲明瞭一個URL例項,你將無法再使用上述方法從Hadoop 中讀取資料。
我們可以呼叫Hadoop 中簡潔的IOUtils 類,並在finally子句中關閉資料流,同時也可以在輸入流和輸出流之間複製資料。copyBytes方法的最後兩個引數,第一個用於設定複製的緩衝區大小,第二個用於設定複製結束後是否關閉資料流。