1. 程式人生 > >離線輔助系統

離線輔助系統

學習目標:

1、理解flume、sqoop、oozie的應用場景

2、理解flume、sqoop、oozie的基本原理

3、掌握flume、sqoop、oozie的使用方法

 

離線輔助系統

資料接入

Flume介紹

Flume元件

Flume實戰案例

任務排程

排程器基礎

市面上排程工具

Oozie的使用

Oozie的流程定義詳解

資料匯出

sqoop基礎知識

sqoop實戰及原理

Sqoop資料匯入實戰

Sqoop資料匯出實戰

Sqoop作業操作

Sqoop的原理

 

 

  • 前言

在一個完整的大資料處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心之外,還需要資料採集、結果資料匯出、任務排程等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架,如圖所示:

 

 

 

 

 

  • 一、日誌採集框架Flume

  • 1.1 Flume介紹

  • 1.1.1 概述

  1. Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。
  2. Flume可以採集檔案,socket資料包等各種形式源資料,又可以將採集到的資料輸出到HDFS、hbase、hive、kafka等眾多外部儲存系統中
  3. 一般的採集需求,通過對flume的簡單配置即可實現
  4. Flume針對特殊場景也具備良好的自定義擴充套件能力,因此,flume可以適用於大部分的日常資料採集場景

 

  • 1.1.2 執行機制

  1. Flume分散式系統中最核心的角色是agent,flume採集系統就是由一個個agent所連線起來形成
  2. 每一個agent相當於一個資料傳遞員[M1] ,內部有三個元件:
      1. Source:採集源,用於跟資料來源對接,以獲取資料
      2. Sink:下沉地,採集資料的傳送目的,用於往下一級agent傳遞資料或者往最終儲存系統傳遞資料
      3. Channel:angent內部的資料傳輸通道,用於從source將資料傳遞到sink

 

 

 

  • 1.1.3 Flume採集系統結構圖

 

1. 簡單結構

單個agent採集資料

2. 複雜結構

多級agent之間串聯

 

 

 

 

  • 1.2 Flume實戰案例

  • 1.2.1 Flume的安裝部署
  1. Flume的安裝非常簡單,只需要解壓即可,當然,前提是已有hadoop環境

上傳安裝包到資料來源所在節點上

然後解壓  tar -zxvf apache-flume-1.6.0-bin.tar.gz

然後進入flume的目錄,修改conf下的flume-env.sh,在裡面配置JAVA_HOME

 

2、根據資料採集的需求配置採集方案,描述在配置檔案中(檔名可任意自定義)

3、指定採集方案配置檔案,在相應的節點上啟動flume agent

 

先用一個最簡單的例子來測試一下程式環境是否正常

  1. 先在flume的conf目錄下新建一個檔案

vi   netcat-logger.conf

# 定義這個agent中各元件的名字

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# 描述和配置source元件:r1

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# 描述和配置sink元件:k1

a1.sinks.k1.type = logger

 

# 描述和配置channel元件,此處使用是記憶體快取的方式

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# 描述和配置source  channel   sink之間的連線關係

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

2.啟動agent去採集資料

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

-c conf   指定flume自身的配置檔案所在目錄

-f conf/netcat-logger.con  指定我們所描述的採集方案

-n a1  指定我們這個agent的名字

3.測試

先要往agent採集監聽的埠上傳送資料,讓agent有資料可採

隨便在一個能跟agent節點聯網的機器上

telnet anget-hostname  port   (telnet localhost 44444)

 

 

 

  • 1.2.2 採集案例
  • a、採集目錄到HDFS

採集需求:某伺服器的某特定目錄下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到HDFS中去

根據需求,首先定義以下3大要素

  1. 採集源,即source——監控檔案目錄 :  spooldir
  2. 下沉目標,即sink——HDFS檔案系統  :  hdfs sink
  3. source和sink之間的傳遞通道——channel,可用file channel 也可以用記憶體channel

 

配置檔案編寫:

#定義三大元件的名稱

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

 

# 配置source元件

