部署Azkaban多節點分散式模式
簡單介紹:
Azkaban是由Linkedin公司推出的一個批量工作流任務排程器,用於在一個工作流內以一個特定的順序執行一組工作和流程。Azkaban使用job配置檔案建立任務之間的依賴關係,並提供一個易於使用的web使用者介面維護和跟蹤你的工作流。 它有三個重要元件:
- 關係資料庫(目前僅支援mysql)
- web管理伺服器-AzkabanWebServer
- 執行伺服器-AzkabanExecutorServer
Azkaban使用MySQL來儲存它的狀態資訊,Azkaban Executor Server和Azkaban Web Server均使用到了MySQL資料庫。
AzkabanExecutorServer在如下幾個方面使用到了資料庫:
- 獲取project的資訊
- 執行工作流
- 儲存工作流執行日誌
- 如果一個工作流在不同的執行器上執行,它將從DB中獲取狀態。
AzkabanWebServer在如下幾個方面使用到了資料庫:
- Project管理
- 跟蹤工作流執行進度
- 訪問歷史工作流的執行資訊
- 定時執行工作流任務
- 記錄所有sla規則
AzkabanWebServer
AzkabanWebserver是整個Azkaban工作流系統的主要管理者,它負責project管理、使用者登入認證、定時執行工作流、跟蹤工作流執 行進度等一系列任務。同時,它還提供Web服務操作的介面,利用該介面,使用者可以使用curl或其他ajax的方式,來執行azkaban的相關操作。操作包括:使用者登入、建立project、上傳workflow、執行workflow、查詢workflow的執行進度、殺掉workflow等一系列操作,且這些操作的返回結果均是json的格式。
AzkabanExecutorServer
之所以將AzkabanWebServer和AzkabanExecutorServer分開,主要是因為在某個任務流失敗後,可以更方便的將重新執行。而且也更有利於Azkaban系統的升級。
注意:安裝sqoop的節點都要安裝azkaban
環境配置:由於azkaban3.0以上沒有相應的安裝包,需要從原始碼進行編譯。編譯的環境需要安裝jdk8。
分散式模式:叢集內應當安裝三個exec-server和一個web-server,相關元件分配如下:
bigdata243 azkaban-exec
bigdata244 azkaban-exec
bigdata245 azkaban-web-server azkaban-exec-server mysql
azkaban-web目錄
bin 啟動指令碼存放目錄
conf 配置檔案存放目錄(沒有的話從solo-server的目錄中拷貝過來)
lib 依賴jar包存放目錄
extlib 附加jar包存放目錄(沒有的話手動建立)
plugins 外掛安裝目錄
web web資原始檔
logs 日誌儲存目錄
sql sql資源
azkaban-exec目錄
bin 啟動指令碼存放目錄
conf 配置檔案存放目錄(沒有的話從solo-server的目錄中拷貝過來)
lib 依賴jar包存放目錄
extlib 附加jar包存放目錄(沒有的話手動建立)
plugins 外掛安裝目錄
編譯,安裝過程
官網下載:3.47版本
進入到azkaban下面編譯:[[email protected] azkaban-3.47.0]$ ./gradlew distTar
編譯結果為:
azkaban-common : 常用工具類。
azkaban-db : 對應的sql指令碼
azkaban-hadoop-secutity-plugin : hadoop 有關kerberos外掛
azkaban-solo-server: web和executor 一起的專案。
azkaban-web/executor-server:azkaban的 web和executor的server資訊
azkaban-spi: azkaban儲存介面以及exception類
編譯完成後:db、web、exec、solo四個目錄的build/distributions/下生成其壓縮包
將壓縮包拷貝到:新建資料夾:mkdir azkaban
cp azkaban-db-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
cp azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
cp azkaban-web-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
cp azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/
解壓重新命名
tar -zxvf azkaban-web-server-0.1.0-SNAPSHOT.tar.gz
tar -zxvf azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz
tar -zxvf azkaban-db-0.1.0-SNAPSHOT.tar.gz
tar -zxvf azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz
mv azkaban-db-0.1.0-SNAPSHOT azkaban-db
mv azkaban-web-server-0.1.0-SNAPSHOT azkaban-web
mv azkaban-solo-server-0.1.0-SNAPSHOT azkaban-solo
mv azkaban-exec-server-0.1.0-SNAPSHOT azkaban-exec
建立Azkaban元資料庫:登入mysql,執行如下語句
mysql> create database azkaban_matadata;
Query OK, 1 row affected (0.00 sec)
mysql> use azkaban_matadata;
Database changed
mysql> source /home/hadoop/app/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql (會建立所有表)
配置keystore
在azkaban-web/bin目錄下執行這條命令,在執行完這條命令之後,會生成一個檔案:keystore.使用keytool建立SSL配置,keytool是JDK提供的一個工具,輸入如下命令,可以檢視
[[email protected] ~]# find / -name keytool
/home/hadoop/app/jdk1.8/bin/keytool
/home/hadoop/app/jdk1.8/jre/bin/keytool
執行命令建立SSL配置
[[email protected] bin]$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA
輸入金鑰庫口令: azkaban
再次輸入新口令: azkaban
您的名字與姓氏是什麼? [Unknown]: 略過
您的組織單位名稱是什麼? [Unknown]: 略過
您的組織名稱是什麼? [Unknown]: 略過
您所在的城市或區域名稱是什麼? [Unknown]: 略過
您所在的省/市/自治區名稱是什麼? [Unknown]: 略過
該單位的雙字母國家/地區程式碼是什麼? [Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN是否正確?
[否]: Y
輸入 <jetty> 的金鑰口令 (如果和金鑰庫口令相同, 按回車):
將azkaban-solo下的conf plugins 和sql資料夾拷貝到azkaban-web目錄下
[[email protected] azkaban-solo]$ cp -a conf/ plugins/ sql/ /home/hadoop/app/azkaban/azkaban-web/
配置web-server
配置azkaban-web/conf/azkaban.properties
# Azkaban Personalization Settings
azkaban.name=bigdata245 # 伺服器UI名稱,用於伺服器上方顯示的名字
azkaban.label=Aliyun bigdata245 Azkaban # 描述資訊
azkaban.color=#FF3601 # 顏色
azkaban.default.servlet.path=/index
web.resource.dir=/home/hadoop/app/azkaban/azkaban-web/web/ #預設跟web目錄,設定為絕對路徑
default.timezone.id=Asia/Shanghai # 時區,預設為美國America/Los_Angeles
# Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager #使用者許可權管理預設類
user.manager.xml.file=/home/hadoop/app/azkaban/azkaban-web/conf/azkaban-users.xml #使用者配置,具體配置參見下文
# Loader for projects
executor.global.properties=/home/hadoop/app/azkaban/azkaban-web/conf/global.properties #globa配置檔案所在位置
azkaban.project.dir=projects
database.type=mysql # 資料庫型別
mysql.port=3306 # 埠
mysql.host=245 # 資料庫連線IP
mysql.database=azkaban_matadata # 資料庫例項名
mysql.user=root # 資料庫使用者名稱
[email protected] # 資料庫密碼
mysql.numconnections=100 # 最大連線數
h2.path=./h2
h2.create.tables=true
# Velocity dev mode
velocity.dev.mode=false
# Azkaban Jetty server properties.
jetty.use.ssl=false
jetty.maxThreads=25 #最大執行緒數
jetty.port=8081 #jetty埠
jetty.ssl.port=8443 #jetty ssl埠號
jetty.keystore=/home/hadoop/app/azkaban/azkaban-web/bin/keystore #ssl的檔名,絕對路徑
jetty.password=azkaban #ssl檔案密碼
jetty.keypassword=azkaban #jetty主密碼與keystore檔案相同
jetty.truststore=keystore #SSL檔名
jetty.trustpassword=azkaban #SSL檔案密碼
# Azkaban Executor settings
executor.port=12321 #執行伺服器埠
# mail settings
mail.sender= #傳送郵箱
mail.host= #傳送郵箱smtp地址
# User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.
# enduser -> myazkabanhost:443 -> proxy -> localhost:8081
# when this parameters set then these parameters are used to generate email links.
# if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
# azkaban.webserver.external_hostname=myazkabanhost.com
# azkaban.webserver.external_ssl_port=443
# azkaban.webserver.external_port=8081
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache #快取目錄
# JMX stats
jetty.connector.stats=true
executor.connector.stats=true
# Azkaban plugin settings
azkaban.jobtype.plugin.dir=/home/hadoop/app/azkaban/azkaban-web/plugins/jobtypes
埠號使用規則:jetty.ssl.port > jetty.port。但是使用jetty.ssl.port的前提是jetty.use.ssl=true。這個配置表示開啟ssl【Secure Sockets Layer】安全套接層,否則使用jetty.port埠。
在azkaban-web/conf目錄下新增log4j.properties
[[email protected] conf]$ touch log4j.properties
log4j.rootLogger=INFO,C
log4j.appender.C=org.apache.log4j.ConsoleAppender
log4j.appender.C.Target=System.err
log4j.appender.C.layout=org.apache.log4j.PatternLayout
log4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
新增MySQL驅動在azkaban-web目錄下建立資料夾:mkdir extlib
將lib目錄下的mysql驅動複製到extlib目錄下
[[email protected] azkaban-web]$ cp lib/mysql-connector-java-5.1.28.jar extlib/
新增管理員使用者以及密碼
進入azkaban-web/conf目錄,修改azkaban-users.xml,這個檔案存放使用者登入資訊以及許可權資訊。同時增加管理員使用者admin
<user username="admin" password="admin" roles="admin"/>
azkaban-web目錄下建立logs檔案用於存放日誌檔案 # mkdir logs
注意:多個執行器模式也就是分散式執行模式下執行,需要在webserver配置中啟用多個執行器模式。確認在azkaban.properties中具有以下屬性。azkaban.use.multiple.executors和azkaban.executorselector.comparator。*是必需的屬性。
注意:azkaban.use.multiple.executors 多重執行模式不予以尊重
配置多節點執行伺服器在azkaban-web/conf/azkaban.properties裡新增
azkaban.use.multiple.executors =true
azkaban.executorselector.filters = StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator = 1
azkaban.executorselector.comparator.Memory = 1
azkaban.executorselector.comparator.LastDispatched = 1
azkaban.executorselector.comparator.CpuUsage = 1
以確認使用的是分散式方式,隨後提交的job會根據情況自行選擇執行伺服器,否則預設只使用本地執行伺服器。
配置exec-server
拷貝azkaban-web目錄下的conf和extlib到azkaban-web目錄下
cp -a conf/ extlib/ /home/hadoop/app/azkaban/azkaban-exec/
配置azkaban-web/conf/azkaban.properties
default.timezone.id=Asia/Shanghai
# Loader for projects
executor.global.properties=/home/hadoop/app/azkaban/azkaban-exec/conf/global.properties
azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-exec/bin/projects
# Azkaban plugin settings
azkaban.jobtype.plugin.dir=plugins/jobtypes
database.type=mysql
mysql.port=3306
mysql.host=245
mysql.database=azkaban_matadata
mysql.user=root
[email protected]
mysql.numconnections=100
# Azkaban Executor settings
executor.maxThreads=50
executor.port=12321
executor.flow.threads=25
#分散式節點必配
azkaban.use.multiple.executors=true
azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
azkaban.executorselector.comparator.Memory=1
azkaban.executorselector.comparator.LastDispatched=1
azkaban.executorselector.comparator.CpuUsage=1
在azkaban-exec/conf目錄下新增log4j.properties
[[email protected] conf]$ touch log4j.properties
log4j.rootLogger=INFO,C
log4j.appender.C=org.apache.log4j.ConsoleAppender
log4j.appender.C.Target=System.err
log4j.appender.C.layout=org.apache.log4j.PatternLayout
log4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
在mysql的azkaban庫中新增各個執行伺服器的ip/域名和埠:
配置多執行器模式的執行程式,目前沒有執行程式管理UI。需要在資料庫中配置執行程式。需要將所有執行程式插入mysql DB以進行執行程式設定。驗證執行程式表中的正確執行程式是否處於活動狀態。
>insert into executors(host,port) values("bigdata245",3306);
>insert into executors(host,port) values("bigdata244",3306);
>insert into executors(host,port) values("bigdata243",3306);
啟動,先啟動exec-server(執行器),然後啟動web-server(web服務)
cd azkaban-exec/bin:./start-exec.sh
cd azkaban-web/bin:./start-web.sh
注意:在bin目錄下啟動會生成一堆檔案,如果用指令碼啟動注意修改配置路勁
啟動完成後,三臺節點下可以檢視到對應的程序
AzkabanExecutorServer 3
AzkabanWebServer 1
問題1;
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
如果出現這兩個問題,去配置檔案檢視mysql連結是否出錯,還有mysql配置執行伺服器的語句是否有問題
訪問Azkaban UI介面
輸入使用者名稱密碼azkaban/azkaban登入
修改如下配置(azkaban預設啟動規則是在哪裡啟動在哪裡生成一堆檔案)
exec/bin
[[email protected] bin]$ cat start-exec.sh
#!/bin/bash
script_dir=$(dirname $0)
# pass along command line arguments to the internal launch script.
${script_dir}/internal/internal-start-executor.sh "[email protected]" >/home/hadoop/app/azkaban/azkaban-exec/bin/executorServerLog__`date +%F+%T`.out 2>&1 &
[[email protected] bin]$ pwd
/home/hadoop/app/azkaban/azkaban-exec/bin
web/bin
[[email protected] bin]$ pwd
/home/hadoop/app/azkaban/azkaban-web/bin
[[email protected] bin]$ cat start-web.sh
#!/bin/bash
script_dir=$(dirname $0)
${script_dir}/internal/internal-start-web.sh >/home/hadoop/app/azkaban/azkaban-web/bin/webServerLog_`date +%F+%T`.out 2>&1 &
配置azkaban-exec/conf/azkaban.properties
azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-exec/bin/projects
配置azkaban-web/conf/azkaban.properties
azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-web/bin/projects
Azkaban測試及使用
projects:最重要的部分,建立一個工程,所有flows將在工程中執行。
Scheduling:顯示定時任務
Executing:顯示當前執行的任務
History:顯示歷史執行任務
主要介紹Projects部分,在建立工程前,我們先了解下之間的關係,一個工程包含一個或多個flows,一個flow包含多個job。job是你想在azkaban中執行的一個程序,可以是簡單的linux命令,可是java程式,也可以是複雜的shell指令碼、或者python指令碼,當然,如果你安裝相關外掛,也可以執行外掛。一個job可以依賴於另一個job,這種多個job和它們的依賴組成的圖表叫做flow。
web-server節點:負責專案作業管理(上傳和分發)
exec-server節點:負責具體執行的executor會解析job檔案
一、commond 型別單一Job
1.建立工程
Flows:工作流程,有多個job組成
Permissions:許可權管理
Project Logs:工程日誌
2.建立Job
job就是一個以.job結尾的文字檔案,例如建立一個job,名為hello.job,用於列印hello azkaban
3.打包
將建立的job打包成.zip壓縮檔案,注意只能是.zip格式
4.使用Azkaban UI 介面建立project並上傳壓縮包
點選Execute執行
執行後,點選Detail,檢視日誌
azkaban-exec/plugins/jobtypes/commonprivate.properties配置檔案,內容中新增:azkaban.native.lib=false
關閉重啟服務
如果還不行,編譯原始碼
原始碼路徑:/home/hadoop/app/compile_azkaban3.47/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
修改如下:final boolean isExecuteAsUser = this.sysProps.getBoolean(EXECUTE_AS_USER, false);
重新編譯之後將azkaban/azkaban-exec-server/build/distributions目錄下的azkaban-exec-server-3.48.0-8-gdc851ec.tar.gz 解壓重新命名,然後再修改配置替換舊的azkaban-exec-server,最後重啟exec和web服務即可
再次執行就好了
二、commond 型別多JOb 工作流 flow
1.建立專案
首先,建立一個專案,名為 Com_Job
2.job 建立
假設有這麼一種場景:
(1).task1 依賴 task2
(2).task2 依賴 task3
(3).task3 依賴 task4
說明:假設task1是一個計算指標任務,task2 給 task1 提供執行需要的基礎資料
task3 給 task2 提供資料,以此類推。
3.flow 建立
多個jobs和它們的依賴組成flow。怎麼建立依賴,只要指定dependencies引數就行了
定義4個job:
(1).run_task1.job:計算業務指標資料
(2).run_task2.job:計算task1所需要的資料
(3).run_task3.job:計算task2所需要的資料
(4).run_task4.job:從 slaves 中抽取源資料
依賴關係:
task1 依賴 task2,task2 依賴 task3,task3 依賴 task4
4個job檔案內容如下(這裡以執行python為例)
# run_task1.job
type = command
command = python /home/hadoop/pyshell/run_task1.py
dependencies = run_task2
# run_task2.job
type = command
command = python /home/hadoop/pyshell/run_task2.py
dependencies = run_task3
# run_task3.job
type = command
command = python /home/hadoop/pyshell/run_task3.py
dependencies = run_task4
# run_task4.job
type = command
command = python /home/hadoop/pyshell/run_task4.py
建立python指令碼
[[email protected] pyshell]$ touch run_task1.py
[[email protected] pyshell]$ touch run_task2.py
[[email protected] pyshell]$ touch run_task3.py
[[email protected] pyshell]$ touch run_task4.py
4個檔案內容如下
run_task1.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task1:計算業務指標資料...")
run_task2.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task2:計算基礎資料,為task1提供資料")
run_task3.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task3:資料清洗,為task2提供資料")
run_task4.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
print("task4:從Slaves中抽取源資料")
3.將上述 job 打成zip包,上傳至 azkaban
上傳完成後,點選右側Execute Flow按鈕,檢視流程檢視
Flow view:流程檢視。可以禁用,啟用某些job
Notification:定義任務成功或者失敗是否傳送郵件
Failure Options:定義一個job失敗,剩下的job怎麼執行
Concurrent:並行任務執行設定
Flow Parametters:引數設定。
4.執行
(1).執行一次,點選右下角Execute
(2).定時執行,點選左下角Schedule
設定完成後,執行右下角schedule,即完成排程配置,azkaban這裡的配置與linux下的crontab類似
想要檢視job的排程列表,切換到Schedule選單即可
5.檢視專案flow中各個Job的執行情況
綠色代表成功,藍色是執行,紅色是失敗。可以檢視job執行時間,依賴和日誌,點選details可以檢視各個job執行情況
三、MapReduce 任務
Azkaban 執行 MapReduce 任務,我們以 WordCount 為例
1.準備資料
[[email protected] ~]$ hadoop fs -mkdir -p /azkaban/input
[[email protected] data]$ hadoop fs -put words.txt /azkaban/input
使用hadoop提供的jar統計單詞數量
[[email protected] mapreduce]$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.5.jar wordcount /azkaban/input/* /azkaban/outputs/
執行結果
2.建立專案
3.job建立
job
# mapreduce_wordcount.job
type = command
command=sh /home/hadoop/pyshell/wordcount.sh
4.打包上傳,執行
5.檢視執行結果
azkaban上列印的日誌顯示已經成功
四、Hive 指令碼任務
1.建立專案
hive_export_to_mysql
2.job建立
我們要完成,hive中建立表,載入資料,然後匯出資料到mysql,分為兩個job
hive_task1:將hive中的資料匯出到mysql中
hive_task2:hive中建立表,載入資料
依賴關係:hive_task1 依賴 hive_task2
3.flow建立
job 檔案內容如下
# hive_task1.job
type = command
command = sh /home/hadoop/pyshell/hive_task1.sh
dependencies = hive_task2
# hive_task2.job
type = command
command = sh /home/hadoop/pyshell/hive_task2.sh
指令碼內容如下
[[email protected] pyshell]$ cat hive_task1.sh
#!/bin/bash
/home/hadoop/app/sqoop1/bin/sqoop export \
--connect jdbc:mysql://bigdata245:3306/sqoop \
--username root --password [email protected] \
--table EMP \
--export-dir /user/hive/warehouse/test.db/emp \
--input-fields-terminated-by ',' \
--input-null-string 'null' --input-null-non-string 'null' \
-m 1
[[email protected] pyshell]$ cat hive_task2
#!/bin/bash
hive -f /home/hadoop/pyshell/test.sql
sql檔案 test.sql內容如下
[hadoo[email protected] pyshell]$ cat test.sql
create database if not exists test;
use test;
drop table if exists emp;
create table emp(
empno int,
ename string,
job string
)
row format delimited fields terminated by ',';
load data local inpath '/home/hadoop/pyshell/emp.txt' overwrite into table emp;
emp.txt檔案內容如下
[[email protected] pyshell]$ cat emp.txt
1001,Tom,Java
1002,Jack,PHP
1003,Harvey,BigData
1004,David,IOS
1005,Kett,DBA
4.打包上傳
5.執行,檢視執行結果
執行前記得先在mysql中建立表emp,sql語句如下
DROP TABLE IF EXISTS `EMP`;
CREATE TABLE `EMP` (
`empno` int(11) DEFAULT NULL,
`ename` varchar(255) DEFAULT NULL,
`job` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS=1;