hadoop實戰(三) 使用HDFS操作檔案
一、hadoop簡介
HADOOP叢集具體來說包含兩個叢集:HDFS叢集和YARN叢集,兩者邏輯上分離,但物理上常在一起
HDFS叢集:負責海量資料的儲存,叢集中的角色主要有 NameNode / DataNode
YARN叢集:負責海量資料運算時的資源排程,叢集中的角色主要有 ResourceManager /NodeManager
二、hdfs的工作機制
(一)、概述
1. HDFS叢集分為兩大角色:NameNode、DataNode
2. NameNode負責管理整個檔案系統的元資料
3. DataNode 負責管理使用者的檔案資料塊
4. 檔案會按照固定的大小(blocksize)切成若干塊後分布式儲存在若干臺datanode上
5. 每一個檔案塊可以有多個副本,並存放在不同的datanode上
6. Datanode會定期向Namenode彙報自身所儲存的檔案block資訊,而namenode則會負責保持檔案的副本數量
7. HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行
(二)、HDFS寫資料流程
1、概述
客戶端要向HDFS寫資料,首先要跟namenode通訊以確認可以寫檔案並獲得接收檔案block的datanode,然後,客戶端按順序將檔案逐個block傳遞給相應datanode,並由接收到block的datanode負責向其他datanode複製block的副本2、詳細步驟解析
1、根namenode通訊請求上傳檔案,namenode檢查目標檔案是否已存在,父目錄是否存在 2、namenode返回是否可以上傳
3、client請求第一個 block該傳輸到哪些datanode伺服器上
4、namenode返回3個datanode伺服器ABC
5、client請求3臺dn中的一臺A上傳資料(本質上是一個RPC呼叫,建立pipeline),A收到請求會繼續呼叫B,然後B呼叫C,將整個pipeline建立完成,逐級返回客戶端
6、client開始往A上傳第一個block(先從磁碟讀取資料放到一個本地記憶體快取),以packet為單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答佇列等待應答
7、當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。
(三)、HDFS讀資料流程
1、概述
客戶端將要讀取的檔案路徑傳送給namenode,namenode獲取檔案的元資訊(主要是block的存放位置資訊)返回給客戶端,客戶端根據返回的資訊找到相應datanode逐個獲取檔案的block並在客戶端本地進行資料追加合併從而獲得整個檔案2、詳細步驟解析
1、跟namenode通訊查詢元資料,找到檔案塊所在的datanode伺服器 2、挑選一臺datanode(就近原則,然後隨機)伺服器,請求建立socket流
3、datanode開始傳送資料(從磁盤裡面讀取資料放入流,以packet為單位來做校驗)
4、客戶端以packet為單位接收,現在本地快取,然後寫入目標檔案
(四)、NAMENODE工作機制
1、NAMENODE職責
負責客戶端請求的響應
元資料的管理(查詢,修改)
2、元資料管理
namenode對資料的管理採用了三種儲存形式: (1)記憶體元資料(NameSystem)
(2)磁碟元資料映象檔案
(3)資料操作日誌檔案(可通過日誌運算出元資料)
(1)元資料儲存機制
A、記憶體中有一份完整的元資料(記憶體meta data) B、磁碟有一個“準完整”的元資料映象(fsimage)檔案(在namenode的工作目錄中) C、用於銜接記憶體metadata和持久化元資料映象fsimage之間的操作日誌(edits檔案)注:當客戶端對hdfs中的檔案進行新增或者修改操作,操作記錄首先被記入edits日誌檔案中,當客戶端操作成功後,相應的元資料會更新到記憶體meta.data中(2)元資料手動檢視
可以通過hdfs的一個工具來檢視edits中的資訊
bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
(3)元資料的checkpoint
每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並載入到記憶體進行merge(這個過程稱為checkpoint) ##### checkpoint操作的觸發條件配置引數:dfs.namenode.checkpoint.check.period=60 #檢查觸發條件是否滿足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個引數做checkpoint操作時,secondary namenode的本地工作目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries=3 #最大重試次數
dfs.namenode.checkpoint.period=3600 #兩次checkpoint之間的時間間隔3600秒
dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操作記錄
checkpoint的附帶作用:
namenode和secondary namenode的工作目錄儲存結構完全相同,所以,當namenode故障退出需要重新恢復時,可以從secondary namenode的工作目錄中將fsimage拷貝到namenode的工作目錄,以恢復namenode的元資料(五)、DATANODE的工作機制
1、概述
(1)Datanode工作職責:
儲存管理使用者的檔案塊資料
定期向namenode彙報自身所持有的block資訊(通過心跳資訊上報)(這點很重要,因為,當叢集中發生某些block副本失效時,叢集如何恢復block初始副本數量的問題)
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>3600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>
(2)Datanode掉線判斷時限引數:
datanode程序死亡或者網路故障造成datanode無法與namenode通訊,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作超時時長。HDFS預設的超時時長為10分鐘+30秒。如果定義超時時間為timeout,則超時時長的計算公式為:timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而預設的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval預設為3秒。
需要注意的是hdfs-site.xml 配置檔案中的heartbeat.recheck.interval的單位為毫秒,dfs.heartbeat.interval的單位為秒。所以,舉個例子,如果heartbeat.recheck.interval設定為5000(毫秒),dfs.heartbeat.interval設定為3(秒,預設),則總的超時時間為40秒。
<property>
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
(六)、HDFS的java操作
hdfs在生產應用中主要是客戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的訪問客戶端物件,然後通過該客戶端物件操作(增刪改查)HDFS上的檔案
首先是依賴:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-starter-test</artifactId>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
</dependency>
常見檔案操作:
package com.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
public class HdfsClient {
FileSystem fs = null;
@Before
public void init() throws Exception {
// 構造一個配置引數物件,設定一個引數:我們要訪問的hdfs的URI
// 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs檔案系統的客戶端,以及hdfs的訪問地址
// new Configuration();的時候,它就會去載入jar包中的hdfs-default.xml
// 然後再載入classpath下的hdfs-site.xml
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://mini1:9000");
/**
* 引數優先順序: 1、客戶端程式碼中設定的值 2、classpath下的使用者自定義配置檔案 3、然後是伺服器的預設配置
*/
conf.set("dfs.replication", "3");
// 獲取一個hdfs的訪問客戶端,根據引數,這個例項應該是DistributedFileSystem的例項
// fs = FileSystem.get(conf);
// 如果這樣去獲取,那conf裡面就可以不要配"fs.defaultFS"引數,而且,這個客戶端的身份標識已經是hadoop使用者
fs = FileSystem.get(new URI("hdfs://mini1:9000"), conf, "root");
}
/**
* 往hdfs上傳檔案
*
* @throws Exception
*/
@Test
public void testAddFileToHdfs() throws Exception {
// 要上傳的檔案所在的本地路徑
Path src = new Path("/home/youjin/ruanjian/apache-tomcat-8.0.53.tar.gz");
// 要上傳到hdfs的目標路徑
Path dst = new Path("/apache-tomcat-8.0.53.tar.gz");
fs.copyFromLocalFile(src, dst);
fs.close();
}
/**
* 從hdfs中複製檔案到本地檔案系統
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("/aaa"), new Path("/home/youjin/test"));
fs.close();
}
@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 建立目錄
fs.mkdirs(new Path("/a1/b1/c1"));
// 刪除資料夾 ,如果是非空資料夾,引數2必須給值true
fs.delete(new Path("/aaa"), true);
// 重新命名檔案或資料夾
fs.rename(new Path("/a1"), new Path("/a2"));
}
/**
* 檢視目錄資訊,只顯示檔案
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:為什麼返回迭代器,而不是List之類的容器
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println(fileStatus.getPath().getName());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getLen());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------為angelababy列印的分割線--------------");
}
}
/**
* 檢視檔案及資料夾資訊
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- ";
for (FileStatus fstatus : listStatus) {
if (fstatus.isFile()) flag = "f-- ";
System.out.println(flag + fstatus.getPath().getName());
}
}
}
通過流的方式訪問hdfs:
package com.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
public class StreamAccess {
FileSystem fs = null;
@Before
public void init() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://mini1:9000"), conf, "root");
}
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException {
//先獲取一個檔案的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/apache-tomcat-8.0.53.tar.gz"));
//再構造一個檔案的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("/home/youjin/test/apache-tomcat-8.0.53.tar.gz"));
//再將輸入流中資料傳輸到輸出流
IOUtils.copyBytes(in, out, 4096);
}
/**
* hdfs支援隨機定位進行檔案讀取,而且可以方便地讀取指定長度
* 用於上層分散式運算框架併發處理資料
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
//先獲取一個檔案的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以將流的起始偏移量進行自定義
in.seek(22);
//再構造一個檔案的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("/home/youjin/test/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/**
* 顯示hdfs上檔案的內容
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024);
}
/**
* 獲取一個檔案的所有block位置資訊,然後讀取指定block中的內容
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testCat1() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/wordcount/input/somewords.txt"));
//拿到檔案資訊
FileStatus[] listStatus = fs.listStatus(new Path("/wordcount/input/somewords.txt"));
//獲取這個檔案的所有block的資訊
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
//第一個block的長度
long length = fileBlockLocations[0].getLength();
//第一個block的起始偏移量
long offset = fileBlockLocations[0].getOffset();
System.out.println(length);
System.out.println(offset);
//獲取第一個block寫入輸出流
// IOUtils.copyBytes(in, System.out, (int)length);
byte[] b = new byte[4096];
FileOutputStream os = new FileOutputStream(new File("/home/youjin/test/block0"));
while(in.read(offset, b, 0, 4096)!=-1){
os.write(b);
offset += 4096;
if(offset>=length) return;
};
os.flush();
os.close();
in.close();
}
}