1. 程式人生 > >io 流操作hdfs

io 流操作hdfs

hdfs 檔案上傳

本地   -------->    檔案系統物件   -------->    hdfs 檔案系統

           輸入流                               輸出流  

// 將流從本地 上傳到  hdfs 檔案系統。
@Test public void ioPut() throws IOException, InterruptedException, URISyntaxException{ //1 獲取檔案系統物件 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.59.11:9000"), conf, "ljs"); //2 輸出流 流到 hdfs FSDataOutputStream fo = fs.create(new
Path("/user/ljs/job")); //3 定義輸出流 來自 本地磁碟 InputStream is = new FileInputStream(new File("d:/job.txt")); //4 流對接 IOUtils.copyBytes(is, fo, 100); IOUtils.closeStream(fs); //5 關閉流 }

 

 

hdfs 檔案下載

本地 <-----------  檔案系統物件  < --------------------  hdfs檔案系統 叢集

           輸出流                                     輸出流

@Test
    public void ioGet() throws IOException, InterruptedException, URISyntaxException{
        
        // 1 建立檔案系統
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.59.11:9000"),
                conf, "ljs");
        // 2 建立 輸入流  來著 hdfs 檔案系統
        FSDataInputStream fis = fs.open(new Path("/user/ljs/cook.txt"));
        // 3 建立輸出流   送到 本地磁碟。
        OutputStream os = new FileOutputStream(new File("d:/cook.txt"));
        // 4  流對接
        IOUtils.copyBytes(fis, os, 20);
        // 5 關閉流
        IOUtils.closeStream(fis);
     }

 

 

定位檔案讀取

 

讀取第一塊   128M  

    
    // 從hdfs檔案系統中獲取第一塊block (128M)
    @Test
    public void fileSeek1() throws IOException, InterruptedException, URISyntaxException{
        // 1  獲取檔案系統物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.59.11:9000"), conf, "ljs");
        // 2 建立開啟輸入流  
        FSDataInputStream fis = fs.open(new Path("/user/ljs/hadoop-2.7.2.tar.gz"));
        // 3 建立 輸出流 
        OutputStream fos = new FileOutputStream(new File("d:/hadoop-2.7.2.tar.gz.part01"));
        // 4 流對接
        byte [] buff = new byte[1024]; // 1kB;
        for(int i =0; i < 1024*128; ++i){
            fis.read(buff);
            fos.write(buff);    
        }
        // 5 關閉流
        fis.close();
        fos.close();
    }

 

 

從第二塊檔案開始讀取

@Test
	public void fileSeek02() throws IOException, InterruptedException, URISyntaxException{
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.59.11:9000"), conf, "ljs");
		FSDataInputStream fis = fs.open(new Path("/user/ljs/hadoop-2.7.2.tar.gz"));
		OutputStream fos = new FileOutputStream(new File("d:/hadoop-2.7.2.tar.gz.part02"));
		
		fis.seek(1024*1024*128);
		IOUtils.copyBytes(fis, fos,1024);
		
		fis.close();
		fos.close();
		
	}