agent1.sources.source1.type = spooldir

agent1.sources.source1.spoolDir = /home/hadoop/logs/

agent1.sources.source1.fileHeader = false

 

#配置攔截器

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = host

agent1.sources.source1.interceptors.i1.hostHeader = hostname

 

# 配置sink元件

agent1.sinks.sink1.type = hdfs

agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

agent1.sinks.sink1.hdfs.rollSize = 102400

agent1.sinks.sink1.hdfs.rollCount = 1000000

agent1.sinks.sink1.hdfs.rollInterval = 60

#agent1.sinks.sink1.hdfs.round = true

#agent1.sinks.sink1.hdfs.roundValue = 10

#agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.keep-alive = 120

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

 

Channel引數解釋:

capacity:預設該通道中最大的可以儲存的event數量

trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數量

keep-alive:event新增到通道中或者移出的允許時間

 

  • b、採集檔案到HDFS

採集需求:比如業務系統使用log4j生成的日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到hdfs

 

根據需求,首先定義以下3大要素

  1. 採集源,即source——監控檔案內容更新 :  exec  ‘tail -F file’
  2. 下沉目標,即sink——HDFS檔案系統  :  hdfs sink
  3. Source和sink之間的傳遞通道——channel,可用file channel 也可以用 記憶體channel

 

配置檔案編寫:

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

 

# Describe/configure tail -F source1

agent1.sources.source1.type = exec

agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log

agent1.sources.source1.channels = channel1

 

#configure host for source

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = host

agent1.sources.source1.interceptors.i1.hostHeader = hostname

 

# Describe sink1

agent1.sinks.sink1.type = hdfs

#a1.sinks.k1.channel = c1

agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

agent1.sinks.sink1.hdfs.rollSize = 102400

agent1.sinks.sink1.hdfs.rollCount = 1000000

agent1.sinks.sink1.hdfs.rollInterval = 60

agent1.sinks.sink1.hdfs.round = true

agent1.sinks.sink1.hdfs.roundValue = 10

agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.keep-alive = 120

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

 

 

 

  • 1.3 更多source和sink元件

Flume支援眾多的source和sink型別,詳細手冊可參考官方文件

http://flume.apache.org/FlumeUserGuide.html

  • 二、 工作流排程器azkaban

  • 2.1 概述

  • 2.1.1為什麼需要工作流排程系統

  • 一個完整的資料分析系統通常都是由大量任務單元組成:

shell指令碼程式,java程式,mapreduce程式、hive指令碼等

  • 各任務單元之間存在時間先後及前後依賴關係
  • 為了很好地組織起這樣的複雜執行計劃,需要一個工作流排程系統來排程執行;

 

例如,我們可能有這樣一個需求,某個業務系統每天產生20G原始資料,我們每天都要對其進行處理,處理步驟如下所示:

  1. 通過Hadoop先將原始資料同步到HDFS上;
  2. 藉助MapReduce計算框架對原始資料進行轉換,生成的資料以分割槽表的形式儲存到多張Hive表中;
  3. 需要對Hive中多個表的資料進行JOIN處理,得到一個明細資料Hive大表;
  4. 將明細資料進行復雜的統計分析,得到結果報表資訊;
  5. 需要將統計分析得到的結果資料同步到業務系統中,供業務呼叫使用。

 

 

  • 2.1.2 工作流排程實現方式

簡單的任務排程:直接使用linux的crontab來定義;

複雜的任務排程:開發排程平臺

或使用現成的開源排程系統,比如ooize、azkaban等

 

 

  • 2.1.3 常見工作流排程系統

市面上目前有許多工作流排程器

在hadoop領域,常見的工作流排程器有Oozie, Azkaban,Cascading,Hamake等

 

 

 

 

  • 2.1.4 各種排程工具特性對比

下面的表格對上述四種hadoop工作流排程器的關鍵特性進行了比較,儘管這些工作流排程器能夠解決的需求場景基本一致,但在設計理念,目標使用者,應用場景等方面還是存在顯著的區別,在做技術選型的時候,可以提供參考

