1. 程式人生 > 其它 >離線數倉(八)

離線數倉(八)

1Azkaban概論

1.1 為什麼需要工作流排程系統

  1)一個完整的資料分析系統通常都是由大量任務單元組成:

    Shell指令碼程式,Java程式,MapReduce程式、Hive指令碼等

  2)各任務單元之間存在時間先後及前後依賴關係

  3)為了很好地組織起這樣的複雜執行計劃,需要一個工作流排程系統來排程執行;

1.2 常見工作流排程系統

  1)簡單的任務排程:直接使用LinuxCrontab來定義;

  2)複雜的任務排程:開發排程平臺或使用現成的開源排程系統,比如OoizeAzkabanAirflow、DolphinScheduler等。

1.3 Azkaban
Oozie對比

  對市面上最流行的兩種排程器,給出以下詳細對比,以供技術選型參考。總體來說,Ooize相比Azkaban是一個重量級的任務排程系統,功能全面,但配置使用也更復雜。如果可以不在意某些功能的缺失,輕量級排程器Azkaban是很不錯的候選物件。

2Azkaban入門

2.1 叢集模式安裝

2.1.1 上傳tar包(網盤下載地址:https://pan.baidu.com/s/1qQsD9XYk7O2cu_OXrYKPjA  提取碼:jkcy

  1)將azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz上傳到

hadoop103/opt/software路徑

  2)新建/opt/module/azkaban目錄,並將所有tar包解壓到這個目錄下

mkdir /opt/module/azkaban

  3)解壓azkaban-db-3.84.4.tar.gz azkaban-exec-server-3.84.4.tar.gz和azkaban-web-server-3.84.4.tar.gz/opt/module/azkaban目錄下

tar -zxvf /opt/software/azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf /opt/software/azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf /opt/software/azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/

  4)進入到/opt/module/azkaban目錄,依次修改名稱

cd /opt/module/azkaban/
mv azkaban-exec-server-3.84.4/ azkaban-exec
mv azkaban-web-server-3.84.4/ azkaban-web

2.1.2 配置MySQL

  1)正常安裝MySQL

    (1)解除安裝自帶的Mysql-libs(如果之前安裝過mysql,要全都解除安裝掉)

rpm -qa | grep -i -E mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps

    (2)上面解除安裝的操作還是最好的,這裡用編寫指令碼的方式進行操作,新建remove_mysql.sh並賦予可執行許可權,執行指令碼即可

#!/bin/bash
service mysql stop 2>/dev/null
service mysqld stop 2>/dev/null
rpm -qa | grep -i mysql | xargs -n1 rpm -e --nodeps 2>/dev/null
rpm -qa | grep -i mariadb | xargs -n1 rpm -e --nodeps 2>/dev/null
rm -rf /var/lib/mysql
rm -rf /usr/lib64/mysql
rm -rf /etc/my.cnf
rm -rf /usr/my.cnf
chmod +x remove_mysql.sh

    (3)將安裝包和JDBC驅動上傳到/opt/software,共計6個(https://pan.baidu.com/s/1wojPY6P6qxjdMtc90KxP-Q  提取碼:yuan)

01_mysql-community-common-5.7.29-1.el7.x86_64.rpm
02_mysql-community-libs-5.7.29-1.el7.x86_64.rpm
03_mysql-community-libs-compat-5.7.29-1.el7.x86_64.rpm
04_mysql-community-client-5.7.29-1.el7.x86_64.rpm
05_mysql-community-server-5.7.29-1.el7.x86_64.rpm
mysql-connector-java-5.1.48.jar

    (4)安裝MySql

ls *.rpm | xargs -n1 sudo rpm -ivh

    (5)啟動mysql

sudo systemctl start mysqld

    (6)檢視mysql密碼

sudo cat /var/log/mysqld.log | grep password

    (7)用剛剛查到的密碼進入mysql(如果報錯,給密碼加單引號,不管查詢到的預設密碼是什麼直接全部複製就行)

mysql -uroot -p’password’

    (8)設定複雜密碼(由於mysql密碼策略,此密碼必須足夠複雜)

set password=password("Qs23=zs32");

    (9)更改mysql密碼策略

set global validate_password_length=4;
set global validate_password_policy=0;

    (10)設定簡單好記的密碼

set password=password("root123");

    (11)進入msyql

use mysql

    (12)查詢user

select user, host from user;

    (13)修改userHost表內容修改為%

update user set host="%" where user="root";

    (14)重新整理

flush privileges;

    (15)退出

quit;

  2)連線MySQL(我的MySQL安裝在Hadoop102)

