離線數倉(八)
第1章 Azkaban概論
1.1 為什麼需要工作流排程系統
1)一個完整的資料分析系統通常都是由大量任務單元組成:
Shell指令碼程式,Java程式,MapReduce程式、Hive指令碼等
2)各任務單元之間存在時間先後及前後依賴關係
3)為了很好地組織起這樣的複雜執行計劃,需要一個工作流排程系統來排程執行;
1.2 常見工作流排程系統
1)簡單的任務排程:直接使用Linux的Crontab來定義;
2)複雜的任務排程:開發排程平臺或使用現成的開源排程系統,比如Ooize、Azkaban、Airflow、DolphinScheduler等。
1.3 Azkaban 與Oozie對比
對市面上最流行的兩種排程器,給出以下詳細對比,以供技術選型參考。總體來說,Ooize相比Azkaban是一個重量級的任務排程系統,功能全面,但配置使用也更復雜。如果可以不在意某些功能的缺失,輕量級排程器Azkaban是很不錯的候選物件。
第2章 Azkaban入門
2.1 叢集模式安裝
2.1.1 上傳tar包(網盤下載地址:https://pan.baidu.com/s/1qQsD9XYk7O2cu_OXrYKPjA 提取碼:jkcy)
1)將azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz上傳到
2)新建/opt/module/azkaban目錄,並將所有tar包解壓到這個目錄下
mkdir /opt/module/azkaban
3)解壓azkaban-db-3.84.4.tar.gz、 azkaban-exec-server-3.84.4.tar.gz和azkaban-web-server-3.84.4.tar.gz到/opt/module/azkaban目錄下
tar -zxvf /opt/software/azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf /opt/software/azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf /opt/software/azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
4)進入到/opt/module/azkaban目錄,依次修改名稱
cd /opt/module/azkaban/
mv azkaban-exec-server-3.84.4/ azkaban-exec
mv azkaban-web-server-3.84.4/ azkaban-web
2.1.2 配置MySQL
1)正常安裝MySQL
(1)解除安裝自帶的Mysql-libs(如果之前安裝過mysql,要全都解除安裝掉)
rpm -qa | grep -i -E mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps
(2)上面解除安裝的操作還是最好的,這裡用編寫指令碼的方式進行操作,新建remove_mysql.sh並賦予可執行許可權,執行指令碼即可
#!/bin/bash service mysql stop 2>/dev/null service mysqld stop 2>/dev/null rpm -qa | grep -i mysql | xargs -n1 rpm -e --nodeps 2>/dev/null rpm -qa | grep -i mariadb | xargs -n1 rpm -e --nodeps 2>/dev/null rm -rf /var/lib/mysql rm -rf /usr/lib64/mysql rm -rf /etc/my.cnf rm -rf /usr/my.cnf
chmod +x remove_mysql.sh
(3)將安裝包和JDBC驅動上傳到/opt/software,共計6個(https://pan.baidu.com/s/1wojPY6P6qxjdMtc90KxP-Q 提取碼:yuan)
01_mysql-community-common-5.7.29-1.el7.x86_64.rpm 02_mysql-community-libs-5.7.29-1.el7.x86_64.rpm 03_mysql-community-libs-compat-5.7.29-1.el7.x86_64.rpm 04_mysql-community-client-5.7.29-1.el7.x86_64.rpm 05_mysql-community-server-5.7.29-1.el7.x86_64.rpm mysql-connector-java-5.1.48.jar
(4)安裝MySql
ls *.rpm | xargs -n1 sudo rpm -ivh
(5)啟動mysql
sudo systemctl start mysqld
(6)檢視mysql密碼
sudo cat /var/log/mysqld.log | grep password
(7)用剛剛查到的密碼進入mysql(如果報錯,給密碼加單引號,不管查詢到的預設密碼是什麼直接全部複製就行)
mysql -uroot -p’password’
(8)設定複雜密碼(由於mysql密碼策略,此密碼必須足夠複雜)
set password=password("Qs23=zs32");
(9)更改mysql密碼策略
set global validate_password_length=4;
set global validate_password_policy=0;
(10)設定簡單好記的密碼
set password=password("root123");
(11)進入msyql庫
use mysql
(12)查詢user表
select user, host from user;
(13)修改user表,把Host表內容修改為%
update user set host="%" where user="root";
(14)重新整理
flush privileges;
(15)退出
quit;
2)連線MySQL(我的MySQL安裝在Hadoop102)
mysql -hhadoop102 -uroot -proot123
3)登陸MySQL,建立Azkaban資料庫
create database azkaban;
4)建立azkaban使用者並賦予許可權(此步可以跳過,後續配置使用root使用者)
(1)設定密碼有效長度4位及以上
set global validate_password_length=4;
(2)設定密碼策略最低級別
set global validate_password_policy=0;
(3)建立Azkaban使用者,任何主機都可以訪問Azkaban,密碼是azkaban123
CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban123';
(4)賦予Azkaban使用者增刪改查許可權
grant select,insert,update,delete on azkaban.*to 'azkaban'@'%' with grant option;
5)建立Azkaban表,完成後退出MySQL(將azkaban-db-3.84.4.tar.gz上傳至hadoop102的/opt/software/並解壓,因為我的mysql安裝在hadoop102上)
use azkaban;
source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
quit;
6)更改MySQL包大小;防止Azkaban連線MySQL阻塞
sudo vim /etc/my.cnf
max_allowed_packet=1024M
7)重啟MySQL
sudo systemctl restart mysqld
8)檢視狀態
sudo systemctl status mysqld
2.1.3 配置Executor Server
Azkaban Executor Server處理工作流和作業的實際執行
1)編輯azkaban.properties
vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
#修改如下屬性 default.timezone.id=Asia/Shanghai azkaban.webserver.url=http://hadoop102:8081 mysql.host=hadoop102 mysql.database=azkaban mysql.user=root mysql.password=root123 #在最後新增 executor.port=12321
2)同步azkaban-exec到所有節點
xsync /opt/module/azkaban/azkaban-exec
3)必須進入到/opt/module/azkaban/azkaban-exec路徑,分別在三臺機器上,啟動executor server
cd /opt/module/azkaban/azkaban-exec
#如果在/opt/module/azkaban/azkaban-exec目錄下出現executor.port檔案,說明啟動成功
bin/start-exec.sh
4)下面啟用executor(每臺節點都執行)
curl -G "hadoop102:12321/executor?action=activate" && echo
curl -G "hadoop103:12321/executor?action=activate" && echo
curl -G "hadoop104:12321/executor?action=activate" && echo
如果三臺機器都出現如下提示,則表示啟用成功:{"status":"success"}
2.1.4 配置Web Server
Azkaban Web Server處理專案管理,身份驗證,計劃和執行觸發。
1)編輯azkaban.properties(我自己是安裝在hadoop103叢集上)
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
#修改如下屬性 default.timezone.id=Asia/Shanghai user.manager.xml.file=/opt/module/azkaban/azkaban-web/conf/azkaban-users.xml executor.global.properties=/opt/module/azkaban/azkaban-web/conf/global.properties mysql.host=hadoop102 mysql.database=azkaban mysql.user=root mysql.password=root123 #StaticRemainingFlowSize:正在排隊的任務數 #CpuStatus:CPU佔用情況 #MinimumFreeMemory:記憶體佔用情況,測試環境下,必須將MinimumFreeMemory刪除掉,否則它會認為叢集資源不夠,不執行 azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
2)修改azkaban-users.xml檔案,新增yuange使用者
vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users> <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/> <user password="metrics" roles="metrics" username="metrics"/> <user groups="azkaban" password="azkaban123" roles="metrics,admin" username="yuange"/> <role name="admin" permissions="ADMIN"/> <role name="metrics" permissions="METRICS"/> </azkaban-users>
3)必須進入到hadoop103的/opt/module/azkaban/azkaban-web路徑,啟動webserver
cd /opt/module/azkaban/azkaban-web
bin/start-web.sh
4)訪問 http://hadoop103:8081 ,並用yuange使用者登陸
5)編寫azkaban叢集啟動/停止指令碼(在Hadoop103上編寫)
vim /home/atguigu/bin/azkaban.sh
#!/bin/bash #azkaban的一鍵啟動指令碼,只接收單個start或stop引數 if(($#!=1)) then echo 請輸入單個start或stop引數! exit fi #對傳入的單個引數進行校驗,且執行相應的啟動和停止命令 if [ $1 = start ] then #啟動executor xcall.sh "cd /opt/module/azkaban/azkaban-exec; bin/start-exec.sh " sleep 5s #啟用executor for i in hadoop102 hadoop103 hadoop104 do curl -G $i:12321/executor?action=activate && echo done #啟動web-server cd /opt/module/azkaban/azkaban-web bin/start-web.sh elif [ $1 = stop ] then cd /opt/module/azkaban/azkaban-web bin/shutdown-web.sh xcall.sh /opt/module/azkaban/azkaban-exec/bin/shutdown-exec.sh else echo 請輸入單個start或stop引數! fi xcall.sh jps
6)新增可執行許可權
chmod +x /home/atguigu/bin/azkaban.sh
2.2 WorkFlow案例實操
2.2.1 HelloWorld案例
1)在windows環境,新建azkaban.project檔案,編輯內容如下:
azkaban-flow-version: 2.0
注意:該檔案作用,是採用新的Flow-API方式解析flow檔案
2)新建basic.flow檔案,內容如下
nodes: - name: jobA type: command config: command: echo "Hello World"
(1)Name:job名稱
(2)Type:job型別。command表示你要執行作業的方式為命令
(3)Config:job配置
3)將azkaban.project、basic.flow檔案壓縮到一個zip檔案,檔名稱必須是英文
4)在WebServer新建專案:http://hadoop103:8081/index
5)TestAzkabanOne.zip檔案上傳
6)執行任務流
9)在日誌中,檢視執行結果
2.2.2 作業依賴案例
需求:JobA和JobB執行完了,才能執行JobC
1)修改basic.flow為如下內容
nodes: - name: jobC type: command # jobC 依賴 JobA和JobB dependsOn: - jobA - jobB config: command: echo "I’m JobC,我依賴於JobA和JobB" - name: jobA type: command config: command: echo "I’m JobA" - name: jobB type: command config: command: echo "I’m JobB"
2)將修改後的basic.flow和azkaban.project壓縮成TestAzkabanTwo.zip檔案
3)重複2.3.1節HelloWorld後續步驟
2.2.4 自動失敗重試案例
需求:如果執行任務失敗,需要重試3次,重試的時間間隔10000ms
1)編譯配置流
nodes: - name: JobA type: command config: command: sh /not_exists.sh retries: 3 retry.backoff: 10000
retries:重試次數
retry.backoff:重試的時間間隔
2)將修改後的basic.flow和azkaban.project壓縮成TestAzkabanThree.zip檔案
3)重複2.3.1節HelloWorld後續步驟
4)執行並觀察到一次失敗+三次重試
5)也可以點選上圖中的Log,在任務日誌中看到,總共執行了4次
6)也可以在Flow全域性配置中新增任務失敗重試配置,此時重試配置會應用到所有Job
config: retries: 3 retry.backoff: 10000 nodes: - name: JobA type: command config: command: sh /not_exists.sh
2.2.5 手動失敗重試案例
需求:JobA=》JobB(依賴於A)=》JobC=》JobD=》JobE=》JobF。生產環境,任何Job都有可能掛掉,可以根據需求執行想要執行的Job
1)編譯配置流
nodes: - name: JobA type: command config: command: echo "This is JobA." - name: JobB type: command dependsOn: - JobA config: command: echo "This is JobB." - name: JobC type: command dependsOn: - JobB config: command: echo "This is JobC." - name: JobD type: command dependsOn: - JobC config: command: echo "This is JobD." - name: JobE type: command dependsOn: - JobD config: command: echo "This is JobE." - name: JobF type: command dependsOn: - JobE config: command: echo "This is JobF."
2)將修改後的basic.flow和azkaban.project壓縮成TestAzkabanFour.zip檔案
3)重複2.3.1節HelloWorld後續步驟
Enable和Disable下面都分別有如下引數:
Parents:該作業的上一個任務
Ancestors:該作業前的所有任務
Children:該作業後的一個任務
Descendents:該作業後的所有任務
Enable All:所有的任務
4)可以根據需求選擇性執行對應的任務
第3章 Azkaban進階
3.1 定時執行案例
需求:JobA每間隔1分鐘執行一次;
1)Azkaban可以定時執行工作流。在執行工作流時候,選擇左下角Schedule
2)右上角注意時區是上海,然後在左面填寫具體執行事件,填寫的方法和crontab配置定時任務規則一致。
3)觀察結果
4)刪除定時排程,點選removeSchedule即可刪除當前任務的排程規則
3.2 郵件報警案例
3.2.1 註冊郵箱
1)申請註冊一個郵箱
2)點選設定--->賬戶
3)開啟SMTP服務
4)一定要記住授權碼
3.2.2郵件報警案例
Azkaban預設支援通過郵件對失敗的任務進行報警,配置方法如下:
1)在azkaban-web節點hadoop103上,編輯/opt/module/azkaban/azkaban-web/conf/azkaban.properties,修改如下內容:
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
#新增如下內容: #這裡設定郵件傳送伺服器,需要 申請郵箱,開通stmp服務 mail.sender=1035807396@qq.com mail.host=smtp.qq.com mail.user=1035807396@qq.com mail.password=用郵箱的授權碼 #Job執行成功發往的郵箱 job.failure.email=1430730265@qq.com #Job執行失敗發往的郵箱 job.success.email[email protected]
2)儲存並重啟web-server
bin/shutdown-web.sh
bin/start-web.sh
3)編輯basic.flow,加入如下屬性:
nodes: - name: jobA type: command config: command: echo "This is an email test."
4)將azkaban.project和basic.flow壓縮成TestAzkabanEmail.zip
5)建立工程-->上傳檔案-->執行作業-->檢視結果
6)觀察郵箱,發現執行成功或者失敗的郵件
第4章 參考資料
4.1 Azkaban完整配置
見官網文件:https://azkaban.readthedocs.io/en/latest/configuration.html
4.2 YAML語法
1)語法特點
(1)大小寫敏感
(2)通過縮排表示層級關係
(3)禁止使用tab縮排,只能使用空格鍵 (個人感覺這條最重要)
(4)縮排的空格數目不重要,只要相同層級左對齊即可
(5)使用#表示註釋
2)支援的資料結構
(1)物件:鍵值對的集合,又稱為對映(mapping)/ 雜湊(hashes) / 字典(dictionary)
(2)陣列:一組按次序排列的值,又稱為序列(sequence) / 列表(list)
(3)純量(scalars):單個的、不可再分的值
3)雙引號和單引號的區分
雙引號""
:不會轉義字串裡面的特殊字元,特殊字元作為本身想表示的意思
name: "123\n123"
---------------------------
輸出: 123 換行 123
如果不加引號
將會轉義特殊字元,當成字串處理
4)值的寫法
(1)字串:使用”或”“或不使用引號
value0: 'hello World!' value1: "hello World!" value2: hello World!
(2)布林值:true
或false
表示
(3)數字
l12 #整數 014 # 八進位制整數 0xC #十六進位制整數 13.4 #浮點數 1.2e+34 #指數 .inf空值 #無窮大
(4)空值
null或~表示
(5)日期:使用 iso-8601 標準表示日期
date: 2018-01-01t16:59:43.10-05:00
在springboot中yaml檔案的時間格式 date: yyyy/MM/dd HH:mm:ss
(6)強制型別轉換:YAML 允許使用個感嘆號!
,強制轉換資料型別,單歎號
通常是自定義型別,雙歎號
是內建型別
money: !!str 123 date: !Boolean true
內建型別列表
!!int # 整數型別 !!float # 浮點型別 !!bool # 布林型別 !!str # 字串型別 !!binary # 也是字串型別 !!timestamp # 日期時間型別 !!null # 空值 !!set # 集合 !!omap,!!pairs # 鍵值列表或物件列表 !!seq # 序列,也是列表 !!map # 鍵值表
(7)物件:Map(屬性和值)(鍵值對)的形式: key:(空格)v :表示一堆鍵值對,空格不可省略
car:
color: red
brand: BMW
一行寫法,相當於JSON格式:{"color":"red","brand":"BMW"}
car:{color: red,brand: BMW}
(8)陣列:一組連詞線開頭的行,構成一個數組
brand: - audi - bmw - ferrari
一行寫法,相當於JSON:["auri","bmw","ferrari"]
brand: [audi,bmw,ferrari]
(9)文字塊:|:使用|
標註的文字內容縮排表示的塊,可以保留塊中已有的回車換行
value: | hello world! 輸出結果:hello 換行 world!
+
表示保留文字塊末尾的換行,-
表示刪除字串末尾的換行(“|” 與 文字之間須另起一行)
value: | hello value: |- hello value: |+ hello 輸出:hello\n hello hello\n\n(有多少個回車就有多少個\n)
>:使用 >
標註的文字內容縮排表示的塊,將塊中回車替換為空格,最終連線成一行(“>” 與 文字之間的空格)
value: > hello world! 輸出:hello 空格 world!
(10)錨點與引用:使用 &
定義資料錨點(即要複製的資料),使用 *
引用錨點資料(即資料的複製目的地)(注意:*
引用部分不能追加內容)
name: &a yaml book: *a books: - java - *a - python 輸出book: yaml 輸出books:[java,yaml,python]
5)配置檔案注入資料:我們可以匯入配置檔案處理器,以後編寫配置就有提示了,@ConfigurationProperties
IDE會提示開啟線上的幫助文件
<!--匯入配置檔案處理器,配置檔案進行繫結就會有提示-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
/** * 將配置檔案中配置的每一個屬性的值,對映到這個元件中 * @ConfigurationProperties:告訴SpringBoot將本類中的所有屬性和配置檔案中相關的配置進行繫結; * prefix = "person":配置檔案中哪個下面的所有屬性進行一一對映 * * 只有這個元件是容器中的元件,才能容器提供的@ConfigurationProperties功能; * */ @Component //例項化 @ConfigurationProperties(prefix = "person")//yaml或者properties的字首 public class Person { private String name; private Integer age; private Boolean flag; private Date birthday; private Map<String,Object> maps; private List<Object> tempList; private Dog dog; //省略getter和setter以及toString方法
6)application.yaml檔案
person: name: 胖先森 age: 18 flag: false birthday: 2018/12/19 20:21:22 #Spring Boot中時間格式 maps: {bookName: "西遊記",author: '吳承恩'} tempList: - 紅樓夢 - 三國演義 - 水滸傳 dog: dogName: 大黃 dogAge: 4
7)在test中進行測試如下:
@RunWith(SpringRunner.class) @SpringBootTest public class Demo03BootApplicationTests { @Autowired private Person p1; @Test public void contextLoads() { System.out.println(p1); } }
8)application.properties檔案
propertiesperson123.name=劉備 person123.age=20 person123.birthday=2018/12/19 20:21:22 person123.maps.bookName=水滸傳 person123.maps.author=羅貫中 person123.temp-list=一步教育,步步為贏 person123.dog.dogName=小白 person123.dog.dogAge=5
java程式碼修改字首
@Component //例項化 @ConfigurationProperties(prefix = "person123")//yaml或者properties的字首 public class Person { private String name; private Integer age; private Boolean flag; private Date birthday; private Map<String,Object> maps; private List<Object> tempList; private Dog dog; //省略getter和setter以及toString方法
9)在test中進行測試如下:
@RunWith(SpringRunner.class) @SpringBootTest public class Demo03BootApplicationTests { @Autowired private Person p1; @Test public void contextLoads() { System.out.println(p1); } }