(四)DFS檔案操作的原理及程式碼實現
阿新 • • 發佈:2018-12-18
1、檔案操作原理
1.1、下載過程
- Client向namenode發起Open file 請求。目的是獲取指定檔案的輸入流
- namenode收到請求之後,會檢查路徑的合法性,客戶端的操作許可權。如果檢測未通過,則直接報錯返回
- Client也會向namenode發起Getblockloaction請求,獲取指定檔案的元資料資訊。如果上一步檢測通過,則namenode會將元資料資訊封裝到輸入流裡,返回給客戶端
- 客戶端根據元資料資訊,直接去對應的datanode讀取檔案塊,然後下載到本地(建立本地的輸出流,然後做流的對接)
- 關閉流
1.2、上傳過程
- Client向namenode傳送Create file請求,目的是獲取HDFS檔案的輸出流
- namenode收到請求之後,會檢查路徑的合法性,客戶端的操作許可權。如果檢測未通過,則直接報錯返回
- 如果通過檢測,namenode會將檔案的切塊資訊(檔案被切成幾塊,儲存位置,以及副本位置),然後把這些資訊封裝到輸出流裡,返回給客戶端。
- clinet通過輸出流傳送檔案塊。會把檔案快打散成一個個的package,每個package最大為64KB,再傳輸一個個的package。這種機制叫做資料流管道機制,目的是充分利用每臺機器的頻寬,避免網路瓶頸和高延時的連線,最小化推送所有資料的延時
- 通過資料流管道機制,實現資料的傳送和副本的複製。datanode之間會進行package傳輸,儲存副本。每臺datanode伺服器收到資料之後,會向上遊反饋,直到最終反饋給Client,這一輪傳輸才算完成。
1.3、刪除檔案過程
- 客戶端向namenode發現刪除檔案指令,比如:hadoop fs -rm /park01/1.txt
- namenode收到請求之後,會檢查路徑的合法性,客戶端的操作許可權。如果檢測未通過,則直接報錯返回
- 如果檢測通過,會將對應的檔案從元資料中刪除。(注意,此時這個檔案並沒有真正從叢集上被刪除)
- 每臺datanode會定期向namenode傳送心跳,會領取刪除的指令,找到對應的檔案塊,進行檔案塊的刪除。
2、編寫程式碼操作檔案
2.1、準備環境
- eclipse版本: Version: Mars.2 Release (4.5.2)
- eclipse操作hadoop外掛下載:連結:https://pan.baidu.com/s/1Hiw50aG2s-PTNEzD1LHcDw 提取碼:r3ac
- 注意,外掛的版本要和hadoop版本保持一致。並將外掛放在eclipse的plugins目錄下
- 將hadoop安裝包解壓到無中文路徑的目錄
- 啟動eclipse,Window | Preferences | Hadoop Map/Reduce,配置好hadoop的安裝目錄
- 新建hadoop工程 File | New | Others,搜尋Map/Reduce Project。建立後可以發現hadoop依賴的jar包都自動引用過來了。現在就可以開始寫程式碼操作hdfs了。
2.2、編寫程式碼
- 連線HDFS
- 下載
- 上傳
- 刪除
- 建立目錄
- 重新命名
- 查詢目錄下檔案
- 遞迴查詢目錄下檔案
- 獲取檔案塊資訊
package hadoop;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class DfsOperation {
//連線HDFS
@Test
public void testConnectNamenode() throws Exception {
//建立hadoop環境引數物件,通過物件的set方法設定引數
//通過物件配置的引數優先順序>配置檔案的配置
//物件配置的生效範圍是當前工作執行緒。配置檔案的生效範圍是全域性
Configuration conf = new Configuration();
//連線HDFS檔案系統
//Hadoop中有多種檔案系統(FileSystem有很多實現類),其中最重要的事分散式檔案系統
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
}
//下載
@Test
public void getFile() throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
//獲取HDFS上指定檔案的輸入流
InputStream in = fs.open(new Path("/park/test.txt"));
OutputStream out = new FileOutputStream(new File("test.txt"));
//通過hadoop提供的資料流工具,完成資料流的傳輸
IOUtils.copyBytes(in, out, conf);
in.close();
out.close();
fs.close();
}
//上傳
@Test
public void putFile() throws Exception {
Configuration conf = new Configuration();
conf.set("dfs.replication", "1"); //設定副本數量
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
ByteArrayInputStream in = new ByteArrayInputStream("testPutFile!".getBytes());
OutputStream out = fs.create(new Path("/park/putFile.txt"));
IOUtils.copyBytes(in, out, conf);
in.close();
out.close();
fs.close();
}
//刪除
@Test
public void deleteFile() throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
//false只能刪除不為空的目錄;true不為空的目錄也可以刪除
//也可以指定檔案去刪除
fs.delete(new Path("/park"), true);
fs.close();
}
//建立目錄
//fs.mkdirs(new Path("/park02"));
//重新命名
//fs.rename(new Path("/park"), new Path("/park01"));
//查詢目錄下檔案
@Test
public void searchFile() throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
FileStatus[] ls = fs.listStatus(new Path("/result"));
for(FileStatus status : ls) {
System.out.println(status);
}
fs.close();
}
//遞迴查詢目錄下檔案
@Test
public void searchFileByR() throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
//true表示遞迴檢視
RemoteIterator<LocatedFileStatus> rt = fs.listFiles(new Path("/result"), true);
while(rt.hasNext()) {
System.out.println(rt.next());
}
fs.close();
}
//獲取檔案塊資訊
@Test
public void getFileBlocksInfo() throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
//0 檢視塊的起始範圍 ;Integer.MAX_VALUE 檢視塊的結束範圍
//通過這兩個引數控制檢視的塊的範圍
BlockLocation[] data = fs.getFileBlockLocations(new Path("/park/putFile.txt"), 0, Integer.MAX_VALUE);
for(BlockLocation bl : data) {
System.out.println(bl);
//輸出結果如下: 0,12,hadoop1
//0 表示第一塊的起始位置; 12表示塊的實際大小;hadoop01表示儲存的datanode伺服器
}
fs.close();
}
}