1. 程式人生 > >大數據學習---Hadoop的深入學習

大數據學習---Hadoop的深入學習

快照 pipeline 版本 速度 ado 命名空間 cmd rename 測試

Hadoop生態圈

存儲數據HDFS(Hadoop Distributed File System),運行在通用硬件上的分布式文件系統。具有高度容錯性、高吞吐量的的特點。

處理數據MapReduce,它是一種編程模型,Map(映射)和Reduce(歸約),它極大地方便了分布式並行編程,與hdfs的高度融合,它是基於java來進行編程的。

數據倉庫工具Hive,處理結構化SQL查詢功能,將sql語句解釋為MapReduce編程進行數據的處理,只能進行結構化的查詢。

Pig MapReduce之上的高級過程語言,查詢大型的半結構化數據集,處理非結構化數據。

Hadoop的優勢

低成本 基於來源軟件,運行與通用硬件平臺。

高擴展性 在可用的計算機集簇間分配數據並完成計算任務,這些集簇可以方便的擴展到數以千計的節點中。

高效性 在節點之間動態分配計算任務並保證各個節點的動態平衡,處理速度非常快。

高容錯性 自動保存數據的多個副本,並且能夠自動將失敗的任務重新分配。

Hadoop三大核心設計

MapReduce Map:任務分解 Reduce:結果匯總

HDFS NameNode:管理裏文件元數據 DataNode:存儲物理文件 Client:獲取文件的各種API

Hbase 列式內存數據庫,它是一個適用於非結構化數據存儲的數據庫,基於列存儲,不同行可有不同數據列,保留數據多個時間版本。

另: Zookeeper

分布式應用程序協調服務提供一致性服務。

HDFS的體系結構

NameNode:即元數據節點,用來管理文件系統的命名空間,記錄每個文件數據在各個DataNode上的位置和副本信息,協調客戶端對文件的訪問。

元數據文件:VERSION、seen_txid、fsimage_*、fsimage_*.md5、edits_*

DataNode:即數據節點用來存儲數據,負責所在物理節點的存儲管理,一次寫入,多次讀取(不修改)。

文件由數據塊組成,典型的塊大小是64MB,數據塊盡量散布到各個節點。

SecondaryNameNode:NameNode的一個快照,周期性的備份namenode,記錄namenode中的metadata及其他數據,可以用來恢復Namenode

dfs.namenode.name.dir/current/文件夾的幾個文件:

VERSION:存放的是版本信息,文件系統的標示符

seen_txid:存放事務ID的文件,它裏面的數字要與edits的尾數一致,也就是與edits_inprogress_000000000*後邊的數字一致,inprogress是正在使用的edits文件

fsimage_*、fsimage_*.md5、edits_*:存放的是文件的元數據信息。

HDFS文件讀取的過程

HDFS文件讀取的過程如下:

1).使用HDFS提供的客戶端開發庫Client,向遠程的Namenode發起RPC請求;

2).Namenode會視情況返回文件的部分或者全部block列表,對於每個block,Namenode都會返回有該block拷貝的DataNode地址;

3).客戶端開發庫Client會選取離客戶端最接近的DataNode來讀取block;如果客戶端本身就是DataNode,那麽將從本地直接獲取數據.

4).讀取完當前block的數據後,關閉與當前的DataNode連接,並為讀取下一個block尋找最佳的DataNode;

5).當讀完列表的block後,且文件讀取還沒有結束,客戶端開發庫會繼續向Namenode獲取下一批的block列表。

6).讀取完一個block都會進行checksum驗證,如果讀取datanode時出現錯誤,客戶端會通知Namenode,然後再從下一個擁有該block拷貝的datanode繼續讀。

HDFS寫入文件的過程

HDFS寫入文件的過程:

1).使用HDFS提供的客戶端開發庫Client,向遠程的Namenode發起RPC請求;

2).Namenode會檢查要創建的文件是否已經存在,創建者是否有權限進行操作,成功則會為文件創建一個記錄,否則會讓客戶端拋出異常;

4).當客戶端開始寫入文件的時候,開發庫會將文件切分成多個packets,並在內部以數據隊列"data queue"的形式管理這些packets,並向Namenode申請新的blocks,獲取用來存儲replicas的合適的datanodes列表,列表的大小根據在Namenode中對replication的設置而定。

開始以pipeline(管道)的形式將packet寫入所有的replicas中。開發庫把packet以流的方式寫入第一個datanode,該datanode把該packet存儲之後,再將其傳遞給在此pipeline中的下一個datanode,直到最後一個datanode,這種寫數據的方式呈流水線的形式。