特性

Hamake

Oozie

Azkaban

Cascading

工作流描述語言

XML

XML (xPDL based)

text file with key/value pairs

Java API

依賴機制

data-driven

explicit

explicit

explicit

是否要web容器

No

Yes

Yes

No

進度跟蹤

console/log messages

web page

web page

Java API

Hadoop job排程支援

no

yes

yes

yes

執行模式

command line utility

daemon

daemon

API

Pig支援

yes

yes

yes

yes

事件通知

no

no

no

yes

需要安裝

no

yes

yes

no

支援的hadoop版本

0.18+

0.20+

currently unknown

0.18+

重試支援

no

workflownode evel

yes

yes

執行任意命令

yes

yes

yes

yes

Amazon EMR支援

yes

no

currently unknown

yes

  • 2.1.5 Azkaban與Oozie對比

對市面上最流行的兩種排程器,給出以下詳細對比,以供技術選型參考。總體來說,ooize相比azkaban是一個重量級的任務排程系統,功能全面,但配置使用也更復雜。如果可以不在意某些功能的缺失,輕量級排程器azkaban是很不錯的候選物件。

詳情如下:

  • 功能

兩者均可以排程mapreduce,pig,java,指令碼工作流任務

兩者均可以定時執行工作流任務

 

  • 工作流定義

Azkaban使用Properties檔案定義工作流

Oozie使用XML檔案定義工作流

 

  • 工作流傳參

Azkaban支援直接傳參,例如${input}

Oozie支援引數和EL表示式,例如${fs:dirSize(myInputDir)}

 

  • 定時執行

Azkaban的定時執行任務是基於時間的

Oozie的定時執行任務基於時間和輸入資料

 

  • 資源管理

Azkaban有較嚴格的許可權控制,如使用者對工作流進行讀/寫/執行等操作

Oozie暫無嚴格的許可權控制

 

  • 工作流執行

Azkaban有兩種執行模式,分別是solo server mode(executor server和web server部署在同一臺節點)和multi server mode(executor server和web server可以部署在不同節點)

Oozie作為工作流伺服器執行,支援多使用者和多工作流

 

  • 工作流管理

Azkaban支援瀏覽器以及ajax方式操作工作流

Oozie支援命令列、HTTP REST、Java API、瀏覽器操作工作流

 

 

  • 2.2 Azkaban介紹

Azkaban是由Linkedin開源的一個批量工作流任務排程器。用於在一個工作流內以一個特定的順序執行一組工作和流程。Azkaban定義了一種KV檔案格式來建立任務之間的依賴關係,並提供一個易於使用的web使用者介面維護和跟蹤你的工作流。

它有如下功能特點:

  • Web使用者介面
  • 方便上傳工作流
  • 方便設定任務之間的關係
  • 排程工作流
  • 認證/授權(許可權的工作)
  • 能夠殺死並重新啟動工作流
  • 模組化和可插拔的外掛機制
  • 專案工作區
  • 工作流和任務的日誌記錄和審計

 

 

 

  • 2. 3 Azkaban安裝部署

  • 準備工作

Azkaban Web伺服器

azkaban-web-server-2.5.0.tar.gz

Azkaban執行伺服器 

azkaban-executor-server-2.5.0.tar.gz

 

MySQL

目前azkaban只支援 mysql,需安裝mysql伺服器,本文件中預設已安裝好mysql伺服器,並建立了 root使用者,密碼 root.

 

 

下載地址:http://azkaban.github.io/downloads.html

 

 

 

  • 安裝

將安裝檔案上傳到叢集,最好上傳到安裝 hive、sqoop的機器上,方便命令的執行

在當前使用者目錄下新建 azkabantools目錄,用於存放源安裝檔案.新建azkaban目錄,用於存放azkaban執行程式

  • azkaban web伺服器安裝

解壓azkaban-web-server-2.5.0.tar.gz

命令: tar –zxvf azkaban-web-server-2.5.0.tar.gz