mysql -hhadoop102 -uroot -proot123

  3)登陸MySQL,建立Azkaban資料庫

create database azkaban;

  4)建立azkaban使用者並賦予許可權(此步可以跳過,後續配置使用root使用者)

    (1)設定密碼有效長度4位及以上

set global validate_password_length=4;

    (2)設定密碼策略最低級別

set global validate_password_policy=0;

    (3)建立Azkaban使用者,任何主機都可以訪問Azkaban,密碼是azkaban123

CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban123';

    (4)賦予Azkaban使用者增刪改查許可權

grant select,insert,update,delete on azkaban.*to 'azkaban'@'%' with grant option;

  5)建立Azkaban表,完成後退出MySQL(將azkaban-db-3.84.4.tar.gz上傳至hadoop102的/opt/software/並解壓,因為我的mysql安裝在hadoop102上)

use azkaban;
source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
quit;

  6)更改MySQL包大小;防止Azkaban連線MySQL阻塞

sudo vim /etc/my.cnf
max_allowed_packet=1024M

  7)重啟MySQL

sudo systemctl restart mysqld

  8)檢視狀態

sudo systemctl status mysqld

2.1.3 配置Executor Server

  Azkaban Executor Server處理工作流和作業的實際執行

  1)編輯azkaban.properties

vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
#修改如下屬性
default.timezone.id=Asia/Shanghai
azkaban.webserver.url=http://hadoop102:8081
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=root
mysql.password=root123
#在最後新增
executor.port=12321

  2)同步azkaban-exec到所有節點

xsync /opt/module/azkaban/azkaban-exec

  3)必須進入/opt/module/azkaban/azkaban-exec路徑,分別在三臺機器上,啟動executor server

cd /opt/module/azkaban/azkaban-exec
#如果在/opt/module/azkaban/azkaban-exec目錄下出現executor.port檔案,說明啟動成功
bin/start-exec.sh

  4)下面啟用executor(每臺節點都執行)

curl -G "hadoop102:12321/executor?action=activate" && echo
curl -G "hadoop103:12321/executor?action=activate" && echo
curl -G "hadoop104:12321/executor?action=activate" && echo

    如果三臺機器都出現如下提示,則表示啟用成功:{"status":"success"}

2.1.4 配置Web Server

  Azkaban Web Server處理專案管理,身份驗證,計劃和執行觸發。

  1)編輯azkaban.properties(我自己是安裝在hadoop103叢集上)

vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
#修改如下屬性
default.timezone.id=Asia/Shanghai
user.manager.xml.file=/opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
executor.global.properties=/opt/module/azkaban/azkaban-web/conf/global.properties
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=root
mysql.password=root123
#StaticRemainingFlowSize:正在排隊的任務數
#CpuStatus:CPU佔用情況
#MinimumFreeMemory:記憶體佔用情況,測試環境下,必須將MinimumFreeMemory刪除掉,否則它會認為叢集資源不夠,不執行
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus

  2)修改azkaban-users.xml檔案,新增yuange使用者

vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users>
  <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
  <user password="metrics" roles="metrics" username="metrics"/>

  <user groups="azkaban" password="azkaban123" roles="metrics,admin" username="yuange"/>

  <role name="admin" permissions="ADMIN"/>
  <role name="metrics" permissions="METRICS"/>
</azkaban-users>

  3)必須進入到hadoop103的/opt/module/azkaban/azkaban-web路徑,啟動webserver

cd /opt/module/azkaban/azkaban-web
bin/start-web.sh

  4)訪問 http://hadoop103:8081 ,並用yuange使用者登陸

  5)編寫azkaban叢集啟動/停止指令碼(在Hadoop103上編寫)