5).最後一個datanode成功存儲之後會返回一個ack packet,在pipeline裏傳遞至客戶端,在客戶端的開發庫內部維護著"ack queue",成功收到datanode返回的ack packet後會從"ack queue"移除相應的packet。

6).如果傳輸過程中,有某個datanode出現了故障,那麽當前的pipeline會被關閉,出現故障的datanode會從當前的pipeline中移除,剩余的block會繼續剩下的datanode中繼續以pipeline的形式傳輸,同時Namenode會分配一個新的datanode,保持replicas設定的數量。

HDFS的shell操作

1. 直接訪問Hadoop程序

在/etc/prifile中加入

export HADOOP_HOME=/home/hduser/hadoop (hadoop的安裝目錄)
export PATH=$HADOOP_HOME/bin:$PATH

source /etc/profile 是環境變量生效

2. HDFS命令格式:

HDFS基本命令(在hadoop目錄下執行為例)

bin/hadoop fs -cmd <args>

cmd:具體的操作,基本上與UNIX的命令行相同

<args>:有時需包含參數

例如:bin/hadoop fs -ls /

3. 常見的hadoop命令解析:

put命令只能從本地的文件復制到HDFS上;

get命令將HDFS上的文件復制到本地;

cp命令只能在相同的文件系統上互相復制文件。這三個命令都可以復制多個文件。復制單個文件時,目標路徑可以是目錄也可以是文件,目標路徑是目錄時,文件名不改變,目標路徑是文件時,可以修改文件名;復制多個文件時,目標路徑必須是目錄,文件名不能修改。

copyFromLocal命令,此命令與put命令相似,區別是此命令只能復制一個文件。目標路徑是目錄時,文件名不改變,目標路徑是文件時,可以修改文件名。

copyToLocal命令,此命令與get命令相似,區別是此命令只能復制一個文件。目標路徑是目錄時,文件名不改變,目標路徑是文件時,可以修改文件名。

mv命令只能在相同的文件系統上移動文件,可以移動多個文件。標路徑是目錄時,文件名不改變,目標路徑是文件時,可以修改文件名。

以上所有目錄必須是存在的。

4. 常見的hadoop命令示例

mkdir 使用方法:hadoop fs -mkdir <paths>

      示例:hadoop fs -mkdir /user

ls 列出path目錄下的內容,包括文件名,權限,所有者,大小和修改時間

  hadoop fs -ls /

  hadoop fs -ls -R /

put 使用方法:hadoop fs -put <localsrc> ... <dst>

  從本地文件系統中復制單個或多個源路徑到目標文件系統。也支持從標準輸入中讀取輸入寫入目標文件系統。

  hadoop fs -put localfile /user/ 拷貝localfile文件到hdfs的user目錄下

  hadoop fs -put localfile1 localfile2 /user/ 同時拷貝localfile1和localfile2文件到hdfs的user目錄下

  hadoop fs -put - /user/hadoopfile 在hadoopfile文件裏手工錄入內容(錄入之前hadoopfile文件不存在),按Ctrl+C鍵錄入結束 

get 使用方法:hadoop fs -get [-ignorecrc] [-crc]    <src> <localdst> 復制文件到本地文件系統

  示例:hadoop fs -get /user/hadoop/file localfile

cat 使用方法:hadoop fs -cat URI [URI ...]

  示例:hadoop fs -ls /user/hadoopfile 查看文件內容

rm 使用方法:hadoop fs -rm URI [URI ...]

  刪除指定的文件。只刪除非空目錄和文件。請參考rmr命令了解遞歸刪除。

  示例:hadoop fs -rm /user/read.txt 刪除文件

  示例:hadoop fs -rm  -f /user/read.txt 強制刪除文件

rmr 使用方法:hadoop fs -rmr URI [URI ...]

  示例:hadoop fs -rmr /user/     刪除/usr/下所有的文件和目錄

delete的遞歸版本。

  示例:hadoop fs -rm -r /user/hadoop/dir

getmerge 使用方法:hadoop fs -getmerge <src> <localdst> [addnl]

  接受一個源目錄和一個目標文件作為輸入,並且將源目錄中所有的文件連接成本地目標文件。addnl是可選的,用於指定在每個文件結尾添加一個換行符。

  示例:hadoop fs -getmerge /input2 file2.txt

mv 使用方法:hadoop fs -mv URI [URI ...] <dest>

  將文件從源路徑移動到目標路徑。這個命令允許有多個源路徑,此時目標路徑必須是一個目錄。

  不允許在不同的文件系統間移動文件。

  也可以重命名文件

  示例:hadoop fs -mv /hadoop/file1 /hadoop/file2

      hadoop fs -mv /hadoop/file1.txt  /hadoop/file1.txt.bak.0722  

