大資料:Hadoop新手入門
大資料:Hadoop入門
一:什麼是大資料
- 什麼是大資料:
(1.)大資料是指在一定時間內無法用常規軟體對其內容進行抓取,管理和處理的資料集合,簡而言之就是資料量非常大,大到無法用常規工具進行處理,如關係型資料庫,資料倉庫等。這裡“大”是一個什麼量級呢?如在阿里巴巴每天處理資料達到20PB(即20971520GB).
2.大資料的特點:
(1.)體量巨大。按目前的發展趨勢來看,大資料的體量已經到達PB級甚至EB級。
(2.)大資料的資料型別多樣,以非結構化資料為主,如網路雜誌,音訊,視屏,圖片,地理位置資訊,交易資料,社交資料等。
(3.)價值密度低。有價值的資料僅佔到總資料的一小部分。比如一段視屏中,僅有幾秒的資訊是有價值的。
(4.)產生和要求處理速度快。這是大資料區與傳統資料探勘最顯著的特徵。
3.除此之外還有其他處理系統可以處理大資料。
Hadoop (開源)
Spark(開源)
Storm(開源)
MongoDB(開源)
IBM PureDate(商用)
Oracle Exadata(商用)
SAP Hana(商用)
Teradata AsterData(商用)
EMC GreenPlum(商用)
HP Vertica(商用)
注:這裡我們只介紹Hadoop。
二:Hadoop體系結構 - Hadoop來源:
Hadoop源於Google在2003到2004年公佈的關於GFS(Google File System),MapReduce和BigTable的三篇論文,創始人Doug Cutting。Hadoop現在是Apache基金會頂級專案,“Hadoop”一個虛構的名字。由Doug Cutting的孩子為其黃色玩具大象所命名。 - Hadoop的核心:
(1.)HDFS和MapReduce是Hadoop的兩大核心。通過HDFS來實現對分散式儲存的底層支援,達到高速並行讀寫與大容量的儲存擴充套件。
(2.)通過MapReduce實現對分散式任務進行處理程式支援,保證高速分割槽處理資料。
3.Hadoop子專案:
(1.)HDFS:分散式檔案系統,整個Hadoop體系的基石。
(2.)MapReduce/YARN:並行程式設計模型。YARN是第二代的MapReduce框架,從Hadoop 0.23.01版本後,MapReduce被重構,通常也稱為MapReduce V2,老MapReduce也稱為 MapReduce V1。
(3.)Hive:建立在Hadoop上的資料倉庫,提供類似SQL語音的查詢方式,查詢Hadoop中的資料,
(4.)Pig:一個隊大型資料進行分析和評估的平臺,主要作用類似於資料庫中儲存過程。
(5.)HBase:全稱Hadoop Database,Hadoop的分散式的,面向列的資料庫,來源於Google的關於BigTable的論文,主要用於隨機訪問,實時讀寫的大資料。
(6.)ZooKeeper:是一個為分散式應用所設計的協調服務,主要為使用者提供同步,配置管理,分組和命名等服務,減輕分散式應用程式所承擔的協調任務。
還有其它特別多其它專案這裡不做一一解釋了。
HDFS架構
1 Master(NameNode/NN) 帶N個slaves(DataNode/DN)
HDFS/YARN/HBase
1個檔案會被拆分成多個Block
blocksize:128M
130==>2個Block: 128M和2M
NN:
1)負責客戶端請求的響應
2)負責元資料(檔案的名稱、副本系數、block存放的DN)的管理
DataNode:
- 儲存使用者的檔案對應的資料塊
2)要定期向name node傳送心跳資訊,彙報本身及其所有的block資訊,健康狀況
name node+n 個 datanode
建議:NAME node和data node是部署在不同的節點上
HDFS
所有的block 除了最後一塊 其他大小都一樣
namenode(filename,numReplicas(副本系數),block_ids)
副本存放策略:
1st replica. 如果寫請求方所在機器是其中一個datanode,則直接存放在本地,否則隨機在叢集中選擇一個datanode.
2nd replica. 第二個副本存放於不同第一個副本的所在的機架.
3rd replica.第三個副本存放於第二個副本所在的機架,但是屬於不同的節點.
A typical deployment has a dedicated machine that runs only the NameNode software.
Each of the other machines in the cluster runs one instance of the DataNode software.
The architecture does not preclude running multiple DataNodes on the same machine
but in a real deployment that is rarely the case.
NameNode + N個DataNode
建議:NN和DN是部署在不同的節點上
replication factor:副本系數、副本因子
All blocks in a file except the last block are the same size
Hadoop1.x時:
MapReduce:Master/Slave架構,1個JobTracker帶多個TaskTracker
JobTracker: 負責資源管理和作業排程
TaskTracker:
定期向JT彙報本節點的健康狀況、資源使用情況、作業執行情況;
接收來自JT的命令:啟動任務/殺死任務
YARN:不同計算框架可以共享同一個HDFS叢集上的資料,享受整體的資源排程
XXX on YARN的好處:
與其他計算框架共享叢集資源,按資源需要分配,進而提高叢集資源的利用率
XXX: Spark/MapReduce/Storm/Flink
YARN架構:
1)ResourceManager: RM
整個叢集同一時間提供服務的RM只有一個,負責叢集資源的統一管理和排程
處理客戶端的請求: 提交一個作業、殺死一個作業
監控我們的NM,一旦某個NM掛了,那麼該NM上執行的任務需要告訴我們的AM來如何進行處理
-
NodeManager: NM
整個叢集中有多個,負責自己本身節點資源管理和使用
定時向RM彙報本節點的資源使用情況
接收並處理來自RM的各種命令:啟動Container
處理來自AM的命令
單個節點的資源管理 -
ApplicationMaster: AM
每個應用程式對應一個:MR、Spark,負責應用程式的管理
為應用程式向RM申請資源(core、memory),分配給內部task
需要與NM通訊:啟動/停止task,task是執行在container裡面,AM也是執行在container裡面 -
Container
封裝了CPU、Memory等資源的一個容器
是一個任務執行環境的抽象 -
Client
提交作業
查詢作業的執行進度
殺死作業
YARN環境搭建
1)mapred-site.xml
mapreduce.framework.name
yarn
2)yarn-site.xml
yarn.nodemanager.aux-services
mapreduce_shuffle
- 啟動YARN相關的程序
sbin/start-yarn.sh
4)驗證
jps
ResourceManager
NodeManager
http://hadoop000:8088
5)停止YARN相關的程序
sbin/stop-yarn.sh
提交mr作業到YARN上執行:
/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar
hadoop jar
hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
wordcount: 統計檔案中每個單詞出現的次數
需求:求wc
- 檔案內容小:shell
2)檔案內容很大: TB GB ??? 如何解決大資料量的統計分析
> url TOPN < wc的延伸
工作中很多場景的開發都是wc的基礎上進行改造的
藉助於分散式計算框架來解決了: mapreduce
分而治之
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
核心概念
Split:交由MapReduce作業來處理的資料塊,是MapReduce中最小的計算單元
HDFS:blocksize 是HDFS中最小的儲存單元 128M
預設情況下:他們兩是一一對應的,當然我們也可以手工設定他們之間的關係(不建議)
InputFormat:
將我們的輸入資料進行分片(split): InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
TextInputFormat: 處理文字格式的資料
OutputFormat: 輸出
MapReduce1.x的架構
1)JobTracker: JT
作業的管理者 管理的
將作業分解成一堆的任務:Task(MapTask和ReduceTask)
將任務分派給TaskTracker執行
作業的監控、容錯處理(task作業掛了,重啟task的機制)
在一定的時間間隔內,JT沒有收到TT的心跳資訊,TT可能是掛了,TT上執行的任務會被指派到其他TT上去執行
2)TaskTracker: TT
任務的執行者 幹活的
在TT上執行我們的Task(MapTask和ReduceTask)
會與JT進行互動:執行/啟動/停止作業,傳送心跳資訊給JT
3)MapTask
自己開發的map任務交由該Task出來
解析每條記錄的資料,交給自己的map方法處理
將map的輸出結果寫到本地磁碟(有些作業只僅有map沒有reduce==>HDFS)
4)ReduceTask
將Map Task輸出的資料進行讀取
按照資料進行分組傳給我們自己編寫的reduce方法處理
輸出結果寫到HDFS
使用IDEA+Maven開發wc:
1)開發
2)編譯:mvn clean package -DskipTests
3)上傳到伺服器:scp target/hadoop-train-1.0.jar [email protected]:~/lib
4)執行
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.imooc.hadoop.mapreduce.WordCountApp hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/output/wc
相同的程式碼和指令碼再次執行,會報錯
security.UserGroupInformation:
PriviledgedActionException as:hadoop (auth:SIMPLE) cause:
org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory hdfs://hadoop000:8020/output/wc already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory hdfs://hadoop000:8020/output/wc already exists
在MR中,輸出檔案是不能事先存在的
1)先手工通過shell的方式將輸出資料夾先刪除
hadoop fs -rm -r /output/wc
2) 在程式碼中完成自動刪除功能: 推薦大家使用這種方式
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(outputPath)){
fileSystem.delete(outputPath, true);
System.out.println("output file exists, but is has deleted");
}
Combiner
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.imooc.hadoop.mapreduce.CombinerApp hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/output/wc
使用場景:
求和、次數 +
平均數 X
Partitioner
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.imooc.hadoop.mapreduce.ParititonerApp hdfs://hadoop000:8020/partitioner hdfs://hadoop000:8020/output/partitioner
使用者行為日誌:使用者每次訪問網站時所有的行為資料(訪問、瀏覽、搜尋、點選…)
使用者行為軌跡、流量日誌
日誌資料內容:
1)訪問的系統屬性: 作業系統、瀏覽器等等
2)訪問特徵:點選的url、從哪個url跳轉過來的(referer)、頁面上的停留時間等
3)訪問資訊:session_id、訪問ip(訪問城市)等
2013-05-19 13:00:00 http://www.taobao.com/17/?tracker_u=1624169&type=1 B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1 http://hao.360.cn/ 1.196.34.243
資料處理流程
1)資料採集
Flume: web日誌寫入到HDFS
2)資料清洗
髒資料
Spark、Hive、MapReduce 或者是其他的一些分散式計算框架
清洗完之後的資料可以存放在HDFS(Hive/Spark SQL)
3)資料處理
按照我們的需要進行相應業務的統計和分析
Spark、Hive、MapReduce 或者是其他的一些分散式計算框架
4)處理結果入庫
結果可以存放到RDBMS、NoSQL
5)資料的視覺化
通過圖形化展示的方式展現出來:餅圖、柱狀圖、地圖、折線圖
ECharts、HUE、Zeppelin
UserAgent
hadoop jar /home/hadoop/lib/hadoop-train-1.0-jar-with-dependencies.jar com.imooc.hadoop.project.LogApp /10000_access.log /browserout
spark啟動:spark-shell --master local[2]
spark實現wc:
val file = sc.textFile(“file:///home/hadoop/data/hello.txt”)
val a = file.flatMap(line => line.split(" "))
val b = a.map(word => (word,1))
Array((hadoop,1), (welcome,1), (hadoop,1), (hdfs,1), (mapreduce,1), (hadoop,1), (hdfs,1))
val c = b.reduceByKey(_ + _)
Array((mapreduce,1), (welcome,1), (hadoop,3), (hdfs,2))
sc.textFile(“file:///home/hadoop/data/hello.txt”).flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect
Flink執行
./bin/flink run ./examples/batch/WordCount.jar
–input file:///home/hadoop/data/hello.txt --output file:///home/hadoop/tmp/flink_wc_output
Beam執行:
#direct方式執行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount
-Dexec.args="–inputFile=/home/hadoop/data/hello.txt --output=counts"
-Pdirect-runner
#spark方式執行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount
-Dexec.args="–runner=SparkRunner --inputFile=/home/hadoop/data/hello.txt --output=counts" -Pspark-runner
#flink方式執行