vim /home/atguigu/bin/azkaban.sh
#!/bin/bash
#azkaban的一鍵啟動指令碼,只接收單個start或stop引數
if(($#!=1))
then
        echo 請輸入單個start或stop引數!
        exit
fi

#對傳入的單個引數進行校驗,且執行相應的啟動和停止命令
if [ $1 = start ]
then

#啟動executor
        
    xcall.sh "cd /opt/module/azkaban/azkaban-exec; bin/start-exec.sh "
    
    sleep 5s
#啟用executor
    for i in hadoop102 hadoop103 hadoop104
    do
         curl -G $i:12321/executor?action=activate && echo 
    done
#啟動web-server
    cd /opt/module/azkaban/azkaban-web
        bin/start-web.sh

elif [ $1 = stop ]
    then
    cd /opt/module/azkaban/azkaban-web
    bin/shutdown-web.sh

    xcall.sh /opt/module/azkaban/azkaban-exec/bin/shutdown-exec.sh
else
        echo 請輸入單個start或stop引數!
fi

xcall.sh jps

  6)新增可執行許可權

chmod +x /home/atguigu/bin/azkaban.sh

2.2 WorkFlow案例實操

2.2.1 HelloWorld案例

  1)在windows環境,新建azkaban.project檔案,編輯內容如下:

azkaban-flow-version: 2.0

    注意:該檔案作用,是採用新的Flow-API方式解析flow檔案

  2)新建basic.flow檔案,內容如下

nodes:
  - name: jobA
    type: command
    config:
      command: echo "Hello World"

    (1)Namejob名稱

    (2)Typejob型別。command表示你要執行作業的方式為命令

    (3)Configjob配置

  3)將azkaban.projectbasic.flow檔案壓縮到一個zip檔案,檔名稱必須是英文

  4)在WebServer新建專案:http://hadoop103:8081/index

  5)TestAzkabanOne.zip檔案上傳

  6)執行任務流

  9)在日誌中,檢視執行結果

2.2.2 作業依賴案例

  需求:JobAJobB執行完了,才能執行JobC

  1)修改basic.flow為如下內容

nodes:
  - name: jobC
    type: command
    # jobC 依賴 JobA和JobB
    dependsOn:
      - jobA
      - jobB
    config:
      command: echo "I’m JobC,我依賴於JobA和JobB"

  - name: jobA
    type: command
    config:
      command: echo "I’m JobA"

  - name: jobB
    type: command
    config:
      command: echo "I’m JobB"

  2)將修改後的basic.flow和azkaban.project壓縮成TestAzkabanTwo.zip檔案

  3)重複2.3.1HelloWorld後續步驟

2.2.4 自動失敗重試案例

  需求:如果執行任務失敗,需要重試3次,重試的時間間隔10000ms

  1)編譯配置流

nodes:
  - name: JobA
    type: command
    config:
      command: sh /not_exists.sh
      retries: 3
      retry.backoff: 10000

    retries:重試次數

    retry.backoff:重試的時間間隔

  2)將修改後的basic.flow和azkaban.project壓縮成TestAzkabanThree.zip檔案

  3)重複2.3.1HelloWorld後續步驟

  4)執行並觀察到一次失敗+三次重試

  5)也可以點選上圖中的Log,在任務日誌中看到,總共執行了4

  6)也可以在Flow全域性配置中新增任務失敗重試配置,此時重試配置會應用到所有Job

config:
  retries: 3
  retry.backoff: 10000
nodes:
  - name: JobA
    type: command
    config:
      command: sh /not_exists.sh

2.2.5 手動失敗重試案例

  需求:JobA=JobB(依賴於A=JobC=JobD=JobE=JobF。生產環境,任何Job都有可能掛掉,可以根據需求執行想要執行的Job

  1)編譯配置流

nodes:
  - name: JobA
    type: command
    config:
      command: echo "This is JobA."

  - name: JobB
    type: command
    dependsOn:
      - JobA
    config:
      command: echo "This is JobB."

  - name: JobC
    type: command
    dependsOn:
      - JobB
    config:
      command: echo "This is JobC."

  - name: JobD
    type: command
    dependsOn:
      - JobC
    config:
      command: echo "This is JobD."

  - name: JobE
    type: command
    dependsOn:
      - JobD
    config:
      command: echo "This is JobE."

  - name: JobF
    type: command
    dependsOn:
      - JobE
    config:
      command: echo "This is JobF."

  2)將修改後的basic.flow和azkaban.project壓縮成TestAzkabanFour.zip檔案

  3)重複2.3.1HelloWorld後續步驟

    EnableDisable下面都分別有如下引數:

      Parents:該作業的上一個任務

      Ancestors:該作業前的所有任務

      Children:該作業後的一個任務

      Descendents:該作業後的所有任務

      Enable All:所有的任務

  4)可以根據需求選擇性執行對應的任務