stat 使用方法:hadoop fs -stat URI [URI ...]

  返回指定路徑的統計信息。

  示例:hadoop fs -stat /input2

tail 使用方法:hadoop fs -tail [-f] URI

  將文件尾部1k字節的呢絨輸出到stdout。支持-f選項,行為和Unix中一致。

  示例:hadoop fs -tail pathname

chmod 使用方法:hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]

  設為所有人皆可讀取

  chmod a+r file1.txt

chown 使用方法:hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI]

touchz 使用方法:hadoop fs -touchz URI [URI ...]

  創建一個0字節的空文件。

  示例:hadoop fs -touchz pathname

copyToLocal 使用方法:hadoop fs -copyToLocal [ignorecrc] [-crc] URI <localdst>

  除了限定目標路徑是一個本地文件外,和get命令類似。

copyFromLocal 使用方法:hadoop fs -copyFromLocal <localsrc> URI

  除了限定源路徑是一個本地文件外,和put命令相似。

cp 使用方法:hadoop fs -cp URI [URI ...] <dest>

  將文件從源路徑復制到目標路徑。這個命令允許有多個源路徑,此時目標路徑必須是一個目錄。

  示例: hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2

du 使用方法:hadoop fs -du URI [URI ...]

  顯示目錄中所有文件的大小

  示例:hadoop fs -du /user/hadoop/dir1

du -s 使用方法:hadoop fs -du -s <args>

  顯示文件的大小

  常用:hadoop fs -du -h /srv/smart/hhh.txt

expunge 使用方法:hadoop fs -expunge
  清空回收站
回收站: hadoop fs -ls /安裝目錄下/.trash 
       Hadoop文件刪除後可以放在回收站內

  

5. HDFS的API方式訪問

Eclipse中hadoop環境的建立

Configuration類:該類的對象封裝了客戶端或者服務端的配置。
FileSystem類:該類的對象是一個文件系統對象,可以用該隊想的一些方法來對文件進行操作。
FSDataInputStream和FSDataOutputStream:這兩個類是HDFS中的輸入輸出流。

基本流程:
得到Configuration對象
得到FileSystem對象
進行文件操作(讀寫、刪除、改名)
所需引入的庫

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

文件操作前的準備工作

獲取配置信息 Configuration conf = new Configuration(); 建立文件系統對象 FileSystem fs = FileSystem.get(conf); 寫文件 Path dstPath = new Path(dst);//目標路徑 FSDataOutputStream outputStream = fs.create(dstPath);//打開一個輸出流 outputStream.write(contents); outputStream.close(); fs.close(); 讀文件 InputStream in = null; in = fs.open(srcPath); IOUtils.copyBytes(in,System.out,4096,false);//復制到標準輸出流 IOUtils.closeStream(in); 上傳文件到HDFS Path srcPath = new Path(src);//原路徑 Path dstPath = new Path(dst);//目標路徑 fs.copyFromLocalFile(false,srcPath,dstPath); 前面參數是指是否刪除原文件,true為刪除,默認為false 獲取指定文件信息 FileStatus[] fileStatus = fs.listStatus(dstPath); boolean isok=fs.rename(oldPath,newPath); boolean isok=fs.deleteOnExit(path); boolean isok=fs.mkdirs(srcPath);

6. HDFS API具體操作

