大資料開發技術基礎
《大資料開發技術基礎》複習
題型:選擇題(30分)、簡答題(20分)、分析論述題(20分)、程式設計題(30分)
考試範圍:
1、 大資料概述
2、 Linux基礎知識及相關命令
3、 分散式檔案系統HDFS
4、 分散式資料庫HBase(含MySQL)
5、 NoSQL資料庫
6、 MapReduce
7、 大資料處理架構Hadoop
8、 Spark
9、 流計算
10、資料倉庫Hive
11、大資料在不同領域的應用
一、 Linux基本部分
ls, vi,rm, exit, top, cp, cat, tac, nl, mkdir, rmdir, tree, ls, pwd, ln, tail, mv, tar, file, find等等,詳見課件。
mkdir
Linux mkdir(英文全拼:make directory)命令用於建立目錄。
mkdir [-p] dirName
引數說明:
- -p 確保目錄名稱存在,不存在的就建一個。
rmdir
Linux rmdir(英文全拼:remove directory)命令刪除空的目錄。
rmdir [-p] dirName
引數:
- -p 是當子目錄被刪除後使它也成為空目錄的話,則順便一併刪除。
cp
Linux cp(英文全拼:copy file)命令主要用於複製檔案或目錄。
cp [options] source dest
或
cp [options] source... directory
引數說明:
- -a:此選項通常在複製目錄時使用,它保留連結、檔案屬性,並複製目錄下的所有內容。其作用等於dpR引數組合。
- -d:複製時保留連結。這裡所說的連結相當於 Windows 系統中的快捷方式。
- -f:覆蓋已經存在的目標檔案而不給出提示。
- -i:與 -f 選項相反,在覆蓋目標檔案之前給出提示,要求使用者確認是否覆蓋,回答 y 時目標檔案將被覆蓋。
- -p:除複製檔案的內容外,還把修改時間和訪問許可權也複製到新檔案中。
- -r:若給出的原始檔是一個目錄檔案,此時將複製該目錄下所有的子目錄和檔案。
- -l:不復制檔案,只是生成連結檔案。
例項
使用指令 cp 將當前目錄 test/ 下的所有檔案複製到新目錄 newtest
$ cp –r test/ newtest
注意:使用者使用該指令複製目錄時,必須使用引數 -r 或者 -R 。
mv
Linux mv(英文全拼:move file)命令用來為檔案或目錄改名、或將檔案或目錄移入其它位置。
mv [options] source dest
mv [options] source... directory
引數說明:
- -b: 當目標檔案或目錄存在時,在執行覆蓋前,會為其建立一個備份。
- -i: 如果指定移動的源目錄或檔案與目標的目錄或檔案同名,則會先詢問是否覆蓋舊檔案,輸入 y 表示直接覆蓋,輸入 n 表示取消該操作。
- -f: 如果指定移動的源目錄或檔案與目標的目錄或檔案同名,不會詢問,直接覆蓋舊檔案。
- -n: 不要覆蓋任何已存在的檔案或目錄。
- -u:當原始檔比目標檔案新或者目標檔案不存在時,才執行移動操作。
mv 引數設定與執行結果
命令格式 | 執行結果 |
---|---|
mv source_file(檔案) dest_file(檔案) |
將原始檔名 source_file 改為目標檔名 dest_file |
mv source_file(檔案) dest_directory(目錄) |
將檔案 source_file 移動到目標目錄 dest_directory 中 |
mv source_directory(目錄) dest_directory(目錄) |
目錄名 dest_directory 已存在,將 source_directory 移動到目錄名 dest_directory 中;目錄名 dest_directory 不存在則 source_directory 改名為目錄名 dest_directory |
mv source_directory(目錄) dest_file(檔案) |
出錯 |
例項
將檔案 aaa 改名為 bbb :
mv aaa bbb
將 info 目錄放入 logs 目錄中。注意,如果 logs 目錄不存在,則該命令將 info 改名為 logs。
mv info/ logs
再如將 /usr/runoob 下的所有檔案和目錄移到當前目錄下,命令列為:
$ mv /usr/runoob/* .
top
Linux top命令用於實時顯示 process 的動態。
使用許可權:所有使用者。
tac
tac與cat命令剛好相反,檔案內容從最後一行開始顯示,可以看出 tac 是 cat 的倒著寫!如:
nl
顯示的時候,順道輸出行號!
語法:
nl [-bnw] 檔案
選項與引數:
- -b :指定行號指定的方式,主要有兩種:
-b a :表示不論是否為空行,也同樣列出行號(類似 cat -n);
-b t :如果有空行,空的那一行不要列出行號(預設值); - -n :列出行號表示的方法,主要有三種:
-n ln :行號在熒幕的最左方顯示;
-n rn :行號在自己欄位的最右方顯示,且不加 0 ;
-n rz :行號在自己欄位的最右方顯示,且加 0 ; - -w :行號欄位的佔用的位數。
tar
引數:
-c :建立一個壓縮檔案的引數指令(create 的意思);
-x :解開一個壓縮檔案的引數指令!
-t :檢視 tarfile 裡面的檔案!
特別注意,在引數的下達中, c/x/t 僅能存在一個!不可同時存在!
因為不可能同時壓縮與解壓縮。
-z :是否同時具有 gzip 的屬性?亦即是否需要用 gzip 壓縮?
-j :是否同時具有 bzip2 的屬性?亦即是否需要用 bzip2 壓縮?
-v :壓縮的過程中顯示檔案!這個常用,但不建議用在背景執行過程!
-f :使用檔名,請留意,在 f 之後要立即接檔名喔!不要再加引數!
例如使用『 tar -zcvfP tfile sfile』就是錯誤的寫法,要寫成
『 tar -zcvPf tfile sfile』才對喔!
-p :使用原檔案的原來屬性(屬性不會依據使用者而變)
-P :可以使用絕對路徑來壓縮!
-N :比後面接的日期(yyyy/mm/dd)還要新的才會被打包進新建的檔案中!
--exclude FILE:在壓縮的過程中,不要將 FILE 打包!
範例:
範例一:將整個 /etc 目錄下的檔案全部打包成為 /tmp/etc.tar
[root@linux ~]# tar -cvf /tmp/etc.tar /etc <==僅打包,不壓縮!
[root@linux ~]# tar -zcvf /tmp/etc.tar.gz /etc <==打包後,以 gzip 壓縮
[root@linux ~]# tar -jcvf /tmp/etc.tar.bz2 /etc <==打包後,以 bzip2 壓縮
特別注意,在引數 f 之後的檔案檔名是自己取的,我們習慣上都用 .tar 來作為辨識。
如果加 z 引數,則以 .tar.gz 或 .tgz 來代表 gzip 壓縮過的 tar file ~
如果加 j 引數,則以 .tar.bz2 來作為附檔名啊~
上述指令在執行的時候,會顯示一個警告訊息:
『tar: Removing leading `/' from member names』那是關於絕對路徑的特殊設定。
範例二:查閱上述 /tmp/etc.tar.gz 檔案內有哪些檔案?
[root@linux ~]# tar -ztvf /tmp/etc.tar.gz
由於我們使用 gzip 壓縮,所以要查閱該 tar file 內的檔案時,
就得要加上 z 這個引數了!這很重要的!
範例三:將 /tmp/etc.tar.gz 檔案解壓縮在 /usr/local/src 底下
[root@linux ~]# cd /usr/local/src
[root@linux src]# tar -zxvf /tmp/etc.tar.gz
在預設的情況下,我們可以將壓縮檔在任何地方解開的!以這個範例來說,
我先將工作目錄變換到 /usr/local/src 底下,並且解開 /tmp/etc.tar.gz ,
則解開的目錄會在 /usr/local/src/etc 呢!另外,如果您進入 /usr/local/src/etc
則會發現,該目錄下的檔案屬性與 /etc/ 可能會有所不同喔!
範例四:在 /tmp 底下,我只想要將 /tmp/etc.tar.gz 內的 etc/passwd 解開而已
[root@linux ~]# cd /tmp
[root@linux tmp]# tar -zxvf /tmp/etc.tar.gz etc/passwd
我可以透過 tar -ztvf 來查閱 tarfile 內的檔名稱,如果單隻要一個檔案,
就可以透過這個方式來下達!注意到! etc.tar.gz 內的根目錄 / 是被拿掉了!
範例五:將 /etc/ 內的所有檔案備份下來,並且儲存其許可權!
[root@linux ~]# tar -zxvpf /tmp/etc.tar.gz /etc
這個 -p 的屬性是很重要的,尤其是當您要保留原本檔案的屬性時!
範例六:在 /home 當中,比 2005/06/01 新的檔案才備份
[root@linux ~]# tar -N '2005/06/01' -zcvf home.tar.gz /home
範例七:我要備份 /home, /etc ,但不要 /home/dmtsai
[root@linux ~]# tar --exclude /home/dmtsai -zcvf myfile.tar.gz /home/* /etc
範例八:將 /etc/ 打包後直接解開在 /tmp 底下,而不產生檔案!
[root@linux ~]# cd /tmp
[root@linux tmp]# tar -cvf - /etc | tar -xvf -
這個動作有點像是 cp -r /etc /tmp 啦~依舊是有其有用途的!
要注意的地方在於輸出檔變成 - 而輸入檔也變成 - ,又有一個 | 存在~
這分別代表 standard output, standard input 與管線命令啦!
這部分我們會在 Bash shell 時,再次提到這個指令跟大家再解釋囉!
tree
Linux tree命令用於以樹狀圖列出目錄的內容。
執行tree指令,它會列出指定目錄下的所有檔案,包括子目錄裡的檔案。
ln
https://www.runoob.com/linux/linux-comm-ln.html
Linux ln(英文全拼:link files)命令是一個非常重要命令,它的功能是為某一個檔案在另外一個位置建立一個同步的連結。
當我們需要在不同的目錄,用到相同的檔案時,我們不需要在每一個需要的目錄下都放一個必須相同的檔案,我們只要在某個固定的目錄,放上該檔案,然後在 其它的目錄下用ln命令連結(link)它就可以,不必重複的佔用磁碟空間。
語法
ln [引數][原始檔或目錄][目標檔案或目錄]
命令引數:
- 必要引數:
- -b 刪除,覆蓋以前建立的連結
- -d 允許超級使用者製作目錄的硬連結
- -f 強制執行
- -i 互動模式,檔案存在則提示使用者是否覆蓋
- -n 把符號連結視為一般目錄
- -s 軟連結(符號連結)
- -v 顯示詳細的處理過程
- 選擇引數:
- -S "-S<字尾備份字串> "或 "--suffix=<字尾備份字串>"
- -V "-V<備份方式>"或"--version-control=<備份方式>"
- --help 顯示幫助資訊
- --version 顯示版本資訊
tail
tail 命令可用於檢視檔案的內容,有一個常用的引數 -f 常用於查閱正在改變的日誌檔案。
tail -f filename 會把 filename 檔案裡的最尾部的內容顯示在螢幕上,並且不斷重新整理,只要 filename 更新就可以看到最新的檔案內容。
命令格式:
tail [引數] [檔案]
引數:
- -f 迴圈讀取
- -q 不顯示處理資訊
- -v 顯示詳細的處理資訊
- -c<數目> 顯示的位元組數
- -n<行數> 顯示檔案的尾部 n 行內容
- --pid=PID 與-f合用,表示在程序ID,PID死掉之後結束
- -q, --quiet, --silent 從不輸出給出檔名的首部
- -s, --sleep-interval=S 與-f合用,表示在每次反覆的間隔休眠S秒
file
https://www.runoob.com/linux/linux-comm-file.html
Linux file命令用於辨識檔案型別。
通過file指令,我們得以辨識該檔案的型別。
語法
file [-bcLvz][-f <名稱檔案>][-m <魔法數字檔案>...][檔案或目錄...]
引數:
- -b 列出辨識結果時,不顯示檔名稱。
- -c 詳細顯示指令執行過程,便於排錯或分析程式執行的情形。
- -f<名稱檔案> 指定名稱檔案,其內容有一個或多個檔名稱時,讓file依序辨識這些檔案,格式為每列一個檔名稱。
- -L 直接顯示符號連線所指向的檔案的類別。
- -m<魔法數字檔案> 指定魔法數字檔案。
- -v 顯示版本資訊。
- -z 嘗試去解讀壓縮檔案的內容。
- [檔案或目錄...] 要確定型別的檔案列表,多個檔案之間使用空格分開,可以使用shell萬用字元匹配多個檔案。
find
https://www.runoob.com/linux/linux-comm-find.html
Linux find 命令用來在指定目錄下查詢檔案。任何位於引數之前的字串都將被視為欲查詢的目錄名。如果使用該命令時,不設定任何引數,則 find 命令將在當前目錄下查詢子目錄與檔案。並且將查詢到的子目錄和檔案全部進行顯示。
語法
find path -option [ -print ] [ -exec -ok command ] {} \;
引數說明 :
find 根據下列規則判斷 path 和 expression,在命令列上第一個 - ( ) , ! 之前的部份為 path,之後的是 expression。如果 path 是空字串則使用目前路徑,如果 expression 是空字串則使用 -print 為預設 expression。
expression 中可使用的選項有二三十個之多,在此只介紹最常用的部份。
-mount, -xdev : 只檢查和指定目錄在同一個檔案系統下的檔案,避免列出其它檔案系統中的檔案
-amin n : 在過去 n 分鐘內被讀取過
-anewer file : 比檔案 file 更晚被讀取過的檔案
-atime n : 在過去n天內被讀取過的檔案
-cmin n : 在過去 n 分鐘內被修改過
-cnewer file :比檔案 file 更新的檔案
-ctime n : 在過去n天內被修改過的檔案
-empty : 空的檔案-gid n or -group name : gid 是 n 或是 group 名稱是 name
-ipath p, -path p : 路徑名稱符合 p 的檔案,ipath 會忽略大小寫
-name name, -iname name : 檔名稱符合 name 的檔案。iname 會忽略大小寫
-size n : 檔案大小 是 n 單位,b 代表 512 位元組的區塊,c 表示字元數,k 表示 kilo bytes,w 是二個位元組。
-type c : 檔案型別是 c 的檔案。
d: 目錄
c: 字型裝置檔案
b: 區塊裝置檔案
p: 具名貯列
f: 一般檔案
l: 符號連結
s: socket
-pid n : process id 是 n 的檔案
概念與分析論述(見書後習題)
1. 大資料決策與傳統的基於資料倉庫的決策
資料倉庫具備批量和週期性的資料載入以及資料變化的實時探測、傳播和載入能力,能結合歷史資料和實時資料實現查詢分析和自動規則觸發,從而提供戰略決策和戰術決策。 大資料決策可以面向型別繁多的、非結構化的海量資料進行決策分析
2. 大資料、雲端計算和物聯網
大資料
大資料的特點
資料量大
資料型別繁多:包括結構化資料和非結構化資料
處理速度快
價值密度低
大資料的影響
對科學研究的影響:實驗->理論->計算->資料。
在思維方式方面,大資料具有”全面而非抽樣、效率而非精確、相關而非因果“三大顯著特徵。
社會、就業、人才培養等方面的影響
雲端計算
1. 雲端計算概念
雲端計算實現了通過網路提供可伸縮的、廉價的分散式計算能力,使用者只需要在具備網路接入條件的地方,就可以隨時隨地獲得所需的各種IT資源
雲端計算包括三種典型服務 :IaaS(基礎設施即服務)、PaaS(平臺即服務)、SaaS(軟體即服務)。
雲端計算包括公有云、私有云、混合雲三種類型
2.雲資料關鍵技術
虛擬化、分散式儲存、分散式計算、多租戶
3.雲端計算資料中心
資料中心是雲端計算的重要載體,為雲端計算提供計算、儲存、頻寬等各種硬體資源
物聯網
1.物聯網概念
物聯網是物物相連的網際網路,是網際網路的延伸,它利用區域性網路或網際網路等通訊技術把感測器、控制器、機器、人員和物等通過新的方式聯在一起,形成人與物、物與物相聯,實現資訊化和遠端管理控制
雲端計算、大資料和物聯網代表了IT領域最新的技術發展趨勢,三者既有區別又有聯絡
區別:
大資料側重於對海量資料的儲存、處理和分析,從海量資料中發現價值,服務於生產和生活;
雲端計算本質上旨在整合和優化各種IT資源,並通過網路以服務的方式廉價地提供給使用者;
物聯網的發展目標是實現物物相連,應用創新是物聯網發展的核心
3. HDFS中的名稱節點和資料節點
分散式檔案系統在物理結構上是由計算機叢集中的多個節點構成的。這些節點分為兩類。
名稱節點負責檔案和目錄的建立、刪除和重新命名等,同時管理著資料節點和資料塊的對映關係,因此客戶端只有訪問名稱節點才能找到請求的資料塊所在的位置,進而到相應位置讀取所需資料塊。
資料節點負責資料的儲存和讀取。在儲存時,名稱節點分配儲存位置,然後由客戶端把資料直接寫入相應的資料節點;在讀取時,客戶端從名稱節點獲得資料節點和檔案塊的對映關係,然後就可以到相應位置訪問檔案塊。資料節點也要根據名稱節點的命令建立、刪除資料塊和冗餘複製。資料節點會向名稱節點定期傳送自己所儲存的塊的列表。
名稱節點負責管理分散式檔案系統的名稱空間,儲存了兩個核心的資料結構(FsImage和EditLog)。FSImage用於維護檔案系統樹以及檔案系統樹中所有的檔案與資料夾的元資料;操作日誌檔案EditLog中記錄了所有針對檔案的建立、刪除、重新命名操作。名稱節點記錄了每個檔案中各個塊所在的資料節點的位置資訊,在系統每次啟動時掃描所有資料節點重構得到這些資訊。
4. HDFS的冗餘資料儲存策略
採用多副本方式對資料進行冗餘儲存,通常一個數據塊的多個副本會被分到不同的資料節點上。
- 加快資料傳輸速度:讓客戶端分別從不同的資料塊副本中讀取資料
- 容易檢查資料錯誤:多副本容易判斷是否出錯
- 保證資料的可靠性:某個節點出現故障也不會造成資料丟失
5. HDFS是探測錯誤發生以及恢復
1. 名稱節點出錯
Hadoop採用兩種機制來確保名稱節點的安全:
- 把名稱節點上的元資料資訊同步儲存到其他檔案系統
- 執行一個第二名稱節點,名稱節點宕機之後,利用第二名稱節點中的原資料資訊進行系統恢復(仍然會丟失部分資料)
2. 資料節點出錯
資料節點定期向名稱節點發送“心跳”資訊。發生故障或斷網時,名稱節點收不到“心跳”,把資料節點標記為“宕機”,節點上的所有資料標記為“不可讀”,不再發送I/O請求。
名稱節點定期檢查一些資料塊的副本數量是否小於冗餘因子,如果小於,名稱節點啟動資料冗餘複製,生成新的副本。
3. 資料出錯
檔案被建立時,客戶端就會對每一個檔案塊進行資訊摘錄,並把這些資訊寫入同一個路徑的隱藏檔案中。客戶端讀取檔案時,會先讀取該資訊檔案,然後利用該資訊檔案對每個讀取的資料塊進行md5和sha1校驗,如果校驗出錯,客戶端就會請求到另一個數據節點讀取該檔案塊,並向名稱節點報告這個檔案塊有錯誤,名稱節點會定期檢查並重新複製這個塊。
6. HDFS在正常情況下讀\寫檔案過程
讀資料的過程
寫資料的過程
7. HBase和BigTable
HBase是一個高可靠、高效能、面向列、可伸縮的分散式資料庫,是谷歌BigTable的開源實現,主要用來儲存非結構化和半結構化的鬆散資料。Hbase的目標是處理非常龐大的表。
HBase利用Hadoop MapReduce來處理HBase中的海量資料,實現高效能運算;利用Zookeeper作為協同服務,實現穩定服務和失敗恢復;使用HDFS作為高可靠的底層儲存,利用廉價叢集提供海量資料儲存能力; Sqoop為HBase提供了高效便捷的RDBMS資料匯入功能,Pig和Hive為HBase提供了高層語言支援。
專案 | BigTable | HBase |
---|---|---|
檔案儲存系統 | GFS | HDFS |
海量資料處理 | MapReduce | Hadoop MapReduce |
協同服務管理 | Chubby | Zookeeper |
8. HBase和傳統關係資料庫
區別 | 傳統關係資料庫 | HBase |
---|---|---|
資料型別 | 關係模型 | 資料模型 |
資料操作 | 增刪查改、多表連線 | 增刪查,無法實現表與表之間關聯 |
儲存模式 | 基於行模式儲存,元組或行會被連續地儲存在磁碟也中 | 基於列儲存,每個列族都由幾個檔案儲存,不同列族的檔案是分離的 |
資料索引 | 針對不同列構建複雜的多個索引 | 只有一個行鍵索引 |
資料維護 | 用最新的當前值去替換記錄中原來的舊值 | 更新操作不會刪除資料舊的版本,而是生成一個新的版本 |
可伸縮性 | 很難實現橫向擴充套件,縱向擴充套件的空間也比較有限 | 輕易地通過在叢集中增加或者減少硬體數量來實現效能的伸縮 |
HBase不支援事務,無法實現跨行的原子性
9. HBase中行鍵、列族和時間戳
HBase實際上就是一個稀疏、多維、持久化儲存的對映表,它採用行鍵、列族、列限定符和時間戳進行索引,每個值都是byte[]
1.行
每個HBase都由若干行組成,每個行由行鍵來表示。訪問表中的行有3種方式:通過單個行鍵訪問;通過一個行鍵的區間來訪問;全表掃描。行鍵可以是任意字串,在HBase中,行鍵儲存為位元組陣列。儲存時,資料按照行鍵的字典序排序儲存。
2.列族
一個HBase表被分組成許多“列族”的集合,它是基本的訪問控制單元。列族需要在表建立時就定義好,數量不宜太多(限制於幾十個),不需要頻繁修改。
在HBase中,訪問控制、磁碟和記憶體的使用統計都是在列族層面上進行的。
3.時間戳
每個單元格都儲存著同一份資料的多個版本,這些版本採用時間戳進行索引。
10. HBase的概念檢視和物理檢視
概念檢視
稀疏、多維的對映關係
物理檢視
屬於同一個列族的資料儲存在一起,和每個列族一起存放的還包括行鍵和時間戳
11. HBase的資料分割槽機制
根據行鍵的值對錶中的行進行分割槽,每個行區構成一個分割槽,被稱為“Region”,包含了位於某個值域區間內的所有資料,它是負載均衡和資料分發的基本單位,這些Region會被Master伺服器分發到不同的Region伺服器上。
初始每個表只包含一個Region,隨著資料的不斷插入,Region持續增大,達到閾值時就被自動等分成兩個新的Region。
Region的預設大小是100MB到200MB。通常每個Region伺服器放置10~1000個Region
12. 在HBase的三層結構下,客戶端訪問資料
每個Region有一個RegionID來標識它的唯一性,一個Region的識別符號可以表示成”表名+開始主鍵+RegionID“。
.META.表:又稱元資料表。維護一張記錄Region識別符號和Region伺服器標識關係的對映表。可以知道某個Region被儲存在哪個Region伺服器中。
-ROOT-表:又稱根資料表。.META.表的條目非常多的時候,.META.表需要分割成多個Region。為了定位這些Region,需要在構建一個新的對映表即-ROOT-表,記錄所有元資料的位置。-ROOT-表不能分割,只存在一個Region,該Region的名字在程式中寫死,Master主伺服器永遠知道它的位置。
首先訪問Zookeeper,獲取-ROOT表的位置資訊,然後訪問-Root-表,獲得.MATA.表的資訊,接著訪問.MATA.表,找到所需的Region具體位於哪個Region伺服器,最後才會到該Region伺服器讀取資料。
13. Region伺服器向HDFS檔案系統中讀寫資料的基本原理
Region伺服器內部管理了一系列Region物件和一個HLog檔案,其中HLog是磁碟上面的記錄檔案,它記錄著所有的更新操作。每個Region物件又是由多個Store組成的,每個Store對應了表中的一個列族的儲存。每個Store又包含一個MemStore和若干個StoreFile。其中MemStore是在記憶體中的快取,儲存最近更新的資料;StoreFile是磁碟中的檔案,這些檔案都是B樹結構,其底層實現方式是HDFS檔案系統中的HFile,HFile的資料塊通常採用壓縮方式儲存,大大減少網路I/O和磁碟I/O。
當用戶寫入資料時,會被分配到相應的Region伺服器取執行操作。使用者資料首先被寫入到MemStore和HLog中,當操作寫入HLog之後,commit()呼叫才會將其返回給客戶端
當用戶讀取資料時,Region伺服器會首先訪問MemStore,如果資料不再快取中,才會到磁碟上面的StoreFile中去找
14. HLog的工作原理
HBase系統為每個Region伺服器配置了一個HLog檔案,它是一種預寫式日誌(Write Ahead Log),使用者更新資料必須首先寫入日誌後,才能寫入MemStore快取,並且,直到MemStore快取內容對應的日誌已經寫入磁碟,該快取內容才能被刷寫到磁碟。
15. 在HBase中,每個region伺服器維護一個HLog的優缺點
缺點:如果一個Region伺服器發生故障,為了恢復其上次的Region物件,需要將Region伺服器上的HLog按照其所屬的Region物件進行拆分,然後分發到其他Region伺服器上執行恢復操作。
優點:多個Region物件的更新操作所發生的日誌修改,只需要不斷把日誌記錄追加到單個日誌檔案中,不需要同時開啟、寫入到多個日誌檔案中,因此可以減少磁碟定址次數,提高對錶的寫操作效能。
16. Region伺服器意外終止情況的處理
Region發生故障時,Zookeeper通知Master,Master首先處理HLog檔案。系統根據每條日誌記錄所屬的Region物件將HLog資料進行拆分,分別放到相應Region物件的目錄下,然後再將失效的Region重新分配到可用的Region伺服器中,並把與該Region物件相關的HLog日誌記錄也傳送給相應的Region伺服器。Region伺服器領取到分配給自己的Region物件以及與之相關的HLog日誌記錄以後,會重新做一遍日誌記錄中的各種操作,把日誌記錄中的資料寫入MemStore快取,然後重新整理到磁碟的StoreFIle檔案中,完成資料恢復。
17. 關係資料庫和NoSQL資料庫
18. 鍵值資料庫、列族資料庫、文件資料庫和圖資料庫的適用場合和優缺點
資料庫 | 適用場合 | 優點 | 缺點 |
---|---|---|---|
鍵值資料庫 | 涉及頻繁讀寫、擁有簡單資料模型的應用 內容快取,比如會話、配置檔案、引數、購物車等 儲存配置和使用者資料資訊的移動應用 | 擴充套件性好,靈活性好,大量寫操作時效能高 | 無法儲存結構化資訊,條件查詢效率較低 |
列族資料庫 | 分散式資料儲存與管理 資料在地理上分佈於多個數據中心的應用程式 可以容忍副本中存在短期不一致情況的應用程式 擁有動態欄位的應用程式 擁有潛在大量資料的應用程式,大到幾百TB的資料 | 查詢速度快,可擴充套件性強,容易進行分散式擴充套件,複雜性低 | 功能較少,大都不支援強事務一致性 |
文件資料庫 | 儲存、索引並管理面向文件的資料或者類似的半結構化資料 比如,用於後臺具有大量讀寫操作的網站、使用JSON資料結構的應用、使用巢狀結構等非規範化資料的應用程式 | 效能好(高併發),靈活性高,複雜性低,資料結構靈活 提供嵌入式文件功能,將經常查詢的資料儲存在同一個文件中 既可以根據鍵來構建索引,也可以根據內容構建索引 | 缺乏統一的查詢語法 |
圖形資料庫 | 專門用於處理具有高度相互關聯關係的資料,比較適合於社交網路、模式識別、依賴分析、推薦系統以及路徑尋找等問題 | 靈活性高,支援複雜的圖形演算法,可用於構建複雜的關係圖譜 | 複雜性高,只能支援一定的資料規模 |
19. JobTracker和TaskTracker的功能
MapReduce框架採用了Master/Slave架構,包括一個Master和若干個Slave。Master上執行JobTracker,Slave上執行TaskTracker
MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task
-
Client
使用者編寫的MapReduce程式通過Client提交到JobTracker端
使用者可通過Client提供的一些介面檢視作業執行狀態
-
JobTracker
JobTracker負責資源監控和作業排程
JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點
JobTracker 會跟蹤任務的執行進度、資源使用量等資訊,並將這些資訊告訴任務排程器(TaskScheduler),而排程器會在資源出現空閒時,選擇合適的任務去使用這些資源
-
TaskTracker
TaskTracker 會週期性地通過“心跳”將本節點上資源的使用情況和任務的執行進度彙報給JobTracker,同時接收JobTracker 傳送過來的命令並執行相應的操作(如啟動新任務、殺死任務等)
TaskTracker 使用“slot”等量劃分本節點上的資源量(CPU、記憶體等)。一個Task 獲取到一個slot 後才有機會執行,而Hadoop排程器的作用就是將各個TaskTracker上的空閒slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用
-
Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
MapReduce框架採用了Master/Slave架構,包括一個Master和若干Slave,Master上執行JobTracker,Slave上執行TaskTracker,使用者提交的每個計算作業,會被劃分成若干個任務。JobTracker負責資源管理、任務排程和任務監控,會重新排程已經失敗的任務。TaskTracker負責執行由JobTracker指派的任務。
20. Map函式和Reduce函式的輸入、輸出以及處理過程
Map函式
Map函式的輸入來自於分散式檔案系統的檔案塊,這些檔案塊的格式是任意的,可以是文件,也可以是二進位制格式的。檔案塊是一系列元素的集合,這些元素也是任意型別的,同一個元素不能跨越檔案塊儲存。
Map函式將輸入的元素轉換成<key,value>形式的鍵值對,鍵和值的型別也是任意的,其中鍵沒有唯一性,不能作為輸出的身份標識。
Reduce函式
將輸入的一系列具有相同鍵的鍵值對以某種方式組合起來,輸出處理後的鍵值對,輸出結果會合併成一個檔案。
使用者指定Reduce任務的個數(如n個),並通知實現系統,然後主控程序通常會選擇一個Hash函式,Map任務輸出的每個鍵都會經過Hash函式計算,並根據雜湊結果將該鍵值對輸入相應的Reduce任務來處理。對於處理鍵為k的Reduce任務的輸入形式為<k,<v1,v2,...,vn>>,輸出為<k,V>
21. MapReduce的工作流程(需包括提交任務、Map、Shuffle、Reduce的過程)
- MapReduce框架使用InputeFormat模組做Map前的預處理,比如驗證輸入的格式是否符合輸入的定義;然後,將輸入檔案切分成邏輯上的多個InputSplit,InputSplit是MapReduce對檔案進行處理和運算的輸入單位,只是一個邏輯概念,每個InputSplit並沒有對檔案進行實際切割,只是記錄了要處理的資料的位置和長度
- 因為InputSplit是邏輯切分而非物理切分,所以還需要通過RecordReader(RR)根據InputSplit中的資訊來處理InputSplit中的具體記錄,載入資料並轉換為適合Map任務讀取的鍵值對,輸入給Map任務。
- Map任務會根據使用者自定義的對映規則,輸出一系列的<key,value>作為中間結果
- 為了讓Reduce可以並行處理Map的結果,需要對Map的輸出進行一定的分割槽、排序、合併、歸併等操作,得到<key,value-list>形式的中間結果,再交給對應的Reduce進行處理。
- Reduce以一系列<key,value-list>中間結果作為輸入,執行使用者定義的邏輯,輸出結果給OutputFormat模組
- OutputFormat模組會驗證輸出目錄是否已經存在以及輸出結果型別是否符合配置檔案中的配置型別,如果都滿足,就輸出Reduce的結果到分散式檔案系統
22. Map端和Reduce端的Shuffle過程(包括Spill、Sort、Merge、Fetch的過程)
1.Map端的shuffle過程
-
輸入資料和執行Map任務
-
寫入快取
每個Map任務都分配有一個快取,Map的輸出結果不是立即寫入磁碟,而是首先寫入快取。在快取中累積一定數量的Map輸出結果以後,在一次性批量寫入磁碟,大大減少磁碟I/O開銷。在寫入快取之前,key與value值都會被序列化成位元組陣列
-
溢寫(分割槽、排序、合併)
快取預設大小100MB。通常會設定一個溢寫比例(比如0.8),快取達到閾值時(100MB用了80MB),就會啟動溢寫操作(Spill),把快取中的內容一次性寫入磁碟並清空快取。溢寫的過程由另外一個單獨的後臺執行緒來完成,不影響Map結果寫入快取。
在溢寫到磁碟之前,快取中的資料首先會被分割槽。快取中的資料是<key,value>鍵值對,MapReduce通過partitioner介面對這些鍵值進行分割槽,預設採用Hash函式對key進行雜湊後再用Reduce任務的數量取模。
對於每個分割槽內的鍵值對,後臺執行緒會根據key對他們進行記憶體排序(sort),排序是MapReduce的預設操作。排序結束後,還包含一個可選的合併(combine)操作。
“合併”是指將那些具有相同key的<key,value>的value加起來。比如<"xmu",1>和<"xmu",1>,經過合併操作以後得到一個鍵值對<"xmu",2>,減少了鍵值對的數量。Map端的合併操作與Reduce類似,但是由於操作發生在Map段,所以稱之為“合併”,從而有別於Reduce。
經過分割槽、排序以及可能發生的合併操作之後,快取中的鍵值對就可以被寫入磁碟,並清空快取。每次溢寫操作都會生成一個新的溢寫檔案
-
檔案歸併
溢寫檔案會越來越多,在Map任務全部結束之前,系統會對所有溢寫檔案中的資料進行歸併(Merge),生成一個大的溢寫檔案,大的溢寫檔案中的所有鍵值對也是經過分割槽和排序的。
“歸併”是指具有相同key的鍵值對會被歸併成一個新的鍵值對。具體而言,對於若干個具有相同key的鍵值對<k,v1>,<k,v2>,......,<k,vn>會被歸併成一個新的鍵值對<k,<v1,v2,...,vn>>
另外進行檔案歸併時,如果磁碟中生成的溢寫檔案的數量超過引數min.num.spills.forcombine的值(預設是3),那麼就可以再次執行Combiner函式,對資料進行合併操作,從而減少寫入磁碟的資料量。
經過上述4個步驟,Map段的Shuffle過程全部完成,最終生成的一個大檔案會被存放在本地磁碟上。這個大檔案中的資料是被分割槽的,不同分割槽會被髮送到不同的Reduce任務進行並行處理。JobTracker會一直監測Map任務的執行,當檢測到一個Map任務完成後,就會立刻通知相關的Reduce任務來領取資料,然後開始Reduce段的Shuffle
2.Reduce端的Shuffle
從Map端讀取Map結果,然後執行歸併操作,最後輸送給Reduce任務進行處理。
-
“領取”資料
Map端的Shuffle過程結束後,所有Map輸出結果都儲存在Map機器的本地磁碟上,Reduce任務需要把這些資料“領取”(Fetch)回來存放在自己所在機器的本地磁碟上。因此,在每個Reduce任務真正開始之前,它大部分時間都在從Map端把屬於自己處理的那些分割槽的資料“領取”過來。每個Reduce任務會不斷地通過RPC向JobTracker詢問Map任務是否已經完成;JobTracker檢測到一個Map任務完成後,就會通知相關的Reduce任務來“領取資料”;一旦一個Reduce任務收到JobTracker的通知,它就會到Map任務所在機器上把屬於自己處理的分割槽資料領取到本地磁碟中。一般系統中會有多個Map機器,因此Reduce任務會使用多個執行緒同時從多個Map機器領回資料
-
歸併資料
從Map端領回的資料首先被存放在Reduce任務所在機器的快取中,如果快取被佔滿,就溢寫到磁碟中。當溢寫過程啟動時,具有相同key的鍵值對會被歸併(Merge),如果使用者定義了Combiner,則歸併後的資料還可以執行合併操作,減少寫入磁碟的資料量。溢寫結束後會在磁碟上生成一個溢寫檔案。當所有資料都被領回時,多個溢寫檔案會被歸併成一個大檔案,歸併時還會對鍵值對進行排序。每輪歸併操作可以歸併的檔案數量是由引數io.sort.factor的值來控制(預設10)。(假設50個溢寫檔案,每輪歸併10個,則需要經過5輪歸併,得到5個歸併後的大檔案,不會繼續歸併成一個新的大檔案)
-
把資料輸入給Reduce任務
磁碟中經過多輪歸併後得到若干大檔案,直接輸入給Reduce任務。
23. MapReduce程式的中間結果儲存在本地磁碟而不是HDFS上有何優缺點
因為Map的輸出是中間的結果,這個中間結果是由reduce處理後才產生最終輸出結果,而且一旦作業完成,map的輸出結果就可以刪除。如果把它儲存在hdfs中就並備份,有些小題大作,如果執行map任務的節點將map中間結果傳送給reduce任務之前失敗,hadoop將在另一個節點上重新執行這個map任務以再次構建中間結果。
24. 在YARN框架中執行一個MapReduce程式時,從提交到完成需要經歷的具體步驟
YARN包括ResourceManager、ApplicationMaster和NodeManager。ResourceManage負責資源管理,ApplicationMaster負責任務排程和監控,NodeManager負責執行原TaskTracker的任務。
ResourceManager
功能:
- 處理客戶端請求
- 啟動/監控ApplicationMaster
- 監控NodeManager
- 資源分配與排程
包括兩個元件:排程器(Scheduler)和應用程式管理器(Applications Manager)。
排程器主要負責資源管理和分配。排程器接受來自ApplicationMaster的應用程式資源請求,並根據容量、佇列等限制條件,把叢集中的資源以“容器”的形式分配給提出申請的應用程式,容器的選擇通常會考慮應用程式所要處理的資料的位置,就近選擇(計算向資料靠攏)。每個容器封裝有一定數量的CPU、記憶體、磁碟等資源。
應用程式管理器負責系統中所有應用程式的管理工作,主要包括應用程式提交、與排程器協商資源以啟動ApplicationMaster、監控ApplicationMaster執行狀態並在失敗時重新啟動等
在Hadoop平臺上,使用者的應用程式是以作業(Job)的形式提交的,然後一個作業會被分解成多個任務(包括Map任務和Reduce任務)進行分散式執行。ResourceManager接受使用者提交的作業,按照作業的上下文資訊以及從NodeManager收集來的容器狀態資訊,啟動排程過程,為使用者作業啟動一個ApplicationMaster。
ApplicationMaster
- 為應用程式申請資源,並分配給內部任務
- 任務排程、監控與容錯
主要功能:
- 當用戶作業提交時,ApplicationMaster與ResourceManager協商獲取資源,ResourceManager會以容器的形式為ApplicationMaster分配資源
- 把獲得的資源進一步分配給內部的各個任務(Map任務和Reduce任務),實現資源的“二次分配”
- 與NodeManager保持互動通訊進行應用程式的啟動、執行、監控和停止,監控申請到的資源的使用情況,對所有任務的執行進度和狀態進行監控,並在任務發生失敗時執行失敗恢復(重新申請資源重啟任務)
- 定時向ResourceManager傳送“心跳”訊息,報告資源的使用情況和應用的進度資訊;
- 當作業完成時,ApplicationMaster向ResourceManager登出容器,執行週期完成。
NodeManager
- 單個節點上的資源管理
- 處理來自ResourceManger的命令
- 處理來自ApplicationMaster的命令
NodeManager是駐留在一個YARN叢集中的每個節點上的代理,主要負責:
- 容器生命週期管理
- 監控每個容器的資源(CPU、記憶體等)使用情況
- 跟蹤節點健康狀況,並以“心跳”的方式與ResourceManager保持通訊
- 向ResourceManager彙報作業的資源使用情況和每個容器的執行狀態
- 接收來自ApplicationMaster的啟動/停止容器的各種請求
需要說明的是,NodeManager主要負責管理抽象的容器,只處理與容器相關的事情,而不具體負責每個任務(Map任務或Reduce任務)自身狀態的管理,因為這些管理工作是由ApplicationMaster完成的,ApplicationMaster會通過不斷與NodeManager通訊來掌握各個任務的執行狀態
在叢集部署方面,YARN的各個元件是和Hadoop叢集中的其他元件進行統一部署的
-
使用者編寫客戶端應用程式,向YARN提交應用程式,提交的內容包括ApplicationMaster程式、啟動ApplicationMaster的命令、使用者程式等。
-
YARN中的ResourceManager負責接收和處理來自客戶端的請求。接到客戶端應用程式請求後,ResourceManager裡面的排程器會為應用程式分配一個容器。同時,ResourceManager的應用程式管理器會與該容器所在的NodeManager通訊,為該應用程式在該容器中啟動一個ApplicationMaster
-
ApplicationMaster被建立後會首先向ResourceManager註冊,從而使得使用者可以通過ResourceManager來直接檢視應用程式的執行狀態
-
ApplicationMaster採用輪詢的方式通過RPC協議向ResourceManager申請資源。
-
ResourceManager以“容器”的形式向提出申請的ApplicationMaster分配資源,一旦ApplicationMaster申請到資源後,就會與該容器所在的NodeManager進行通訊,要求它啟動任務。
-
當ApplicationMaster要求容器啟動任務時,它會為任務設定好執行環境(包括環境變數、JAR包、二進位制程式等),然後將任務啟動命令寫到一個指令碼中,最後通過在容器中執行該指令碼來啟動任務。
-
各個任務通過某個RPC協議向ApplicationMaster彙報自己的狀態和進度,讓ApplicationMaster可以隨時掌握各個任務的執行狀態,從而可以在任務失敗時重啟任務。
-
應用程式執行完成後,ApplicationMaster向ResourceManager的應用程式管理器登出並關閉自己。若ApplicationMaster因故失敗,ResourceManager中的應用程式管理器會監測到失敗的情形,然後將其重新啟動,直到所有任務執行完畢。
25. HDFS中“塊池”的概念,HDFS聯邦中的一個名稱節點失效會否影響為其他名稱節點提供服務?
HDFS聯邦中擁有多個獨立的名稱空間,其中每一個名稱空間管理屬於自己的一組塊,這些屬於同一個名稱空間的塊構成一個“塊池”(Block Pool)。每個資料結點會為多個塊池提供塊的儲存。資料結點是一個物理概念,塊池是邏輯概念,一個塊池是一組塊的邏輯集合,塊池中的各個塊實際上是儲存在各個不同的資料節點中的。因此HDFS聯邦中的一個名稱節點的失效也不會影響到與它相關的資料節點繼續為其他名稱節點服務
26. Spark的幾個主要概念:RDD、DAG、階段、分割槽、窄依賴、寬依賴
RDD:彈性分散式資料集(Resilient Distributed Dataset),是分散式記憶體的一個抽象概念,提供了一種高度受限的共享記憶體模型
DAG:有向無環圖(Directed Acyclic Graph),反映RDD之間的依賴關係
Executor:執行中工作結點(Worker Node)上的一個程序,負責執行任務,併為應用程式儲存資料
應用:使用者編寫的Spark程式
作業(Job):一個作業包含多個RDD及作用於相應RDD上的各種操作
階段(Stage):作業排程的基本單位,一個作業會分為多組任務,每組任務被稱為“階段”,或者也被稱為“任務集”
任務(Task):執行在Executor上的工作單元
分割槽:一個RDD就是一個分散式物件集合,本質上是一個只讀的分割槽記錄集合,每個RDD可以分成多個分割槽,每個分割槽就是一個數據集片段。
窄依賴:一個父RDD的分割槽對應一個子RDD的分割槽,或者多個父RDD的分割槽對應一個RDD的分割槽。
寬依賴:一個父RDD的分割槽對應一個子RDD的多個分割槽
27. Spark對RDD的操作主要分為:行動和轉換
行動(Action):在資料集上進行運算,返回計算結果。(接受RDD返回非RDD)如count,collect
轉換(Transformation):指定RDD之間的相互依賴關係。(接受RDD並返回RDD)如map,filter,groupBy
28. Hadoop MapReduce的缺陷,並說明Spark具有哪些優點?
Hadoop MapReduce
- 表達能力有限。計算都必須要轉換成Map和Reduce兩個操作,難以描述複雜的資料處理過程
- 磁碟IO開銷大。每次執行時候都需要從磁碟讀取資料,並且在計算完成後需要將中間結果寫入到磁碟。
- 延遲高。以此計算可能需要分解成一系列按順序執行的MapReduce任務,任務之間的銜接由於涉及到IO開銷,會產生較高延遲。而且,在前一個任務開始之前,其他任務無法開始,因此難以勝任複雜、多階段的計算任務
Spark
- Spark的計算模式也屬於MapReduce,但不侷限於Map和Reduce操作,還提供了多種資料集操作型別,程式設計模型比MapReduce更靈活;
- Spark提供了記憶體計算,中間結果直接存放記憶體中,帶來更高的迭代運算效率;
- Spark基於DAG的任務排程執行機制,要優於MapReduce的迭代執行機制
Spark採用的Executor利用多執行緒來執行具體任務(Hadoop MapReduce採用程序模型),減少任務的啟動開銷
Executor中有一個BlockManager儲存模組,會將記憶體和磁碟共同作為儲存裝置,當需要多輪迭代計算時,可以將中間結果儲存到這個儲存模組裡,不需要讀寫到HDFS等檔案系統裡,減少IO開銷;或者在互動式查詢場景下,預先將表快取到該儲存系統上,提高讀寫IO效能
29. MapReduce的框架、小批量處理、以及流資料處理
30. 流計算流程與傳統的資料處理流程
傳統的資料處理流程:採集資料,儲存在關係資料庫等資料管理系統中,使用者通過查詢操作與資料管理系統進行互動
流計算處理流程:資料實時採集、資料實時計算和實時查詢服務。
資料實時採集
資料採集系統的基本架構一般有以下三個部分:
- Agent:主動採集資料,並把資料推送到Collector部分
- Collector:接收多個Agent的資料,並實現有序、可靠、高效能的轉發
- Store:儲存Collector轉發過來的資料(對於流計算不儲存資料)
資料實時計算
資料實時計算階段對採集的資料進行實時的分析和計算,並反饋實時結果
經流處理系統處理後的資料,可視情況進行儲存,以便之後再進行分析計算。在時效性要求較高的場景中,處理之後的資料也可以直接丟棄
實時查詢服務
- 實時查詢服務:經由流計算框架得出的結果可供使用者進行實時查詢、展示或儲存
- 傳統的資料處理流程,使用者需要主動發出查詢才能獲得想要的結果。而在流處理流程中,實時查詢服務可以不斷更新結果,並將使用者所需的結果實時推送給使用者
- 雖然通過對傳統的資料處理系統進行定時查詢,也可以實現不斷地更新結果和結果推送,但通過這樣的方式獲取的結果,仍然是根據過去某一時刻的資料得到的結果,與實時結果有著本質的區別
流處理系統與傳統的資料處理系統有如下不同:
- 流處理系統處理的是實時的資料,而傳統的資料處理系統處理的是預先儲存好的靜態資料
- 使用者通過流處理系統獲取的是實時結果,而通過傳統的資料處理系統,獲取的是過去某一時刻的結果
- 流處理系統無需使用者主動發出查詢,實時查詢服務可以主動將實時結果推送給使用者
31. 單詞統計:採用MapReduce框架與採用Storm框架區別?
Streams
Storm將流資料Stream描述成一個無限的Tuple序列,這些Tuple序列會以分散式的方式並行地建立和處理
每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何型別
Tuple本來應該是一個Key-Value的Map,由於各個元件間傳遞的tuple的欄位名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List(值列表)
spout
Storm認為每個Stream都有一個源頭,並把這個源頭抽象為Spout
通常Spout會從外部資料來源(佇列、資料庫等)讀取資料,然後封裝成Tuple形式,傳送到Stream中。Spout是一個主動的角色,在介面內部有個nextTuple函式,Storm框架會不停的呼叫該函式
Bolts
Storm將Streams的狀態轉換過程抽象為Bolt。Bolt既可以處理Tuple,也可以將處理後的Tuple作為新的Streams傳送給其他Bolt
Bolt可以執行過濾、函式操作、Join、操作資料庫等任何操作
Bolt是一個被動的角色,其介面中有一個execute(Tuple input)方法,在接收到訊息之後會呼叫此函式,使用者可以在此方法中執行自己的處理邏輯
Topology
Storm將Spouts和Bolts組成的網路抽象成Topology,它可以被提交到Storm叢集執行。Topology可視為流轉換圖,圖中節點是一個Spout或Bolt,邊則表示Bolt訂閱了哪個Stream。當Spout或者Bolt傳送元組時,它會把元組傳送到每個訂閱了該Stream的Bolt上進行處理
Topology裡面的每個處理元件(Spout或Bolt)都包含處理邏輯, 而元件之間的連線則表示資料流動的方向
Topology裡面的每一個元件都是並行執行的
在Topology裡面可以指定每個元件的並行度, Storm會在叢集裡面分配那麼多的執行緒來同時計算
在Topology的具體實現上,Storm中的Topology定義僅僅是一些Thrift結構體(二進位制高效能的通訊中介軟體),支援各種程式語言進行定義
Stream Groupings
Storm中的Stream Groupings用於告知Topology如何在兩個元件間(如Spout和Bolt之間,或者不同的Bolt之間)進行Tuple的傳送。箭頭表示Tuple的流向,圓圈表示任務。每一個Spout和Bolt都可以有多個分散式任務,一個任務在什麼時候、以什麼方式傳送Tuple就是由Stream Groupings來決定的
Mapreduce使用的是Map和Reduce的抽象,而Storm使用的是Spout和Bolts 的抽象,Mapreduce作業(Job)會最終完成計算並結束任務,而Storm框架的Topology會一直處理訊息直到人為停止
32. Storm叢集中的Master和Worker節點各自執行什麼後臺程序,這些程序又分別負責什麼工作?
Storm叢集採用“Master-Worker”的節點方式。
Master節點執行名為“Nimbus”的後臺程式,負責在叢集範圍內分發程式碼,為Worker分配任務和監測故障
每個Worker節點執行名為“Supervisor”的後臺程式,負責監聽分配給它所在機器的工作,即根據Nimbus分配的任務來決定啟動或停止Worker程序。一個Worker節點上同時執行若干個Worker程序。
33. Nimbus程序和Supervisor程序意外終止後,重啟時是否能恢復到終止之前的狀態,為什麼?
重啟時能恢復到終止之前的狀態
Master節點並沒有直接和Worker節點通訊而是藉助Zookeeper將資訊狀態存放在Zookeeper中或本地磁碟中,以便節點故障時快速恢復。這意味著若Nimbus程序或Supervisor程序意外終止,重啟時也能讀取、恢復之前的狀態並繼續工作。這種設計使得Storm極其穩定。
34. MapReduce Job與Storm Topology
Mapreduce作業最終會完成計算並結束執行,而Topolopy將持續處理訊息直到人為停止。
35. 協同過濾演算法:基於使用者的協同過濾和基於物品的協同過濾
基於使用者的協同過濾(UserCF)
1992年被提出,是推薦系統中最古老的演算法
UserCF演算法符合人們對於“趣味相投”的認知,即興趣相似的使用者往往有相同的物品喜好:當目標使用者需要個性化推薦時,可以先找到和目標使用者有相似興趣的使用者群體,然後將這個使用者群體喜歡的、而目標使用者沒有聽說過的物品推薦給目標使用者
UserCF演算法的實現主要包括兩個步驟:
第一步:找到和目標使用者興趣相似的使用者集合
第二步:找到該集合中的使用者所喜歡的、且目標使用者沒有聽說過的物品推薦給目標使用者
實現UserCF演算法的關鍵步驟是計算使用者與使用者之間的興趣相似度。目前較多使用的相似度演算法有:
- 泊松相關係數(Person Correlation Coefficient)
- 餘弦相似度(Cosine-based Similarity)
- 調整餘弦相似度(Adjusted Cosine Similarity)
給定使用者u和使用者v,令N(u)表示使用者u感興趣的物品集合,令N(v)為使用者v感興趣的物品集合,則使用餘弦相似度進行計算使用者相似度的公式為:
由於很多使用者相互之間並沒有對同樣的物品產生過行為,因此其相似度公式的分子為0,相似度也為0
我們可以利用物品到使用者的倒排表(每個物品所對應的、對該物品感興趣的使用者列表),僅對有對相同物品產生互動行為的使用者進行計算
如圖,喜歡物品C的使用者包括a,c,因此w[a][c]和w[c][a]都加1。對每個物品計算的使用者列表計算完成後,得到使用者相似度矩陣W。w[u][v]即為\(w_{uv}\)的分子部分,將w[u][v]除以分母便可得到使用者相似度\(w_{uv}\)
得到使用者相似度後,再用如下公式來度量使用者u對物品i的興趣程度p(u,i)。
其中,S(u, K)是和使用者u興趣最接近的K個使用者的集合,N(i)是喜歡物品i的使用者集合,\(w_{uv}\)是使用者u和使用者v的相似度,\(r_{vi}\)是隱反饋資訊,代表使用者v對物品i的感興趣程度,為簡化計算可令\(r_{vi}=1\)
對所有物品計算\(p_{ui}\)後,可以對\(p_{ui}\)進行降序處理,取前N個物品作為推薦結果展示給使用者u(稱為Top-N推薦)
基於物品的協同過濾
基於物品的協同過濾演算法(簡稱ItemCF演算法)是目前業界應用最多的演算法。
ItemCF演算法是給目標使用者推薦那些和他們之前喜歡的物品相似的物品。ItemCF演算法主要通過分析使用者的行為記錄來計算物品之間的相似度
該演算法基於的假設是:物品A和物品B具有很大的相似度是因為喜歡物品A的使用者大多也喜歡物品B。例如,該演算法會因為你購買過《資料探勘導論》而給你推薦《機器學習實戰》,因為買過《資料探勘導論》的使用者多數也購買了《機器學習實戰》
ItemCF演算法與UserCF演算法類似,計算也分為兩步:
第一步:計算物品之間的相似度;
第二步:根據物品的相似度和使用者的歷史行為,給使用者生成推薦列表。
ItemCF演算法通過建立使用者到物品倒排表(每個使用者喜歡的物品的列表)來計算物品相似度
如果使用者a喜歡物品A和C,則在\(M_a[A][C]\)和\(M_a[C][A]\)上都加1。得到每個使用者的物品相似度矩陣。將所有使用者的物品相似度矩陣\(M_u\)相加得到最終的物品相似度矩陣R,其中\(R[i][j]\)記錄了同時喜歡物品i和物品j的使用者數。將矩陣R歸一化,便可得到物品間的餘弦相似度矩陣W。
得到物品相似度後,在使用如下公式來度量使用者u對物品j的興趣程度
其中,S(j, K)是和物品j最相似的K個物品的集合,N(u)是使用者u喜歡的物品的集合,\(w_{ji}\)代表物品 \(i\) 和物品 \(j\) 的相似度,\(r_{ui}\) 是隱反饋資訊,代表使用者 \(u\) 對物品 \(i\) 的感興趣程度,為簡化計算可令\(r_{ui}=1\)
36. UserCF演算法和ItemCF演算法的實現步驟、應用場景、以及優缺點
應用場景:
UserCF演算法的推薦更偏向於社會化:適合應用於新聞推薦、微博話題推薦等應用場景,推薦結果在新穎性方面有一定優勢
ItemCF演算法的推薦更偏向於個性化:適合用於電子商務、電影、圖書等應用場景,可以利用使用者的歷史行為給推薦結果做出解釋,讓使用者更為信服推薦的結果
缺點:
UserCF:隨著使用者數目的增大,使用者相似度計算複雜度越來越高,而且UserCF推薦結果相關性較弱,難以對推薦結果作出解釋,容易受大眾影響而推薦熱門物品
ItemCF:傾向於推薦於使用者已購買商品相似的商品,往往會出現多樣性不足、推薦新穎度較低的問題
四、程式設計題
Scala程式設計、讀取HDFS中指定檔案(包括讀寫許可權、大小、建立時間、路徑等資訊,以及建立目錄,建立,刪除檔案)、使用MapReduce程式設計技術對文件內容建立索引庫、統計單詞、建立表格(包括插入、求值等)運用HBase的API建立表格、MySQL資料庫建立資料庫、建立表等。
基礎HDFS操作
hdfs dfs -mkdir [-p] path
建立資料夾在path路徑上
hdfs dfs -ls [path]
讀取hdfs中全部目錄/[path]路徑下所有目錄,包括檔案的讀寫許可權、大小、生成時間、路徑
hdfs dfs -rm -r path
刪除目錄path,-r表示刪除目錄下所有子目錄和內容。
hdfs dfs -put localpath/file hdfspath
將本地file檔案上傳到hdfs的hdfspath目錄下
hdfs dfs -cat hdfspath/file
檢視hdfspath下的file檔案
hdfs dfs -get hdfspath/file localpath
將hdfspath下的file檔案下載到localpath本地路徑下
hdfs dfs -cp hdfspath/file hdfspath2
將hdfspath下的file複製到hdfspath2目錄下
Java api操作
設定路徑等初始化(獲取檔案系統 get file system:fs)
//包 org.apache.hadoop.conf.Configuration
Configuration conf = new Configuration();
conf.set(“fs.defaultFS”,”hdfs://localhost:9000”); //設定file system路徑
FileSystem fs = FileSystem.get(new URI(“hdfs://localhost:9000”), conf, “username”)
上傳檔案
fs.copyFromLocalFile(new Path(“C:/test.txt”),new Path(“/test.txt”));
下載檔案
/***
parameters
Boolean delSrc 是否刪除原檔案
Path src 要下載的檔案
Path dst 檔案下載到的路徑
Boolean useRawLocalFileSystem 是否開啟檔案校驗
***/
fs.copyToLocalFile(false, new Path(“/test.txt”), new Path(“C:/test.txt”), true);
刪除資料夾
引數2為true時,刪除非空目錄才能成功
fs.delete(new Path(“/path/”), true);
修改檔案命名
fs.rename(new Path(“/test.txt”) , new Path(“/test2.txt”));
檢視檔案詳情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(New Path(“/”), true);
while(listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();
status.getPath();//路徑
status.getPath().getName();//檔名
status.getLen();//長度
status.getPermission();//許可權
status.getGroup();//分組
}
檔案上傳
//建立輸入流
FileInputStream fis = new FileInputStream(new File(“C:/test.txt”));
//建立輸出流
FSDataOutputStream fos = fs.create(new Path(“/test.txt”));
//流對拷
IOUtils.copyBytes(fis, fos, conf);
檔案下載
//建立輸出流
FileOutputStream fos = new FileOutputStream(new File(“C:/test.txt”));
//建立輸入流
FSDataInputStream fis = fs.create(new Path(“/test.txt”));
//流對拷
IOUtils.copyBytes(fis,fos, conf);
Mapreduce操作
wordcount
Map過程
//首先繼承Mapper實現Map過程
public static class Map extends Mapper<Object,Text, Text, IntWritable>{
private static Text text = new Text();
//過載map方法
public void map(Object key, Text value, Context context) throw IOException,InterruptException{
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens){
this.text.set(itr.nextToken());
context.write(text, 1);
}
}
}
得到<word,1>的若干中間結果
Reduce過程
//繼承實現Reducer
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
private Intwritable result = new Intwritable();
//重寫reduce方法
public void reduce (Text key, Iterable<IntWritable>values, Context context ){
int sum = 0;
IntWritable val;
for(Iterator i$=values.iterator();i$.hasNext();sum+=val.get()){
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key,result);
}
}
Main函式中
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.set(“fs.defaultFS”,”hdfs://localhost:9000”);
//宣告job
Job job = Job.getInstance(conf);
job.setJarByClass(類名.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(“hdfs://localhost:9000/input”));
FileOutputFormat.addOutputPath(job,new Path(“hdfs://localhost:9000/output”));
System.exit(job.waitForCompletion(true)?0:1);
}
合併、去重
package MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Merge {
public static class Map extends Mapper<Object, Text, Text, Text> {
private static Text text = new Text();
public void map(Object key, Text value, Context content) throws IOException, InterruptedException {
text = value;
content.write(text, new Text(""));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://BigData1:9000");
Job job = Job.getInstance(conf);
job.setJarByClass(Merge.class);
job.setJobName("Merge");
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("hdfs://BigData1:9000/ex/ex2/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://BigData1:9000/ex/ex2/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
HBase
Shell程式設計
create ‘表名’,’列族1’,’列族2’,…’列族n’
craete 'user','info1','info2'
檢視所有表
list
判斷表是否存在
exists 'user'
掃描整個表
scan 'user'
描述表
describe 'user'
查詢記錄數,rowkey相同只算一條
count 'user'
查詢某一行
get 'user','1'
查詢某行的某個列族
get 'user','1','info'
查詢某行的某個列
get 'user','1','info:Age'
刪除列族下的某一列
delete 'user','1','info:age'
刪除某一列族
delete 'user','1','info'
刪除一行
deleteall 'user','1'
清空表
truncate 'user'
查詢表中rowkey行列族info的num個歷史版本
get ‘user’ , ‘1’ , {COLUMN=>’info’ , VERSIONS=>NUM}
API程式設計
初始化
public static Configuration conf;
public static Connection connection;
public static Admin admin;
public static void init() {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","127.0.0.1");
conf.set("hbase.zookeeper.property,clienPort","2181");
try {
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void close() {
try {
if (admin != null) {
admin.close();
}
if (null != connection) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
刪除行
/*
* 刪除表tableName中row指定的行的記錄。
*/
public static void deleteRow(String tableName, String row) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(row.getBytes());
table.delete(delete);
table.close();
System.out.println("刪除行:" + row + " 成功");
}
修改表
/*
* 修改表tableName,行row(可以用學生姓名S_Name表示),列column指定的單元格的資料
*/
public static void modifyData(String tableName, String row, String column, String data) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(row.getBytes());
String[] cols = column.split(":");
put.addColumn(cols[0].getBytes(),cols[1].getBytes(),data.getBytes());
table.put(put);
table.close();
System.out.println("修改成功!");
}
掃描某一列
/*
* 瀏覽表tableName某一列的資料,如果某一行記錄中該列資料不存在,則返回null。
* 要求當引數column為某一列族名稱時,如果底下有若干個列限定符,則要列出每個列限定符代表的列的資料;
* 當引數column為某一列具體名稱(例如“Score:Math”)時,只需要列出該列的資料
*/
public static void scanColumn(String tableName, String column) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
String[] cols = column.split(":");
if (cols.length == 1){
scan.addFamily(column.getBytes());
}
else{
scan.addFamily(cols[0].getBytes());
scan.addColumn(cols[0].getBytes(),cols[1].getBytes());
}
ResultScanner scanner = table.getScanner(scan);
for (Result result = scanner.next(); result != null; result = scanner.next()) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.print("行鍵" + Bytes.toString(CellUtil.cloneRow(cell)));
System.out.print("\t列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.print("\t列" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("\t值" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
table.close();
}
新增資料
/*
* 向表tableName、行row(用S_Name表示)和字串陣列fields指定的單元格中新增對應的資料values。
* 其中,fields中每個元素如果對應的列族下還有相應的列限定符的話,用“columnFamily:column”表示。
* 例如,同時向“Math”、“Computer Science”、“English”三列新增成績時,
* 字串陣列fields為{“Score:Math”, ”Score:Computer Science”, ”Score:English”},陣列values儲存這三門課的成績。
*/
public static void addRecord(String tableName, String row, String[] fields, String[] values) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
for (int i = 0; i < fields.length; i++) {
Put put = new Put(row.getBytes());
String[] cols = fields[i].split(":");
put.addColumn(cols[0].getBytes(), cols[1].getBytes(), values[i].getBytes());
table.put(put);
}
table.close();
System.out.println("新增成功!");
}
建立表
/*
* 建立表,引數tableName為表的名稱,字串陣列fields為儲存記錄各個欄位名稱的陣列。
* 要求當HBase已經存在名為tableName的表的時候,先刪除原有的表,然後再建立新的表
*/
public static void createTable(String tableName, String[] fields) throws IOException {
HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
if (admin.tableExists(TableName.valueOf(tableName))){ //表已存在則刪除表
System.out.println("表已存在!\n刪除已存在的表");
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String item:fields)
hTableDescriptor.addFamily(new HColumnDescriptor(item));
hBaseAdmin.createTable(hTableDescriptor);
System.out.println("建立 " + tableName + " 成功");
}
Spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object work1 {
def main(args: Array[String]):Unit={
println("Hello Spark")
val sparkConf = new SparkConf().setMaster("local").setAppName("Ex4_Work1")
val sc : SparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sc.textFile("Data01.txt")
// (1)該系總共有多少學生;
println("(1)該系總共有多少學生;")
val nameRDD = fileRDD.map(Item=>{Item.split(",")(0)})
val User_Count = nameRDD.distinct()
println(User_Count.count)
// (2)該系共開設來多少門課程;
println("(2)該系共開設來多少門課程;")
val courseRDD = fileRDD.map(Item=>{Item.split(",")(1)})
val Course_Count = courseRDD.distinct()
println(Course_Count.count)
// (3)Tom同學的總成績平均分是多少;
println("(3)Tom同學的總成績平均分是多少;")
val TomRDD = fileRDD.filter(Item=>{Item.split(",")(0)=="Tom"})
val Tom_Grade = TomRDD.map(Item=>(Item.split(",")(0),(Item.split(",")(2).toInt,1)))
val Tom_AVG_GRADE = Tom_Grade.reduceByKey((a,b)=>(a._1+b._1,a._2+b._2))
println(Tom_AVG_GRADE.mapValues(Item=>Item._1/Item._2).first())
// (4)求每名同學的選修的課程門數;
println("(4)求每名同學的選修的課程門數;")
val User_Course = fileRDD.map(Item=>(Item.split(",")(0),(Item.split(",")(1),1)))
val User_Course_Num = User_Course.reduceByKey((a,b)=>("course_num",a._2+b._2))
User_Course_Num.mapValues(Item=>(Item._2)).foreach(println)
// (5)該系DataBase課程共有多少人選修;
println("(5)該系DataBase課程共有多少人選修;")
val DataBase = fileRDD.filter(Item=>{Item.split(",")(1)=="DataBase"})
println(DataBase.count)
// (6)各門課程的平均分是多少;
println("(6)各門課程的平均分是多少;")
val Course_Score = fileRDD.map(Item=>(Item.split(",")(1),(Item.split(",")(2).toInt,1)))
val Course_Score_SUM = Course_Score.reduceByKey((a,b)=>(a._1+b._1,a._2+b._2))
val Course_Score_AVG = Course_Score_SUM.mapValues(a=>a._1/a._2)
Course_Score_AVG.foreach(println)
// (7)使用累加器計算共有多少人選了DataBase這門課。
println("(7)使用累加器計算共有多少人選了DataBase這門課。")
val User_DataBase = fileRDD.filter(Item=>{Item.split(",")(1) == "DataBase"}).map(Item=>((Item.split(",")(1),1)))
val acc = sc.longAccumulator("Acc")
User_DataBase.values.foreach(x => acc.add(x))
println(acc.value)
sc.stop()
}
}
本文來自部落格園,作者:silly丶,轉載請保留本文署名silly丶,並在文章頂部註明原文連結:https://www.cnblogs.com/EIPsilly/p/15728192.html