3Azkaban進階

3.1 定時執行案例

  需求:JobA每間隔1分鐘執行一次;

  1Azkaban可以定時執行工作流。在執行工作流時候,選擇左下角Schedule

  2)右上角注意時區是上海,然後在左面填寫具體執行事件,填寫的方法和crontab配置定時任務規則一致。

  3)觀察結果

  4)刪除定時排程,點選removeSchedule即可刪除當前任務的排程規則

3.2 郵件報警案例

3.2.1 註冊郵箱

  1)申請註冊一個郵箱

  2)點選設定--->賬戶

  3)開啟SMTP服務

  4)一定要記住授權碼

3.2.2郵件報警案例

  Azkaban預設支援通過郵件對失敗的任務進行報警,配置方法如下:

  1)在azkaban-web節點hadoop103上,編輯/opt/module/azkaban/azkaban-web/conf/azkaban.properties,修改如下內容:

vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
#新增如下內容:
#這裡設定郵件傳送伺服器,需要 申請郵箱,開通stmp服務
mail.sender=1035807396@qq.com
mail.host=smtp.qq.com
mail.user=1035807396@qq.com
mail.password=用郵箱的授權碼
#Job執行成功發往的郵箱
job.failure.email=1430730265@qq.com
#Job執行失敗發往的郵箱
job.success.email[email protected]

  2)儲存並重啟web-server

bin/shutdown-web.sh
bin/start-web.sh

  3)編輯basic.flow,加入如下屬性:

nodes:
  - name: jobA
    type: command
    config:
      command: echo "This is an email test."

  4)將azkaban.project和basic.flow壓縮成TestAzkabanEmail.zip

  5)建立工程-->上傳檔案-->執行作業-->檢視結果

  6)觀察郵箱,發現執行成功或者失敗的郵件

4章 參考資料

4.1 Azkaban完整配置

  見官網文件:https://azkaban.readthedocs.io/en/latest/configuration.html

4.2 YAML語法

  1)語法特點

    (1)大小寫敏感

    (2)通過縮排表示層級關係

    (3)禁止使用tab縮排,只能使用空格鍵 (個人感覺這條最重要)

    (4)縮排的空格數目不重要,只要相同層級左對齊即可

    (5)使用#表示註釋

  2)支援的資料結構

    (1)物件:鍵值對的集合,又稱為對映(mapping)/ 雜湊(hashes) / 字典(dictionary)

    (2)陣列:一組按次序排列的值,又稱為序列(sequence) / 列表(list)

    (3)純量(scalars):單個的、不可再分的值

  3)雙引號和單引號的區分

    雙引號"":不會轉義字串裡面的特殊字元,特殊字元作為本身想表示的意思

name: "123\n123"
---------------------------
輸出: 123 換行 123

    如果不加引號將會轉義特殊字元,當成字串處理

  4)值的寫法

    (1)字串:使用”或”“或不使用引號

value0: 'hello World!'
value1: "hello World!"
value2: hello World!

    (2)布林值:truefalse表示

    (3)數字

l12 #整數 
014 # 八進位制整數 
0xC #十六進位制整數 
13.4 #浮點數 
1.2e+34 #指數 
.inf空值 #無窮大

    (4)空值

null或~表示

    (5)日期:使用 iso-8601 標準表示日期

date: 2018-01-01t16:59:43.10-05:00

      在springboot中yaml檔案的時間格式 date: yyyy/MM/dd HH:mm:ss

    (6)強制型別轉換:YAML 允許使用個感嘆號!,強制轉換資料型別,單歎號通常是自定義型別,雙歎號是內建型別

money: !!str
123
date: !Boolean
true

      內建型別列表

