使用JavaAPI進行HDFS檔案系統的增刪改查
0、事前準備。
0.1 完成HADOOP的叢集安裝,完成HDFS的配置和初始化。
0.2 配置好Linux或Win環境下的Java開發環境。
本文環境為WIN10 + Java1.8 + Eclipse。
0.3 有一定Java基礎。
1、HDFS的基本工作原理簡介。
你現在需要知道的是:
1、HDFS是一套集群系統。叢集內機器數量可多可少,由Hadoop框架約束。
2、HDFS系統至少有1個Namenode機(可認為是主機),以及若干Datanode機(可認為是算機)。
3、HDFS系統的最基本功能是分散式儲存檔案,其命令與Shell命令類似。如 ls,cat, mkdir,rm,mv等等。
4、HDFS系統在正常情況下不會隨便被你玩垮!因為存在裡面的檔案通常有2個甚至更多的備份!
2、HDFS系統的配置載入順序:
①hadoop.hdfs程式包中hdfs-default.xml預設配置
②Java專案中的hdfs-default.xml配置
③Java檔案中,通過Hadoop客戶端提供的API進行改寫。例如def.replication配置:
//Configuration為org.apache.hadoop.conf.Configuration Configuration conf = new Configuration(); conf.set("dfs.replication", "5");
三種配置由①→②→③依次讀取,優先度依次升高(即③最高)。
3、HDFS的Jar包依賴匯入
對於Linux開發者而言,從hadoop.apache.org下載的binary版本,直接匯入Java專案即可。
對於Windows開發者,hadoop專案顯得並不友好。
我們首先需要下載所需hadoop版本的source版(例如hadoop-2.6.5-src.tar.gz),然後進行source版本本地化編譯。
個人推薦根據自身的作業系統和所需Hadoop版本情況,從網上直接下載編譯好的程式包。
附win10 x64環境編譯好的hadoop_2.6.5網盤:
連結:https://pan.baidu.com/s/1Q4DSur-lvnuX4UJCqA-URA 密碼:ao8g
4、HDFS系統通過JavaAPI進行操作
先附上程式碼:
package cn.test.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;
public class UserTestHdfsClient {
Configuration conf = null;
FileSystem fs = null;
@Before
public void init() throws Exception {
conf = new Configuration();
// conf.set("dfs.replication", "5");
// fs = FileSystem.get(conf);
fs = FileSystem.get(new URI("hdfs://hadoop.mini01:9000"), conf, "hadoop");
}
/**
* 遍歷conf中所有屬性
*/
@Test
public void testConf() {
Iterator<Entry<String, String>> it = conf.iterator();
while(it.hasNext()) {
Entry<String, String> entry = it.next();
System.out.println("The "+ entry.getKey()+" is :" + entry.getValue() + ".\n");
}
}
/**
* 刪除某目錄下所有檔案
*/
@Test
public void testDelete() throws Exception {
boolean delete = fs.delete(new Path("/testMkdirs"), true);
System.out.println(delete);
}
/**
* 顯示檔案清單
* @throws Exception
*/
@Test
public void testLs() throws Exception {
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("FileName: " + fileStatus.getPath().getName());
System.out.println("FilePath: " + fileStatus.getPath());
System.out.println("FileOwner: " + fileStatus.getOwner());
System.out.println(fileStatus.getReplication());
System.out.println("********************************");
}
}
/**
* 上傳檔案
* @throws IllegalArgumentException
* @throws IOException
*/
public void testUpload() throws IllegalArgumentException, IOException {
fs.copyFromLocalFile(new Path("local"), new Path("dst"));
fs.close();
}
/**
* 下載
* @throws IOException
* @throws IllegalArgumentException
*/
public void testDownload() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("src"), new Path("dst"));
fs.close();
}
}
須注意的點:
1、增刪查均可通過API簡單實現,但改檔案並不能直接通過HDFS實現。在HDFS檔案系統中,改檔案的方式是Append。
2、程式碼中的Path需要自行修改。
3、進行執行前,請確保以有權身份登入HDFS系統。
另附流方式實現HDFS檔案操作的程式碼:
package cn.test.hadoop.hdfs;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
/**
* 用stream方式管理hdfs的檔案,
* 可以實現讀取指定偏移量範圍的檔案內容
* @author Henry
*
*/
public class HdfsStreamAccess {
FileSystem fs = null;
Configuration conf = null;
@Before
public void init() throws Exception {
conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://hadoop.mini01:9000"),conf,"hadoop");
}
/**
* 從本地寫到hdfs
* @throws Exception
*/
@Test
public void testUpload() throws Exception {
FSDataOutputStream outputStream = fs.create(new Path("/testabc.txt"), true);
FileInputStream inputStream = new FileInputStream("d:/kms10.log");
IOUtils.copy(inputStream, outputStream);
}
@Test
public void testDownload() throws Exception {
FSDataInputStream inputStream = fs.open(new Path("/testabc.txt"));
FileOutputStream outputStream = new FileOutputStream("d:/inputstream.txt");
IOUtils.copy(inputStream, outputStream);
}
/**
* 使用流隨機讀取檔案
* @throws Exception
*/
@Test
public void testRandomAccess() throws Exception {
FSDataInputStream inputStream = fs.open(new Path("/testabc.txt"));
inputStream.seek(12); //此處 “12” 代表從12位元組後開始抓取流。可自行實現隨機讀取。
FileOutputStream outputStream = new FileOutputStream("d:/inputstream2.txt");
IOUtils.copy(inputStream, outputStream);
}
/**
* 從檔案系統進行讀取
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testCat() throws IllegalArgumentException, IOException {
FSDataInputStream inputStream = fs.open(new Path("/testabc.txt"));
IOUtils.copy(inputStream, System.out);
}
}
以上程式碼的執行方式均為Junit直接執行。
5、參考的文件:
連結:https://pan.baidu.com/s/14tdGH--fDjVnV57UV-NSpw 密碼:xeo0