HDFS API file read/write: a personal exercise

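Looking at how HDFS reads and writes work, the main steps are to open a FileSystem and obtain an InputStream or OutputStream.

The FileSystem class at the center of this is an abstract class that represents a file system. It extends org.apache.hadoop.conf.Configured and implements the Closeable interface, and it covers several kinds of file system, such as the local file system (file://), FTP, and HDFS.

So if you want to implement a file system of your own, you can extend this class, supply the matching configuration, and implement its abstract methods.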

public abstract class FileSystem extends Configured implements Closeable {
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
}
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();

if (scheme == null && authority == null) { // use default FS
return get(conf);
}

if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}

String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}

return CACHE.get(uri, conf);
}
}
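From this partial source: get(conf) returns the file system object for the default URI configured in conf, while get(uri, conf) returns a file system object based on both the URI and conf. The object comes from a cache unless fs.<scheme>.impl.disable.cache is set to true, in which case a new one is created.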
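The branching on scheme and authority can be tried out without a cluster. The following is a minimal sketch using only java.net.URI; the class FsUriResolution and its resolve method are hypothetical names, and the defaultUri parameter stands in for getDefaultUri(conf). It models a single step of the decision and leaves out the cache and the recursive call.

```java
import java.net.URI;

public class FsUriResolution {
    /**
     * Simplified model of the branching in FileSystem.get(URI, Configuration):
     * returns the URI that would be used to look up the file system.
     * defaultUri stands in for getDefaultUri(conf) (an assumption of this sketch).
     */
    static URI resolve(URI uri, URI defaultUri) {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {
            return defaultUri; // no scheme, no authority: use the default FS
        }
        if (scheme != null && authority == null
                && scheme.equals(defaultUri.getScheme())
                && defaultUri.getAuthority() != null) {
            return defaultUri; // same scheme as the default, and the default has an authority
        }
        return uri; // otherwise the URI is used as given
    }

    public static void main(String[] args) {
        URI def = URI.create("hdfs://10.192.4.33:9000/");
        System.out.println(resolve(URI.create("/words.txt"), def));
        System.out.println(resolve(URI.create("hdfs:///words.txt"), def));
        System.out.println(resolve(URI.create("file:///tmp/words.txt"), def));
    }
}
```

The first two calls fall back to the default URI; the third keeps the file: URI because its scheme differs from the default.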

Now a simple application: use the Java API to open a file and print its contents.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenMethod {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The key is case-sensitive: "fs.defaultFS", not "fs.defaultFs"
        conf.set("fs.defaultFS", "hdfs://10.192.4.33:9000/");
        Path path = new Path("/home/overshow/hehe.txt");
        // try-with-resources closes the stream first, then the FileSystem
        try (FileSystem fs = FileSystem.get(conf); // build the FileSystem from conf
             FSDataInputStream fis = fs.open(path)) {
            byte[] b = new byte[200];
            int n = fis.read(b); // reads at most 200 bytes; -1 means end of file
            if (n > 0) {
                System.out.println(new String(b, 0, n));
            }
        }
    }
}
Three classes deserve attention here: Path, FSDataInputStream, and Configuration (the conf object).

let me show you something

first: Configuration

public class Configuration implements Iterable<Map.Entry<String,String>>,Writable {
public void set(String name, String value) {
set(name, value, null);
}

/**
* Set the <code>value</code> of the <code>name</code> property. If
* <code>name</code> is deprecated, it also sets the <code>value</code> to
* the keys that replace the deprecated key. Name will be trimmed before put
* into configuration.
*
* @param name property name.
* @param value property value.
* @param source the place that this configuration value came from
* (For debugging).
* @throws IllegalArgumentException when the value or name is null.
*/
}
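This class holds a job's configuration. Through Configuration, information can be shared across multiple mapper and reducer tasks, so a job's settings have to be passed through it. The class implements two interfaces, Iterable and Writable. Iterable lets you iterate over all the name-value pairs that the Configuration object has loaded into memory. Writable provides the serialization that the Hadoop framework requires, so the in-memory name-value pairs can be written out, for example to disk. The set method sets the value of a named property; in the examples here it points the default file system at the HDFS address.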
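To make the two interfaces concrete, here is a small stand-in written only with the JDK. MiniConf is a hypothetical class for illustration, not Hadoop's Configuration: it is iterable over name-value pairs and has write and readFields methods with the same shape as those in Hadoop's Writable interface. The byte layout used here (a count followed by the pairs) is an assumption of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; this is not Hadoop's Configuration.
public class MiniConf implements Iterable<Map.Entry<String, String>> {
    private final Map<String, String> props = new LinkedHashMap<>();

    public void set(String name, String value) {
        if (name == null || value == null) {
            throw new IllegalArgumentException("name and value must not be null");
        }
        props.put(name.trim(), value); // the name is trimmed before it is stored
    }

    public String get(String name) {
        return props.get(name);
    }

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        return props.entrySet().iterator();
    }

    // Same shape as Writable.write(DataOutput): entry count, then each pair.
    public void write(DataOutput out) throws IOException {
        out.writeInt(props.size());
        for (Map.Entry<String, String> e : props.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    // Same shape as Writable.readFields(DataInput): replaces the current contents.
    public void readFields(DataInput in) throws IOException {
        props.clear();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            String key = in.readUTF();
            String value = in.readUTF();
            props.put(key, value);
        }
    }

    public static void main(String[] args) throws IOException {
        MiniConf conf = new MiniConf();
        conf.set("fs.defaultFS", "hdfs://10.192.4.33:9000/");

        // Serialize to bytes, then rebuild a copy from those bytes.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        conf.write(new DataOutputStream(buf));
        MiniConf copy = new MiniConf();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        for (Map.Entry<String, String> e : copy) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```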

The Path class lives in the org.apache.hadoop.fs package and, as the excerpt below shows, implements Comparable:

public class Path implements Comparable {
private void checkPathArg( String path ) throws IllegalArgumentException {
// disallow construction of a Path from an empty string
if ( path == null ) {
throw new IllegalArgumentException(
"Can not create a Path from a null string");
}
if( path.length() == 0 ) {
throw new IllegalArgumentException(
"Can not create a Path from an empty string");
}
}

/** Construct a path from a String. Path strings are URIs, but with
* unescaped elements and some additional normalization. */
public Path(String pathString) throws IllegalArgumentException {
checkPathArg( pathString );

// We can't use 'new URI(String)' directly, since it assumes things are
// escaped, which we don't require of Paths.

// add a slash in front of paths with Windows drive letters
if (hasWindowsDrive(pathString) && pathString.charAt(0) != '/') {
pathString = "/" + pathString;
}

// parse uri components
String scheme = null;
String authority = null;

int start = 0;

// parse uri scheme, if any
int colon = pathString.indexOf(':');
int slash = pathString.indexOf('/');
if ((colon != -1) &&
((slash == -1) || (colon < slash))) { // has a scheme
scheme = pathString.substring(0, colon);
start = colon+1;
}

// parse uri authority, if any
if (pathString.startsWith("//", start) &&
(pathString.length()-start > 2)) { // has authority
int nextSlash = pathString.indexOf('/', start+2);
int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
authority = pathString.substring(start+2, authEnd);
start = authEnd;
}

// uri path is the rest of the string -- query & fragment not supported
String path = pathString.substring(start, pathString.length());

initialize(scheme, authority, path, null);
}

/**
* Construct a path from a URI
*/
public Path(URI aUri) {
uri = aUri.normalize();
}
}
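OK, in practice it just names the location on HDFS: the constructor splits the path string into a scheme, an authority, and a path.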
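The parsing steps in the constructor can be replayed on their own. This is a sketch that copies the indexOf-based logic from the excerpt into a hypothetical helper, PathParts.split, which returns the three parts as an array. It leaves out the null and empty checks and the Windows drive-letter handling.

```java
public class PathParts {
    // Hypothetical helper: returns {scheme, authority, path}, following the
    // same steps as the Path(String) constructor quoted above.
    static String[] split(String pathString) {
        String scheme = null;
        String authority = null;
        int start = 0;

        // Scheme: text before the first ':' if that ':' comes before any '/'
        int colon = pathString.indexOf(':');
        int slash = pathString.indexOf('/');
        if (colon != -1 && (slash == -1 || colon < slash)) {
            scheme = pathString.substring(0, colon);
            start = colon + 1;
        }

        // Authority: text between a leading "//" and the next '/'
        if (pathString.startsWith("//", start)
                && pathString.length() - start > 2) {
            int nextSlash = pathString.indexOf('/', start + 2);
            int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
            authority = pathString.substring(start + 2, authEnd);
            start = authEnd;
        }

        // The path is whatever remains
        return new String[] {scheme, authority, pathString.substring(start)};
    }

    public static void main(String[] args) {
        String[] full = split("hdfs://10.192.4.33:9000/home/overshow/hehe.txt");
        System.out.println(full[0] + " | " + full[1] + " | " + full[2]);
        String[] bare = split("/home/overshow/hehe.txt");
        System.out.println(bare[0] + " | " + bare[1] + " | " + bare[2]);
    }
}
```

For the path used in OpenMethod, which has no scheme or authority, both of those parts stay null, and the file system is then chosen from the configuration.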

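The last class is FSDataInputStream. Er, I don't feel like reading through it; it's too long.

The open method of fs creates an FSDataInputStream instance. Put simply, the read flow is this: the client calls read on the FSDataInputStream, which connects to the closest DataNode holding the data (the NameNode decides which one counts as closest), and repeated calls to read move the data from the DataNode to the client.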
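The "repeated calls to read" part looks like the loop below. FSDataInputStream is a java.io.InputStream (it extends DataInputStream), so this sketch uses a plain InputStream and an in-memory stream in place of HDFS. ReadLoop and readAll are hypothetical names, and the 200-byte buffer simply matches the example above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadLoop {
    // Reads the stream to the end by calling read repeatedly.
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[200];
        int n;
        // read returns the number of bytes placed in buf, or -1 at end of stream
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // 500 bytes stand in for a file larger than the buffer
        InputStream in = new ByteArrayInputStream(new byte[500]);
        System.out.println(readAll(in).length);
    }
}
```

A single read call, as in OpenMethod, returns at most one buffer's worth of data, so files longer than 200 bytes need a loop like this.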

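Worth mentioning is the String constructor that OpenMethod uses to build the string. I dug through the source for quite a while, and it seems to be this one: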

public String(byte bytes[], int offset, int length) {
checkBounds(bytes, offset, length);
this.value = StringCoding.decode(bytes, offset, length);
}
/**
* Constructs a new {@code String} by decoding the specified array of bytes
* using the platform's default charset. The length of the new {@code
* String} is a function of the charset, and hence may not be equal to the
* length of the byte array.
*
* <p> The behavior of this constructor when the given bytes are not valid
* in the default charset is unspecified. The {@link
* java.nio.charset.CharsetDecoder} class should be used when more control
* over the decoding process is required.
*
* @param bytes
* The bytes to be decoded into characters
*
* @since JDK1.1
**/
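Because the three-argument constructor decodes with the platform's default charset, the same bytes can print differently on different machines. A minimal sketch of the alternative, assuming the file is UTF-8 encoded (DecodeBytes and decodeUtf8 are hypothetical names), passes the charset explicitly through the four-argument constructor:

```java
import java.nio.charset.StandardCharsets;

public class DecodeBytes {
    // Decodes the first `length` bytes of buf as UTF-8, independent of the platform default.
    static String decodeUtf8(byte[] buf, int length) {
        return new String(buf, 0, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "h\u00e9llo" contains one non-ASCII character (2 bytes in UTF-8)
        byte[] data = "h\u00e9llo".getBytes(StandardCharsets.UTF_8);
        byte[] buf = new byte[200];
        System.arraycopy(data, 0, buf, 0, data.length); // simulate a partially filled read buffer
        System.out.println(decodeUtf8(buf, data.length).length());
    }
}
```

The byte count (6) and the character count (5) differ here, which is the point the Javadoc makes about the string length being a function of the charset.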
#######################################################################

The write flow is much the same, except that it uses a different class, the output stream FSDataOutputStream:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Create_Method {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.192.4.33:9000/");
        // Closing the stream flushes the data; without it the write may be lost
        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream fos = fs.create(new Path("/words.txt"))) {
            // writeChars writes every char as two bytes, high byte first
            fos.writeChars("hello world");
        }
    }
}
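One detail about writeChars is easy to miss: it writes each char as two bytes, so the file is not plain ASCII or UTF-8 text. FSDataOutputStream extends java.io.DataOutputStream, so the effect can be checked locally with an in-memory stream. In this sketch, WriteCharsDemo and its two helper methods are hypothetical names, and writing UTF-8 bytes is shown only as one possible alternative.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WriteCharsDemo {
    // Number of bytes produced by DataOutputStream.writeChars
    static int sizeWithWriteChars(String s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeChars(s); // two bytes per char
        }
        return buf.size();
    }

    // Number of bytes produced by writing the UTF-8 encoding of the string
    static int sizeWithUtf8(String s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.write(s.getBytes(StandardCharsets.UTF_8));
        }
        return buf.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(sizeWithWriteChars("hello world")); // prints 22
        System.out.println(sizeWithUtf8("hello world"));       // prints 11
    }
}
```

If /words.txt is meant to be read back as text by other tools, writing encoded bytes is usually the more convenient choice.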
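Of course, fs can also be used to list the files under a directory. Note the return type, though: RemoteIterator is a special kind of iterator whose hasNext and next methods can throw IOException, so it is walked with an explicit while loop.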

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class listStatus {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.192.4.33:9000/data");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/");
        RemoteIterator<LocatedFileStatus> list = fs.listFiles(path, true);
        while (list.hasNext()) {
            System.out.println(list.next());
        }
    }
}
Here is the source of the listFiles method:

public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return new RemoteIterator<LocatedFileStatus>() {
private Stack<RemoteIterator<LocatedFileStatus>> itors =
new Stack<RemoteIterator<LocatedFileStatus>>();
private RemoteIterator<LocatedFileStatus> curItor =
listLocatedStatus(f);
private LocatedFileStatus curFile;

@Override
public boolean hasNext() throws IOException {
while (curFile == null) {
if (curItor.hasNext()) {
handleFileStat(curItor.next());
} else if (!itors.empty()) {
curItor = itors.pop();
} else {
return false;
}
}
return true;
}

/**
* Process the input stat.
* If it is a file, return the file stat.
* If it is a directory, traverse the directory if recursive is true;
* ignore it if recursive is false.
* @param stat input status
* @throws IOException if any IO error occurs
*/