!!int # 整數型別 
!!float # 浮點型別 
!!bool # 布林型別 
!!str # 字串型別 
!!binary # 也是字串型別 
!!timestamp # 日期時間型別 
!!null # 空值 
!!set # 集合 
!!omap,!!pairs # 鍵值列表或物件列表
!!seq # 序列,也是列表 !!map # 鍵值表

    (7)物件:Map(屬性和值)(鍵值對)的形式: key:(空格)v :表示一堆鍵值對,空格不可省略

car:
    color: red
    brand: BMW

      一行寫法,相當於JSON格式:{"color":"red","brand":"BMW"}

car:{color: red,brand: BMW}

    (8)陣列:一組連詞線開頭的行,構成一個數組

brand:
   - audi
   - bmw
   - ferrari

      一行寫法,相當於JSON:["auri","bmw","ferrari"]

brand: [audi,bmw,ferrari]

    (9)文字塊:|:使用|標註的文字內容縮排表示的塊,可以保留塊中已有的回車換行

value: |
   hello
   world!
輸出結果:hello 換行 world!

      +表示保留文字塊末尾的換行,-表示刪除字串末尾的換行(“|” 與 文字之間須另起一行

value: |
hello

value: |-
hello

value: |+
hello
輸出:hello\n hello hello\n\n(有多少個回車就有多少個\n)

      >:使用 > 標註的文字內容縮排表示的塊,將塊中回車替換為空格,最終連線成一行(“>” 與 文字之間的空格)

value: > hello
world!
輸出:hello 空格 world!

    (10)錨點與引用:使用 & 定義資料錨點(即要複製的資料),使用 * 引用錨點資料(即資料的複製目的地)(注意:*引用部分不能追加內容)

name: &a yaml
book: *a
books: 
   - java
   - *a
   - python
輸出book: yaml 
輸出books:[java,yaml,python]

  5)配置檔案注入資料:我們可以匯入配置檔案處理器,以後編寫配置就有提示了,@ConfigurationPropertiesIDE會提示開啟線上的幫助文件

<!--匯入配置檔案處理器,配置檔案進行繫結就會有提示-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
/**
 * 將配置檔案中配置的每一個屬性的值,對映到這個元件中
 * @ConfigurationProperties:告訴SpringBoot將本類中的所有屬性和配置檔案中相關的配置進行繫結;
 *      prefix = "person":配置檔案中哪個下面的所有屬性進行一一對映
 *
 * 只有這個元件是容器中的元件,才能容器提供的@ConfigurationProperties功能;
 *
 */
@Component //例項化
@ConfigurationProperties(prefix = "person")//yaml或者properties的字首
public class Person {

    private String name;
    private Integer age;
    private Boolean flag;
    private Date birthday;
    private Map<String,Object> maps;
    private List<Object> tempList;
    private Dog dog;
    //省略getter和setter以及toString方法

  6)application.yaml檔案

person:
  name: 胖先森
  age: 18
  flag: false
  birthday: 2018/12/19 20:21:22 #Spring Boot中時間格式
  maps: {bookName: "西遊記",author: '吳承恩'}
  tempList:
    - 紅樓夢
    - 三國演義
    - 水滸傳
  dog:
    dogName: 大黃
    dogAge: 4

  7)在test中進行測試如下:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo03BootApplicationTests {

    @Autowired
    private Person p1;

    @Test
    public void contextLoads() {
        System.out.println(p1);
    }

}

  8)application.properties檔案

propertiesperson123.name=劉備
person123.age=20
person123.birthday=2018/12/19 20:21:22
person123.maps.bookName=水滸傳
person123.maps.author=羅貫中
person123.temp-list=一步教育,步步為贏
person123.dog.dogName=小白
person123.dog.dogAge=5

    java程式碼修改字首

@Component //例項化
@ConfigurationProperties(prefix = "person123")//yaml或者properties的字首
public class Person {

    private String name;
    private Integer age;
    private Boolean flag;
    private Date birthday;
    private Map<String,Object> maps;
    private List<Object> tempList;
    private Dog dog;
    //省略getter和setter以及toString方法

  9)在test中進行測試如下:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo03BootApplicationTests {

    @Autowired
    private Person p1;

    @Test
    public void contextLoads() {
        System.out.println(p1);
    }
}