將解壓後的azkaban-web-server-2.5.0 移動到 azkaban目錄中,並重新命名 webserver

命令: mv azkaban-web-server-2.5.0 ../azkaban

        cd ../azkaban

        mv azkaban-web-server-2.5.0  server

 

  • azkaban 執行服器安裝

解壓azkaban-executor-server-2.5.0.tar.gz

命令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz

將解壓後的azkaban-executor-server-2.5.0 移動到 azkaban目錄中,並重新命名 executor

命令:mv azkaban-executor-server-2.5.0  ../azkaban

cd ../azkaban

mv azkaban-executor-server-2.5.0  executor

 

azkaban指令碼匯入

解壓: azkaban-sql-script-2.5.0.tar.gz

命令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz

將解壓後的mysql 指令碼,匯入到mysql中:

進入mysql

mysql> create database azkaban;

mysql> use azkaban;

Database changed

mysql> source /home/hadoop/azkaban-2.5.0/create-all-sql-2.5.0.sql;

 

 

 

 

 

 

  • 建立SSL配置

參考地址: http://docs.codehaus.org/display/JETTY/How+to+configure+SSL

命令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA

執行此命令後,會提示輸入當前生成 keystor的密碼及相應資訊,輸入的密碼請勞記,資訊如下:

 

輸入keystore密碼: 

再次輸入新密碼:

您的名字與姓氏是什麼?

  [Unknown]: 

您的組織單位名稱是什麼?

  [Unknown]: 

您的組織名稱是什麼?

  [Unknown]: 

您所在的城市或區域名稱是什麼?

  [Unknown]: 

您所在的州或省份名稱是什麼?

  [Unknown]: 

該單位的兩字母國家程式碼是什麼

  [Unknown]:  CN

CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN 正確嗎?

  [否]:  y

 

輸入<jetty>的主密碼

        (如果和 keystore 密碼相同,按回車): 

再次輸入新密碼:

完成上述工作後,將在當前目錄生成 keystore 證書檔案,將keystore 考貝到 azkaban web伺服器根目錄中.如:cp keystore azkaban/webserver

 

  • 配置檔案

注:先配置好伺服器節點上的時區

  1. 先生成時區配置檔案Asia/Shanghai,用互動式命令 tzselect 即可
  2. 拷貝該時區檔案,覆蓋系統本地時區配置

cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime 

 

 

azkaban web伺服器配置

進入azkaban web伺服器安裝目錄 conf目錄

 

修改azkaban.properties檔案

命令vi azkaban.properties

內容說明如下:

#Azkaban Personalization Settings

azkaban.name=Test                           #伺服器UI名稱,用於伺服器上方顯示的名字

azkaban.label=My Local Azkaban                               #描述

azkaban.color=#FF3601                                                 #UI顏色

azkaban.default.servlet.path=/index                         #

web.resource.dir=web/                                                 #預設根web目錄

default.timezone.id=Asia/Shanghai                           #預設時區,已改為亞洲/上海 預設為美國

 

#Azkaban UserManager class

user.manager.class=azkaban.user.XmlUserManager   #使用者許可權管理預設類

user.manager.xml.file=conf/azkaban-users.xml              #使用者配置,具體配置參加下文

 

#Loader for projects

executor.global.properties=conf/global.properties    # global配置檔案所在位置

azkaban.project.dir=projects                                                #

 

database.type=mysql                                                              #資料庫型別

mysql.port=3306                                                                       #埠號

mysql.host=hadoop03                                                      #資料庫連線IP

mysql.database=azkaban                                                       #資料庫例項名

mysql.user=root                                                                 #資料庫使用者名稱

mysql.password=root                                                          #資料庫密碼

mysql.numconnections=100                                                  #最大連線數

 

# Velocity dev mode

velocity.dev.mode=false

# Jetty伺服器屬性.

jetty.maxThreads=25                                                               #最大執行緒數

jetty.ssl.port=8443                                                                   #Jetty SSL埠

jetty.port=8081                                                                         #Jetty埠

jetty.keystore=keystore                                                          #SSL檔名