技術分享圖片
  1 打開eclipse建立一個工程,將代碼copy,逐步運行。
  2 
  3 在工程中建立一個conf目錄,註意先將相關的配置文件拷貝過去主要是三個文件core-site.xml , mapred-site.xml,hdfs-site.xml
  4 
  5 並在project屬性中設置java build path 單擊addfolder 將conf目錄加上
  6 
  7 import java.io.FileInputStream;
  8 
  9 import java.io.IOException;
 10 
 11 import java.io.InputStream;
 12 
 13 import org.apache.hadoop.conf.Configuration;
 14 
 15 import org.apache.hadoop.fs.FSDataOutputStream;
 16 
 17 import org.apache.hadoop.fs.FileStatus;
 18 
 19 import org.apache.hadoop.fs.FileSystem;
 20 
 21 import org.apache.hadoop.fs.Path;
 22 
 23 import org.apache.hadoop.io.IOUtils;
 24 
 25 public class HdfsFile {
 26 
 27     //創建新文件
 28 
 29     public static void createFile(String dst , byte[] contents) throws IOException{
 30 
 31         Configuration conf = new Configuration();
 32 
 33         FileSystem fs = FileSystem.get(conf);
 34 
 35         Path dstPath = new Path(dst); //目標路徑
 36 
 37         //打開一個輸出流
 38 
 39         FSDataOutputStream outputStream = fs.create(dstPath);
 40 
 41         outputStream.write(contents);
 42 
 43         outputStream.close();
 44 
 45         fs.close();
 46 
 47         System.out.println("文件創建成功!");
 48 
 49     }
 50 
 51     //上傳本地文件
 52 
 53     public static void uploadFile(String src,String dst) throws IOException{
 54 
 55         Configuration conf = new Configuration();
 56 
 57         FileSystem fs = FileSystem.get(conf);
 58 
 59         Path srcPath = new Path(src); //原路徑
 60 
 61         Path dstPath = new Path(dst); //目標路徑
 62 
 63         //調用文件系統的文件復制函數,前面參數是指是否刪除原文件,true為刪除,默認為false
 64 
 65         fs.copyFromLocalFile(false,srcPath, dstPath);
 66 
 67         //打印文件路徑
 68 
 69         System.out.println("Upload to "+conf.get("fs.default.name"));
 70 
 71         System.out.println("------------list files------------"+"\n");
 72 
 73         FileStatus [] fileStatus = fs.listStatus(dstPath);
 74 
 75         for (FileStatus file : fileStatus)
 76 
 77         {
 78 
 79             System.out.println(file.getPath());
 80 
 81         }
 82 
 83         fs.close();
 84 
 85     }
 86 
 87     //文件重命名
 88 
 89     public static void rename(String oldName,String newName) throws IOException{
 90 
 91         Configuration conf = new Configuration();
 92 
 93         FileSystem fs = FileSystem.get(conf);
 94 
 95         Path oldPath = new Path(oldName);
 96 
 97         Path newPath = new Path(newName);
 98 
 99         boolean isok = fs.rename(oldPath, newPath);
100 
101         if(isok){
102 
103             System.out.println("rename ok!");
104 
105         }else{
106 
107             System.out.println("rename failure");
108 
109         }
110 
111         fs.close();
112 
113     }
114 
115     //刪除文件
116 
117     public static void delete(String filePath) throws IOException{
118 
119         Configuration conf = new Configuration();
120 
121         FileSystem fs = FileSystem.get(conf);
122 
123         Path path = new Path(filePath);
124 
125         boolean isok = fs.deleteOnExit(path);
126 
127         if(isok){
128 
129             System.out.println("delete ok!");
130 
131         }else{
132 
133             System.out.println("delete failure");
134 
135         }
136 
137         fs.close();
138 
139     }
140 
141     //創建目錄
142 
143     public static void mkdir(String path) throws IOException{
144 
145         Configuration conf = new Configuration();
146 
147         FileSystem fs = FileSystem.get(conf);
148 
149         Path srcPath = new Path(path);
150 
151         boolean isok = fs.mkdirs(srcPath);
152 
153         if(isok){
154 
155             System.out.println("create dir ok!");
156 
157         }else{
158 
159             System.out.println("create dir failure");
160 
161         }
162 
163         fs.close();
164 
165     }
166 
167     //讀取文件的內容
168 
169     public static void readFile(String filePath) throws IOException{
170 
171         Configuration conf = new Configuration();
172 
173         FileSystem fs = FileSystem.get(conf);
174 
175         Path srcPath = new Path(filePath);
176 
177         InputStream in = null;
178 
179         try {
180 
181             in = fs.open(srcPath);
182 
183             IOUtils.copyBytes(in, System.out, 4096, false); //復制到標準輸出流
184 
185         } finally {
186 
187             IOUtils.closeStream(in);
188 
189         }
190 
191     }
192 
193     public static void main(String[] args) throws IOException {
194 
195         //測試上傳文件
196 
197         //uploadFile("/home/hadoop/music1.txt","hdfs://master:9000/user/hadoop/test.txt");
198 
199         //測試創建文件
200 
201       //byte[] contents =  "hello world 世界你好\n".getBytes();
202 
203        //createFile("hdfs://master:9000/user/hadoop/test1/d.txt",contents);
204 
205         //測試重命名
206 
207         //rename("hdfs://master:9000/user/hadoop/test1/d.txt", "hdfs://master:9000/user/hadoop/test1/dd.txt");
208 
209         //測試刪除文件
210 
211         //delete("hdfs://master:9000/user/hadoop/test1/d.txt"); //使用相對路徑
212 
213        //delete("hdfs://master:9000/user/hadoop/test1");    //刪除目錄
214 
215         //測試新建目錄
216 
217        // mkdir("hdfs://master:9000/user/hadoop/test1");
218 
219         //測試讀取文件
220 
221         //readFile("hdfs://master:9000/user/hadoop/test1/d.txt");
222 
223     }
224 
225 }
View Code

大數據學習---Hadoop的深入學習