通過API訪問HDFS
一、通過 java.net.URL
1.在ubuntu下開啟eclipse
2.建立專案
3.匯入hadoop所有jar包
Build Path --->Configure Build Path ---> Add External JARs --->FileSystem --->mnt ---> hgfs --->share for linux --->hadoop2.9.0--->-lib
4.編寫程式碼
package hadoopDemo; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; public class TestFileSystem { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) throws Exception { String urlString = "hdfs://ubuntucp:8020/test/a.txt"; URL url = new URL(urlString) ; InputStream is = url.openStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int len = 0 ; while((len = is.read(buf)) != -1){ baos.write(buf, 0, len); } byte[] data = baos.toByteArray(); System.out.println(new String(data)); } }
其中
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
是為了讓Java程式能識別Hadoop的 hdfs URL 方案所做的額外工作。
還可以呼叫Hadoop中IOUtils 類
package hadoopDemo; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class TestFileSystem { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) throws Exception { String urlString = "hdfs://ubuntucp:8020/test/a.txt"; URL url = new URL(urlString) ; InputStream is = url.openStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); // byte[] buf = new byte[1024]; // int len = 0 ; // while((len = is.read(buf)) != -1){ // baos.write(buf, 0, len); // } // byte[] data = baos.toByteArray(); // System.out.println(new String(data)); // is.close(); IOUtils.copyBytes(is, baos, 1024); IOUtils.closeStream(is) ; System.out.println(new String(baos.toByteArray())); } }
5.新增log4j的屬性檔案
將其粘到專案的src下
二、通過FileSystem API 讀取資料
1.首先構建單元測試
在已有工程下新建名字為test的 Source Folder 用於存放測試類原始碼(和src並列),並要求包名也相同,然後新建測試類
package hadoopDemo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.junit.Test; public class TestFileSystemAPI { @Test public void read() throws Exception{ Configuration conf = new Configuration() ; FileSystem fs = FileSystem.get(conf) ; } }
對測試類除錯,發現變數 fs 的值為 LocalFileSystem ,這是因為用的是 /mnt/hgfs/share for linux/hadoop-2.9.0/_lib/hadoop-common-2.9.0.jar 中 core-default.xml 的預設配置。
2.新建名稱為 core-site.xml 的 File
為了便於管理可以在工程下新建 Source Folder ,把 core-site.xml 放入,如下:
core-site.xml 內容如下
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://ubuntucp:8020/</value>
</property>
</configuration>
再次除錯測試類,發現變數 fs 的值為 DistributedFileSystem ,這樣就可以訪問了
但這樣必須要求配置檔名稱為 core-site.xml ,如果用其他名稱,需要在程式中新增指定配置檔案:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
//新增指定配置檔案
conf.addResource("my-core-site.xml") ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;
Path path = new Path(file) ;
FSDataInputStream in = fs.open(path) ;
IOUtils.copyBytes(in, System.out, 1024, true) ;//1024是緩衝區的長度而不是流的長度,true為關閉流
}
}
直接使用 FileSystem 以標準輸出格式顯示 Hadoop 檔案系統中的檔案,如下:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;
Path path = new Path(file) ;
FSDataInputStream in = fs.open(path) ;
IOUtils.copyBytes(in, System.out, 1024, true) ;//1024是緩衝區的長度而不是流的長度,true為關閉流
}
}
FSDataInputStream 繼承了 DataInputStream 類和實現了 Seekbale 介面,Seekable 介面支援在檔案中找到指定位置,並提供一個查詢當前位置相對於檔案起始位置偏移量(getPos()) 的查詢方法。與 java.io.InputStream 的 skip() 不同,seek() 可以移到檔案中任意一個絕對位置,skip() 則只能相對於當前位置定位到另一個新位置。
//Seekable介面
public interface Seekable {
void seek(long pos) throws IOException ;
long getPos() throws IOException ;
}
通過API實現seek操作:
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 通過API實現seek操作
*/
@Test
public void seek() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/how.txt" ;
Path path = new Path(file) ;
FSDataInputStream in = fs.open(path) ;
IOUtils.copyBytes(in, new FileOutputStream("/home/ubuntu/Downloads/how1.jpg") , 1024, false) ;
in.seek(0) ;//重新定位到起始位置
IOUtils.copyBytes(in, new FileOutputStream("/home/ubuntu/Downloads/how2.jpg") , 1024, true) ;
}
}
FSDataInputStream 類也實現了 PositionedReadable 介面,從一個指定偏移量出讀取檔案的一部分:
//PositionedReadable介面
public interface PositionedReadable {
public int read(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}
由上,read() 方法從檔案的指定 position 處讀取至多為 length 位元組的資料並存入緩衝區 buffer 的指定偏移量 offset 處。返回值是實際讀到的位元組數。readFully() 方法將指定 length 長度的位元組數資料讀到 buffer 中。
3.獲取檔案狀態
將單元測試改為如下:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;
Path path = new Path(file) ;
FileStatus ft = fs.getFileStatus(path) ;//獲取檔案狀態
System.out.println("塊大小 " + ft.getBlockSize()); //得到塊大小
System.out.println("訪問時間 " + ft.getAccessTime()); //得到訪問時間
System.out.println("組 " + ft.getGroup()); //得到組
System.out.println("檔案長度bytes " + ft.getLen()); //得到長度
System.out.println("修改時間 " + ft.getModificationTime()); //得到修改時間
System.out.println("檔案擁有者 " + ft.getOwner()); //得到檔案擁有者
System.out.println("檔案複製因子 " + ft.getReplication()); //得到檔案複製
}
}
列出檔案目錄:
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 列出檔案目錄
*/
@Test
public void listFile() throws Exception{
Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/" ;
Path path = new Path(file) ;
FileStatus[] fst = fs.listStatus(path) ;
System.out.println("一種方法");
for(FileStatus ft : fst){
System.out.println(ft.getPath() + ": isFile = " + ft.isFile());
}
//工具類,直接將FileStatus[]陣列提取資料形成Path[]陣列,可以替代ft.getPath()的for迴圈
System.out.println("另一種方法");
Path[] listesPaths = FileUtil.stat2Paths(fst) ;
for(Path p : listesPaths){
System.out.println(p);
}
}
}
結果如下
4.獲取塊資訊
將單元測試改為如下:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/hadoop-2.9.0.tar.gz" ;
Path path = new Path(file) ;
//得到指定路徑下的檔案的狀態
FileStatus ft = fs.getFileStatus(path) ;//獲取檔案狀態,FileStatus相當於檔案或者目錄
//得到指定檔案狀態的塊位置資訊集合
BlockLocation[] location = fs.getFileBlockLocations(ft, 0, ft.getLen()) ;//一個檔案被切割成兩塊,則BlockLocation[]就有兩個BlockLocation元素
for(BlockLocation block : location){
System.out.println(block.getHosts()) ;//之所以getHosts()是String[],是因為每一個塊的副本在不同的主機上
}
}
}
除錯location的值如下:
可以看到location包含了3塊,每塊中含有hosts(主機名稱),length,offset(偏移量),names(datanode的遠端通訊rpc地址) 等資訊
在storageids中有
和 datanode 中 ~/hadoop/dfs/data/current 的VERSION 中的storageID 相同:
5.通過 API 實現檔案上傳(寫入資料)
package hadoopDemo;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 通過API實現檔案上傳
*/
@Test
public void putFile() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/how.txt" ;
Path path = new Path(file) ;
FSDataOutputStream out = fs.create(path) ;//建立檔案系統資料輸出流用來寫入檔案
IOUtils.copyBytes(new FileInputStream("/home/ubuntu/Downloads/bizhi.jpg"), out, 1024) ;
}
}
使用 append() 方法在一個現有檔案末尾追加資料:
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 使用append()方法在一個現有檔案末尾追加資料
*/
@Test
public void append() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;//檔案已存在
Path path = new Path(file) ;
FSDataOutputStream out = fs.append(path) ;
out.writeChars("I miss you !") ;
out.close() ;
}
}
6.檔案副本數和塊大小修改
首先要在叢集中修改最小塊限制:
1).進入 /soft/hadoop/etc/hadoop_cluster ,修改 hdfs-site.xml ,新增如下內容(把塊最小限制改為10K):
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>10240</value>
</property>
2).將修改後的 hdfs-site.xml 分發給各主機
3).停掉叢集重新開啟:
start-dfs.sh
然後可以通過API實現副本數以及塊大小的修改:
預設配置請參考 F:\share for linux\hadoop-2.9.0\_conf 中的 hdfs-default.xml
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 修改副本書和塊大小
*/
@Test
public void customReplication() throws Exception{
Configuration conf = new Configuration() ;
//set(String,String),修改修改當前會話副本數為4
conf.set("dfs.replication", "" + 4) ;
//修改當前檔案塊大小為50K,但hdfs有最小塊限制,所以要先修改最小塊限制(需在叢集中修改)
conf.set("dfs.blocksize", "" + (1024*50)) ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/modify.txt" ;
Path path = new Path(file) ;
FSDataOutputStream out = fs.create(path) ;//建立檔案系統資料輸出流用來寫入檔案
IOUtils.copyBytes(new FileInputStream("/home/ubuntu/Downloads/bizhi.jpg"), out, 1024) ;
}
}
7.檔案通配及過濾
Hadoop為執行通配提供了兩個FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
globStatus() 方法返回路徑格式與指定模式匹配的所有 FileStatus 物件組成的陣列。PathFilter 命令作為可選項可以進一步匹配結果進行限制。
萬用字元及其含義:
萬用字元 | 匹配 |
* | 代表0到多個字元 |
? | 代表單一字元 |
[ ab ] | 代表字元型別,匹配{a,b}中的一個字元 |
[ ^ab ] | 代表不是{a,b}中的一個字元 |
[ a-b ] | 代表匹配一個a到b之間的字元包括ab,ASCII程式碼在a-b之間的 |
[ ^a-b] | 代表不在a到b之間的字元包括ab |
{a,b} | 代表匹配a或b的一個語句 |
\c | 代表轉義字元匹配原字元c |
例項:
/* | /2007 /2008 |
/*/* | /2007/12 /2008/01 |
/200? | /2007 /2008 |
/200[78] | /2007 /2008 |
/200[7-8] | /2007 /2008 |
萬用字元模式並不能總能夠精確地描述想要訪問的檔案集,比如使用通配格式排除一個特定的檔案就不太可能。FileSystem 中的 listStatus() 和 globStatus() 方法提供了可選的 PathFilter 物件,從而控制萬用字元:
package org.apache.hadoop.fs;
public interface PathFilter {
boolean accept(Path path);
}
範例 PathFilter 用於排除匹配正則表示式的路徑:
首先在HDFS中建立相關目錄:
hdfs dfs -mkdir -p /test/2007/12/30 /test/2007/12/31 /test/2008/01/01 /test/2008/01/02
接著定義排他性路徑過濾的類 RegexExcludePathFilter :
package hadoopDemo.pathFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/*
* 排他性路徑過濾
*/
public class RegexExcludePathFilter implements PathFilter {//實現了PathFilter介面
private String regexp ;
public RegexExcludePathFilter(String regexp) {
this.regexp = regexp;
}
public boolean accept(Path path) {
return !path.toString().matches(regexp);
}
}
然後在單元測試中呼叫該類:
package hadoopDemo;
import hadoopDemo.pathFilter.RegexExcludePathFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 通過PathFilter應用正則表示式對路徑過濾
*/
@Test
public void pathFilter() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
FileStatus[] ft = fs.globStatus(new Path("/test/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$")) ;
//直接將FileStatus[]陣列提取資料形成Path[]陣列,可以替代ft.getPath()的for迴圈
Path[] path = FileUtil.stat2Paths(ft) ;
for(Path p : path){
System.out.println(p);
}
}
}
結果為:
8.刪除資料
使用 FileSystem 的 delete() 方法可以永久性刪除檔案或目錄:
public boolean delete(Path f , boolean recursive) throws IOException
如果 f 是一個檔案或空目錄,那麼 recursive 的值就會被忽略,直接刪除。只有在 recursive 值為 true 時,非空目錄及其內容才會被刪除。