jetty.password=123456                                                             #SSL檔案密碼

jetty.keypassword=123456                                                      #Jetty主密碼 與 keystore檔案相同

jetty.truststore=keystore                                                                #SSL檔名

jetty.trustpassword=123456                                                   # SSL檔案密碼

 

# 執行伺服器屬性

executor.port=12321                                                               #執行伺服器埠

 

# 郵件設定

[email protected]                                       #傳送郵箱

mail.host=smtp.163.com                                                       #傳送郵箱smtp地址

mail.user=xxxxxxxx                                       #傳送郵件時顯示的名稱

mail.password=**********                                                 #郵箱密碼

[email protected]                              #任務失敗時傳送郵件的地址

[email protected]                            #任務成功時傳送郵件的地址

lockdown.create.projects=false                                           #

cache.directory=cache                                                            #快取目錄

 

 

azkaban 執行伺服器配置

進入執行伺服器安裝目錄conf,修改azkaban.properties

vi azkaban.properties

#Azkaban

default.timezone.id=Asia/Shanghai                                              #時區

 

# Azkaban JobTypes 外掛配置

azkaban.jobtype.plugin.dir=plugins/jobtypes                   #jobtype 外掛所在位置

 

#Loader for projects

executor.global.properties=conf/global.properties

azkaban.project.dir=projects

 

#資料庫設定

database.type=mysql                                                                       #資料庫型別(目前只支援mysql)

mysql.port=3306                                                                                #資料庫埠號

mysql.host=192.168.20.200                                                           #資料庫IP地址

mysql.database=azkaban                                                                #資料庫例項名

mysql.user=azkaban                                                                         #資料庫使用者名稱

mysql.password=oracle                                                                   #資料庫密碼

mysql.numconnections=100                                                           #最大連線數

 

# 執行伺服器配置

executor.maxThreads=50                                                                #最大執行緒數

executor.port=12321                                                               #埠號(如修改,請與web服務中一致)

executor.flow.threads=30                                                                #執行緒數

 

 

使用者配置

進入azkaban web伺服器conf目錄,修改azkaban-users.xml

vi azkaban-users.xml 增加 管理員使用者

<azkaban-users>

        <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />

        <user username="metrics" password="metrics" roles="metrics"/>

        <user username="admin" password="admin" roles="admin,metrics" />

        <role name="admin" permissions="ADMIN" />

        <role name="metrics" permissions="METRICS"/>

</azkaban-users>

 

 

  • 啟動

web伺服器

在azkaban web伺服器目錄下執行啟動命令

bin/azkaban-web-start.sh

注:在web伺服器根目錄執行

 

執行伺服器

在執行伺服器目錄下執行啟動命令

bin/azkaban-executor-start.sh ./

注:只能要執行伺服器根目錄執行

 

啟動完成後,在瀏覽器(建議使用谷歌瀏覽器)中輸入https://伺服器IP地址:8443 ,即可訪問azkaban服務了.在登入中輸入剛才新的戶用名及密碼,點選 login.

 

  • 2.4 Azkaban實戰

Azkaba內建的任務型別支援command、java

 

  • Command型別單一job示例

  • 1.建立job描述檔案

vi command.job

#command.job

type=command                                                   

command=echo 'hello'

 

 

  • 2.將job資原始檔打包成zip檔案

zip command.job

 

  • 3.通過azkaban的web管理平臺建立project並上傳job壓縮包

首先建立project

上傳zip包

  • 4.啟動執行該job

 

  • Command型別多job工作流flow

  • 1.建立有依賴關係的多個job描述

第一個job:foo.job

# foo.job

type=command

command=echo foo

第二個job:bar.job依賴foo.job

# bar.job

type=command

dependencies=foo

command=echo bar

 

  • 2.將所有job資原始檔打到一個zip包中

 

  • 3.在azkaban的web管理介面建立工程並上傳zip包
  • 4.啟動工作流flow

 

  • HDFS操作任務

  • 1.建立job描述檔案

# fs.job

type=command