離線輔助系統
學習目標:
1、理解flume、sqoop、oozie的應用場景
2、理解flume、sqoop、oozie的基本原理
3、掌握flume、sqoop、oozie的使用方法
離線輔助系統 |
資料接入 |
Flume介紹 |
Flume元件 |
||
Flume實戰案例 |
||
任務排程 |
排程器基礎 |
|
市面上排程工具 |
||
Oozie的使用 |
||
Oozie的流程定義詳解 |
||
資料匯出 |
sqoop基礎知識 |
|
sqoop實戰及原理 |
||
Sqoop資料匯入實戰 |
||
Sqoop資料匯出實戰 |
||
Sqoop作業操作 |
||
Sqoop的原理 |
-
前言
在一個完整的大資料處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心之外,還需要資料採集、結果資料匯出、任務排程等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架,如圖所示:
-
一、日誌採集框架Flume
-
1.1 Flume介紹
-
1.1.1 概述
- Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。
- Flume可以採集檔案,socket資料包等各種形式源資料,又可以將採集到的資料輸出到HDFS、hbase、hive、kafka等眾多外部儲存系統中
- 一般的採集需求,通過對flume的簡單配置即可實現
- Flume針對特殊場景也具備良好的自定義擴充套件能力,因此,flume可以適用於大部分的日常資料採集場景
-
1.1.2 執行機制
- Flume分散式系統中最核心的角色是agent,flume採集系統就是由一個個agent所連線起來形成
- 每一個agent相當於一個資料傳遞員[M1] ,內部有三個元件:
-
- Source:採集源,用於跟資料來源對接,以獲取資料
- Sink:下沉地,採集資料的傳送目的,用於往下一級agent傳遞資料或者往最終儲存系統傳遞資料
- Channel:angent內部的資料傳輸通道,用於從source將資料傳遞到sink
-
-
1.1.3 Flume採集系統結構圖
1. 簡單結構
單個agent採集資料
2. 複雜結構
多級agent之間串聯
-
1.2 Flume實戰案例
- 1.2.1 Flume的安裝部署
- Flume的安裝非常簡單,只需要解壓即可,當然,前提是已有hadoop環境
上傳安裝包到資料來源所在節點上
然後解壓 tar -zxvf apache-flume-1.6.0-bin.tar.gz
然後進入flume的目錄,修改conf下的flume-env.sh,在裡面配置JAVA_HOME
2、根據資料採集的需求配置採集方案,描述在配置檔案中(檔名可任意自定義)
3、指定採集方案配置檔案,在相應的節點上啟動flume agent
先用一個最簡單的例子來測試一下程式環境是否正常
- 先在flume的conf目錄下新建一個檔案
vi netcat-logger.conf
# 定義這個agent中各元件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1
# 描述和配置source元件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444
# 描述和配置sink元件:k1 a1.sinks.k1.type = logger
# 描述和配置channel元件,此處使用是記憶體快取的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之間的連線關係 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
2.啟動agent去採集資料
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console |
-c conf 指定flume自身的配置檔案所在目錄
-f conf/netcat-logger.con 指定我們所描述的採集方案
-n a1 指定我們這個agent的名字
3.測試
先要往agent採集監聽的埠上傳送資料,讓agent有資料可採
隨便在一個能跟agent節點聯網的機器上
telnet anget-hostname port (telnet localhost 44444)
- 1.2.2 採集案例
- a、採集目錄到HDFS
採集需求:某伺服器的某特定目錄下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到HDFS中去
根據需求,首先定義以下3大要素
- 採集源,即source——監控檔案目錄 : spooldir
- 下沉目標,即sink——HDFS檔案系統 : hdfs sink
- source和sink之間的傳遞通道——channel,可用file channel 也可以用記憶體channel
配置檔案編寫:
#定義三大元件的名稱 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1
# 配置source元件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false
#配置攔截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname
# 配置sink元件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 |
Channel引數解釋:
capacity:預設該通道中最大的可以儲存的event數量
trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數量
keep-alive:event新增到通道中或者移出的允許時間
- b、採集檔案到HDFS
採集需求:比如業務系統使用log4j生成的日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到hdfs
根據需求,首先定義以下3大要素
- 採集源,即source——監控檔案內容更新 : exec ‘tail -F file’
- 下沉目標,即sink——HDFS檔案系統 : hdfs sink
- Source和sink之間的傳遞通道——channel,可用file channel 也可以用 記憶體channel
配置檔案編寫:
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1
# Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log agent1.sources.source1.channels = channel1
#configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1 agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 |
-
1.3 更多source和sink元件
Flume支援眾多的source和sink型別,詳細手冊可參考官方文件
http://flume.apache.org/FlumeUserGuide.html
-
二、 工作流排程器azkaban
-
2.1 概述
-
2.1.1為什麼需要工作流排程系統
- 一個完整的資料分析系統通常都是由大量任務單元組成:
shell指令碼程式,java程式,mapreduce程式、hive指令碼等
- 各任務單元之間存在時間先後及前後依賴關係
- 為了很好地組織起這樣的複雜執行計劃,需要一個工作流排程系統來排程執行;
例如,我們可能有這樣一個需求,某個業務系統每天產生20G原始資料,我們每天都要對其進行處理,處理步驟如下所示:
- 通過Hadoop先將原始資料同步到HDFS上;
- 藉助MapReduce計算框架對原始資料進行轉換,生成的資料以分割槽表的形式儲存到多張Hive表中;
- 需要對Hive中多個表的資料進行JOIN處理,得到一個明細資料Hive大表;
- 將明細資料進行復雜的統計分析,得到結果報表資訊;
- 需要將統計分析得到的結果資料同步到業務系統中,供業務呼叫使用。
-
2.1.2 工作流排程實現方式
簡單的任務排程:直接使用linux的crontab來定義;
複雜的任務排程:開發排程平臺
或使用現成的開源排程系統,比如ooize、azkaban等
-
2.1.3 常見工作流排程系統
市面上目前有許多工作流排程器
在hadoop領域,常見的工作流排程器有Oozie, Azkaban,Cascading,Hamake等
-
2.1.4 各種排程工具特性對比
下面的表格對上述四種hadoop工作流排程器的關鍵特性進行了比較,儘管這些工作流排程器能夠解決的需求場景基本一致,但在設計理念,目標使用者,應用場景等方面還是存在顯著的區別,在做技術選型的時候,可以提供參考
特性 |
Hamake |
Oozie |
Azkaban |
Cascading |
工作流描述語言 |
XML |
XML (xPDL based) |
text file with key/value pairs |
Java API |
依賴機制 |
data-driven |
explicit |
explicit |
explicit |
是否要web容器 |
No |
Yes |
Yes |
No |
進度跟蹤 |
console/log messages |
web page |
web page |
Java API |
Hadoop job排程支援 |
no |
yes |
yes |
yes |
執行模式 |
command line utility |
daemon |
daemon |
API |
Pig支援 |
yes |
yes |
yes |
yes |
事件通知 |
no |
no |
no |
yes |
需要安裝 |
no |
yes |
yes |
no |
支援的hadoop版本 |
0.18+ |
0.20+ |
currently unknown |
0.18+ |
重試支援 |
no |
workflownode evel |
yes |
yes |
執行任意命令 |
yes |
yes |
yes |
yes |
Amazon EMR支援 |
yes |
no |
currently unknown |
yes |
-
2.1.5 Azkaban與Oozie對比
對市面上最流行的兩種排程器,給出以下詳細對比,以供技術選型參考。總體來說,ooize相比azkaban是一個重量級的任務排程系統,功能全面,但配置使用也更復雜。如果可以不在意某些功能的缺失,輕量級排程器azkaban是很不錯的候選物件。
詳情如下:
- 功能
兩者均可以排程mapreduce,pig,java,指令碼工作流任務
兩者均可以定時執行工作流任務
- 工作流定義
Azkaban使用Properties檔案定義工作流
Oozie使用XML檔案定義工作流
- 工作流傳參
Azkaban支援直接傳參,例如${input}
Oozie支援引數和EL表示式,例如${fs:dirSize(myInputDir)}
- 定時執行
Azkaban的定時執行任務是基於時間的
Oozie的定時執行任務基於時間和輸入資料
- 資源管理
Azkaban有較嚴格的許可權控制,如使用者對工作流進行讀/寫/執行等操作
Oozie暫無嚴格的許可權控制
- 工作流執行
Azkaban有兩種執行模式,分別是solo server mode(executor server和web server部署在同一臺節點)和multi server mode(executor server和web server可以部署在不同節點)
Oozie作為工作流伺服器執行,支援多使用者和多工作流
- 工作流管理
Azkaban支援瀏覽器以及ajax方式操作工作流
Oozie支援命令列、HTTP REST、Java API、瀏覽器操作工作流
-
2.2 Azkaban介紹
Azkaban是由Linkedin開源的一個批量工作流任務排程器。用於在一個工作流內以一個特定的順序執行一組工作和流程。Azkaban定義了一種KV檔案格式來建立任務之間的依賴關係,並提供一個易於使用的web使用者介面維護和跟蹤你的工作流。
它有如下功能特點:
- Web使用者介面
- 方便上傳工作流
- 方便設定任務之間的關係
- 排程工作流
- 認證/授權(許可權的工作)
- 能夠殺死並重新啟動工作流
- 模組化和可插拔的外掛機制
- 專案工作區
- 工作流和任務的日誌記錄和審計
-
2. 3 Azkaban安裝部署
-
準備工作
Azkaban Web伺服器
azkaban-web-server-2.5.0.tar.gz
Azkaban執行伺服器
azkaban-executor-server-2.5.0.tar.gz
MySQL
目前azkaban只支援 mysql,需安裝mysql伺服器,本文件中預設已安裝好mysql伺服器,並建立了 root使用者,密碼 root.
下載地址:http://azkaban.github.io/downloads.html
-
安裝
將安裝檔案上傳到叢集,最好上傳到安裝 hive、sqoop的機器上,方便命令的執行
在當前使用者目錄下新建 azkabantools目錄,用於存放源安裝檔案.新建azkaban目錄,用於存放azkaban執行程式
-
azkaban web伺服器安裝
解壓azkaban-web-server-2.5.0.tar.gz
命令: tar –zxvf azkaban-web-server-2.5.0.tar.gz
將解壓後的azkaban-web-server-2.5.0 移動到 azkaban目錄中,並重新命名 webserver
命令: mv azkaban-web-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-web-server-2.5.0 server
-
azkaban 執行服器安裝
解壓azkaban-executor-server-2.5.0.tar.gz
命令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz
將解壓後的azkaban-executor-server-2.5.0 移動到 azkaban目錄中,並重新命名 executor
命令:mv azkaban-executor-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-executor-server-2.5.0 executor
azkaban指令碼匯入
解壓: azkaban-sql-script-2.5.0.tar.gz
命令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz
將解壓後的mysql 指令碼,匯入到mysql中:
進入mysql
mysql> create database azkaban;
mysql> use azkaban;
Database changed
mysql> source /home/hadoop/azkaban-2.5.0/create-all-sql-2.5.0.sql;
-
建立SSL配置
參考地址: http://docs.codehaus.org/display/JETTY/How+to+configure+SSL
命令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA
執行此命令後,會提示輸入當前生成 keystor的密碼及相應資訊,輸入的密碼請勞記,資訊如下:
輸入keystore密碼:
再次輸入新密碼:
您的名字與姓氏是什麼?
[Unknown]:
您的組織單位名稱是什麼?
[Unknown]:
您的組織名稱是什麼?
[Unknown]:
您所在的城市或區域名稱是什麼?
[Unknown]:
您所在的州或省份名稱是什麼?
[Unknown]:
該單位的兩字母國家程式碼是什麼
[Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN 正確嗎?
[否]: y
輸入<jetty>的主密碼
(如果和 keystore 密碼相同,按回車):
再次輸入新密碼:
完成上述工作後,將在當前目錄生成 keystore 證書檔案,將keystore 考貝到 azkaban web伺服器根目錄中.如:cp keystore azkaban/webserver
-
配置檔案
注:先配置好伺服器節點上的時區
- 先生成時區配置檔案Asia/Shanghai,用互動式命令 tzselect 即可
- 拷貝該時區檔案,覆蓋系統本地時區配置
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
azkaban web伺服器配置
進入azkaban web伺服器安裝目錄 conf目錄
修改azkaban.properties檔案
命令vi azkaban.properties
內容說明如下:
#Azkaban Personalization Settings azkaban.name=Test #伺服器UI名稱,用於伺服器上方顯示的名字 azkaban.label=My Local Azkaban #描述 azkaban.color=#FF3601 #UI顏色 azkaban.default.servlet.path=/index # web.resource.dir=web/ #預設根web目錄 default.timezone.id=Asia/Shanghai #預設時區,已改為亞洲/上海 預設為美國
#Azkaban UserManager class user.manager.class=azkaban.user.XmlUserManager #使用者許可權管理預設類 user.manager.xml.file=conf/azkaban-users.xml #使用者配置,具體配置參加下文
#Loader for projects executor.global.properties=conf/global.properties # global配置檔案所在位置 azkaban.project.dir=projects #
database.type=mysql #資料庫型別 mysql.port=3306 #埠號 mysql.host=hadoop03 #資料庫連線IP mysql.database=azkaban #資料庫例項名 mysql.user=root #資料庫使用者名稱 mysql.password=root #資料庫密碼 mysql.numconnections=100 #最大連線數
# Velocity dev mode velocity.dev.mode=false # Jetty伺服器屬性. jetty.maxThreads=25 #最大執行緒數 jetty.ssl.port=8443 #Jetty SSL埠 jetty.port=8081 #Jetty埠 jetty.keystore=keystore #SSL檔名 jetty.password=123456 #SSL檔案密碼 jetty.keypassword=123456 #Jetty主密碼 與 keystore檔案相同 jetty.truststore=keystore #SSL檔名 jetty.trustpassword=123456 # SSL檔案密碼
# 執行伺服器屬性 executor.port=12321 #執行伺服器埠
# 郵件設定 [email protected] #傳送郵箱 mail.host=smtp.163.com #傳送郵箱smtp地址 mail.user=xxxxxxxx #傳送郵件時顯示的名稱 mail.password=********** #郵箱密碼 [email protected] #任務失敗時傳送郵件的地址 [email protected] #任務成功時傳送郵件的地址 lockdown.create.projects=false # cache.directory=cache #快取目錄
|
azkaban 執行伺服器配置
進入執行伺服器安裝目錄conf,修改azkaban.properties
vi azkaban.properties
#Azkaban default.timezone.id=Asia/Shanghai #時區
# Azkaban JobTypes 外掛配置 azkaban.jobtype.plugin.dir=plugins/jobtypes #jobtype 外掛所在位置
#Loader for projects executor.global.properties=conf/global.properties azkaban.project.dir=projects
#資料庫設定 database.type=mysql #資料庫型別(目前只支援mysql) mysql.port=3306 #資料庫埠號 mysql.host=192.168.20.200 #資料庫IP地址 mysql.database=azkaban #資料庫例項名 mysql.user=azkaban #資料庫使用者名稱 mysql.password=oracle #資料庫密碼 mysql.numconnections=100 #最大連線數
# 執行伺服器配置 executor.maxThreads=50 #最大執行緒數 executor.port=12321 #埠號(如修改,請與web服務中一致) executor.flow.threads=30 #執行緒數 |
使用者配置
進入azkaban web伺服器conf目錄,修改azkaban-users.xml
vi azkaban-users.xml 增加 管理員使用者
<azkaban-users> <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" /> <user username="metrics" password="metrics" roles="metrics"/> <user username="admin" password="admin" roles="admin,metrics" /> <role name="admin" permissions="ADMIN" /> <role name="metrics" permissions="METRICS"/> </azkaban-users> |
-
啟動
web伺服器
在azkaban web伺服器目錄下執行啟動命令
bin/azkaban-web-start.sh
注:在web伺服器根目錄執行
執行伺服器
在執行伺服器目錄下執行啟動命令
bin/azkaban-executor-start.sh ./
注:只能要執行伺服器根目錄執行
啟動完成後,在瀏覽器(建議使用谷歌瀏覽器)中輸入https://伺服器IP地址:8443 ,即可訪問azkaban服務了.在登入中輸入剛才新的戶用名及密碼,點選 login.
-
2.4 Azkaban實戰
Azkaba內建的任務型別支援command、java
-
Command型別單一job示例
- 1.建立job描述檔案
vi command.job
#command.job type=command command=echo 'hello' |
- 2.將job資原始檔打包成zip檔案
zip command.job
- 3.通過azkaban的web管理平臺建立project並上傳job壓縮包
首先建立project
上傳zip包
- 4.啟動執行該job
-
Command型別多job工作流flow
- 1.建立有依賴關係的多個job描述
第一個job:foo.job
# foo.job type=command command=echo foo |
第二個job:bar.job依賴foo.job
# bar.job type=command dependencies=foo command=echo bar |
- 2.將所有job資原始檔打到一個zip包中
- 3.在azkaban的web管理介面建立工程並上傳zip包
- 4.啟動工作流flow
-
HDFS操作任務
- 1.建立job描述檔案
# fs.job type=command |