1. 程式人生 > >Hadoop 的 Oozie 工作流管理引擎的實際應用(一)

Hadoop 的 Oozie 工作流管理引擎的實際應用(一)

在 IBM Bluemix 雲平臺上開發並部署您的下一個應用。

簡介

Apache Oozie 是用於 Hadoop 平臺的一種工作流排程引擎。該框架(如圖 1 所示)使用 Oozie 協調器促進了相互依賴的重複工作之間的協調,您可以使用預定的時間或資料可用性來觸發 Apache Oozie。您可以使用 Oozie bundle 系統提交或維護一組協調應用程式。作為本練習的一部分,Oozie 運行了一個 Apache Sqoop 作業,以便在 MySQL 資料庫中的資料上執行匯入操作,並將資料傳輸到 Hadoop 分散式檔案系統 (HDFS) 中。可以利用匯入的資料集執行 Sqoop 合併操作,從而更新較舊的資料集。通過利用 UNIX shell 操作,可從 MySQL 資料庫中提取用來執行 Sqoop 作業的元資料。同理,可執行 Java 操作來更新 Sqoop 作業所需的 MySQL 資料庫中的元資料。

InfoSphere BigInsights Quick Start Edition

InfoSphere BigInsights Quick Start Edition 是一個免費的、可下載的 InfoSphere BigInsights 版本,是 IBM 基於 Hadoop 的產品。使用 Quick Start Edition,您可以嘗試使用 IBM 開發的特性來提高開源 Hadoop 的價值,這些特性包括 Big SQL、文字分析和 BigSheets。為了讓您的體驗儘可能順利,我們提供了引導式學習,包括一些按部就班的、自定進度的教程和視訊,它們可以幫助您開始讓 Hadoop 為您工作。沒有時間或資料的限制,您可以自行安排時間在大量資料上進行試驗。

觀看視訊遵循這些教程 (PDF) 並 立刻下載 BigInsights Quick Start Edition

圖 1. Oozie 編排架構
MySQL 負責輸入,Oozie 負責處理,HDFS 負責獲取結果

需要安裝的軟體

要想充分利用本文的示例,訪問以下軟體可能對您有所幫助:

該叢集是一個分散式叢集,在 1 個主名稱節點、2 個核心節點和 8 個任務節點中執行。

Oozie 工作流

Oozie 工作流是控制依賴有向非迴圈圖 (DAG) 中安排的 Oozie 操作的集合。控制依賴(Control dependency)可確保以下操作在前面的操作已成功完成後才會啟動。本文首先會簡要概述工作流控制節點,然後重點介紹以下工作流操作節點:

工作流控制節點

啟動控制節點(如清單 1 所示)是工作流作業的入口點。在工作流啟動時,它會自動過渡到啟動過程中指定的節點。

清單 1. 啟動控制節點
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <start to="timeCheck"/>
</workflow-app>

末端控制節點(如清單 2 所示)是結束工作流作業時所用的節點。它表示工作流操作已經成功完成。一個工作流定義必須有一個末端節點。

清單 2. 末端控制節點
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <end name="end"/>
</workflow-app>

停止控制節點(如清單 3 所示)可使工作流作業自行停止。在到達停止節點(kill node)時,如果工作流作業啟動的一個或更多操作正在執行,那麼當前執行的所有操作都將停止。工作流定義可以包含零個或更多停止節點。

清單 3. 停止控制節點
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
   </kill>
</workflow-app>

決策控制節點(如清單 4 所示)可使工作流確定要採用的執行路徑。決策節點的工作原理類似於擁有一組謂詞轉換對(predicates-transition pair)和一個預設轉換的 switch-case 塊。謂詞是按順序進行評估的,直至其中一個評估為 ture 為止,同時還會進行相應的轉換。如果沒有一個謂詞被評估為 true,則會採用預設轉換。

清單 4. 決策控制節點
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <decision name="master-decision">
       <switch>
         <case to="sqoopMerge1">
                 ${wf:actionData('hiveSwitch')['paramNum'] eq 1}
         </case>
         <default to="sqoopMerge2"/>
       </switch>
   </decision>
</workflow-app>

分支節點 將一個執行路徑分為多個併發路徑。聯接節點 一直等待,直到前面的分支節點的所有併發執行路徑都到達聯接節點為止。您必須成對使用分叉節點和聯接節點,如清單 5 所示。

清單 5. 分支-聯接控制節點
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <fork name="forking">
        <path start="sqoopMerge1"/>
        <path start="sqoopMerge2"/>
    </fork>
    <join name="joining" to="hiveSwitch"/>
</workflow-app>

Oozie shell 操作

您可以將 Oozie shell 操作作為工作流的一部分進行配置,從而執行某個檔案中的一組 shell 指令碼。您可以利用包含必要引數的 job-trackername-node 和 exec 元素來配置 Oozie shell 操作,從而執行相關的任務,如清單 6 所示。您可以將具有配置引數的一個配置 shell 操作來建立或刪除 HDFS 上的檔案和目錄,然後啟動 shell 作業。您可以使用與配置元素內聯的 job-xml 元素,利用配置引數將一個 XML 檔案傳遞給 shell 作業。您可以配置其他檔案或歸檔檔案,讓它們可用於 shell 作業。在 shell 作業結束後,您可以讓 shell 作業的輸出可供workflow 作業使用,但它需要滿足以下條件:

  • 輸出的格式必須是一個有效的 Java 屬性檔案。
  • 輸出的大小必須小於 2KB。
清單 6. Shell 指令碼
host="XXX.XX.XX.XXX"
port="3306"
username="root"
password=""
database="zzz"
tableName="$1"

####################################
echo "Host: $host"
echo "Database: $database"
echo "Table: $tableName"
####################################

sqoopLstUpd=`mysql --host=$host --port=$port --user=$username --password=$password 
-N -e 'SELECT PARM_DATE_VAL from T_CONTROL_PARM where PARM_NM="SQOOP_INCR_LST_UPD"
 and PARM_GROUP_NM="'$tableName'"' $database`

echo "sqoopLstUpd=$sqoopLstUpd"
echo "tableName=$tableName"

清單 7 展示了 workflow.xml 檔案中的 shell 操作配置。

清單 7. Oozie shell 操作
<action name="timeCheck">
    <shell xmlns="uri:oozie:shell-action:0.1">
       <job-tracker>${jobTracker}</job-tracker>
       <name-node>${nameNode}</name-node>
       <configuration>
           <property>
               <name>mapred.job.queue.name</name>
               <value>${queueName}</value>
           </property>
       </configuration>
       <exec>${sqoopUpdTrack}</exec>
       <argument>${tableName}</argument>
       <file>${sqoopUpdTrackPath}#${sqoopUpdTrack}</file>
       <capture-output/>
    </shell>
    <ok to="sqoopIncrImport"/>
    <error to="fail"/>
</action>

要想訪問 shell 輸出,可以使用清單 8 中所示的 Sqoop 增量作業。

清單 8. 用來實現增量匯入的 Oozie Sqoop 操作
<action name="sqoopIncrImport">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
           <job-tracker>${jobTracker}</job-tracker>
           <name-node>${nameNode}</name-node>
           <prepare>
               <delete path="${s3BucketLoc}/${tableName}/incr"/>
               <mkdir path="${s3BucketLoc}/${tableName}"/>
           </prepare>
           <configuration>
               <property>
                   <name>mapred.job.queue.name</name>
                   <value>${queueName}</value>
               </property>
           </configuration>
           <arg>import</arg>
           <arg>--connect</arg>
           <arg>${dbURL}</arg>
           <arg>--driver</arg>
           <arg>${mySqlDriver}</arg>
           <arg>--username</arg>
           <arg>${user}</arg>
           <arg>--table</arg>
           <arg>${wf:actionData('timeCheck')['tableName']}</arg>
           <arg>--target-dir</arg>
           <arg>${s3BucketLoc}/${tableName}/incr</arg>
           <arg>--check-column</arg>
           <arg>LAST_UPD</arg>
           <arg>--incremental</arg>
           <arg>lastmodified</arg>
           <arg>--last-value</arg>
           <arg>${wf:actionData('timeCheck')['sqoopLstUpd']}</arg>
           <arg>--m</arg>
           <arg>1</arg>
       </sqoop>
       <ok to="sqoopMetaUpdate"/>
       <error to="fail"/>
   </action>

Oozie Java 操作

Java 操作運行了指定的主要 Java 類的 public static void main (String [] args) 方法。Java 應用程式作為具有單個 mapper 任務的 MapReduce 作業執行在 Hadoop 叢集上。工作流作業一直要等到 Java 操作結束執行之後才能繼續執行下一個操作。Java 操作可使用 job-trackername-node、Java 主類、JVM 選項和輸入引數進行配置,如清單 9 所示。您可以使用 Expression Language (EL) 表示式將引數分配給內聯屬性值。您必須以 Java 屬性檔案的格式寫入所有輸出引數。

您可以配置 Java 操作來清理 HDFS 檔案和目錄,或者建立 Apache HCatalog 分割槽,然後再啟動 Java 應用程式。這使得 Oozie 能夠在出現暫時性或非暫時性故障時重試 Java 操作。

清單 9. Oozie Java 操作
<action name="sqoopMetaUpdate">
          <java>
               <job-tracker>${jobTracker}</job-tracker>
               <name-node>${nameNode}</name-node>
               <configuration>
                   <property>
                      <name>mapred.job.queue.name</name>
                      <value>${queueName}</value>
                   </property>
               </configuration>
               <main-class>SqoopMetaUtil</main-class>
               <java-opts></java-opts>
               <arg>${tableName}</arg>
               <archive>${mySqlDriverPath}</archive>
          </java>
          <ok to="hiveSwitch"/>
          <error to="fail"/>
</action>

您可以通過使用 capture-output 來配置 Java 操作,從而將值傳遞給下一操作。您可以通過使用 Hadoop EL 函式來訪問這些值。您能夠以 Java 屬性檔案的格式在 Java 類中寫入值,如清單 10 所示。

清單 10. 用於實現值傳遞的 Java 程式碼片段
String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
       OutputStream os = null;
       if(oozieProp != null){
          File propFile = new File(oozieProp);
          Properties p = new Properties();
          p.setProperty("name", "Autodesk");
          p.setProperty("address", "Sun Rafael");
          try {
               os = new FileOutputStream(propFile);
               p.store(os, "");
          } catch (FileNotFoundException e) {
               System.err.println("<<< FileNotFoundException >>>"+e.getMessage());
          } catch (IOException e) {
               System.err.println("<<< IOException >>>"+e.getMessage());
          }
          finally{
               if(os != null)
               try {
                    os.close();
               } catch (IOException e) {
                    System.err.println("<<< IOException >>>"+e.getMessage());
               }
          }
       }
       else{
            throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
                    + " System property not defined");
    }

您可以在 workflow.xml 檔案中配置操作來訪問屬性檔案中的相應值設定,如清單 11 所示。

清單 11. 用於實現值傳遞的 Oozie Java 操作
<action name="jProperties">
      <java>
           <job-tracker>${jobTracker}</job-tracker>
           <name-node>${nameNode}</name-node>
           <configuration>
               <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
               </property>
           </configuration>
           <main-class>PropertyExplorer</main-class>
           <java-opts></java-opts>
           <capture-output/>
      </java>
      <ok to="email"/>
      <error to="fail"/>
   </action>

   <action name="email">
         <email xmlns="uri:oozie:email-action:0.1">
            <to>[email protected]</to>
            <subject>Oozie workflow finished successfully!</subject>
            <body>${wf:actionData('jProperties')['name']} | 
            ${wf:actionData('jProperties')['address']}</body>
         </email>
         <ok to="end"/>
         <error to="fail"/>
   </action>

Oozie Sqoop 操作

Oozie 工作流觸發了一個 Sqoop 指令碼,該指令碼在 Hadoop 叢集上啟動了一個 Sqoop 作業。Sqoop 作業通過在 Hadoop 叢集上啟動 MapReduce 作業來完成任務。Sqoop 指令碼啟動的 MapReduce 作業會將資料從 RDBMS 傳輸到 HDFS。您可以配置一個 Sqoop 操作(如清單 12 所示)來刪除 HDFS 上的檔案和目錄,然後再啟動 Sqoop 作業。與其他 Oozie 操作類似,您可以通過使用 job-xml 元素,利用其他的屬性來配置 Sqoop 操作。configuration 元素中指定的屬性值將會覆蓋 job-xml 元素中指定的屬性。可以將其他檔案和歸檔檔案提供給 Sqoop 作業。

清單 12. 用於合併的 Oozie Sqoop 操作
<action name="sqoopMerge1">
         <sqoop xmlns="uri:oozie:sqoop-action:0.2">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
             <prepare>
                 <delete path="${s3BucketLoc}/${tableName}/master1"/>
                 <mkdir path="${s3BucketLoc}/${tableName}"/>
             </prepare>
             <configuration>
                 <property>
                     <name>mapred.job.queue.name</name>
                     <value>${queueName}</value>
                 </property>
             </configuration>
             <arg>merge</arg>
             <arg>--new-data</arg>
             <arg>${s3incr}</arg>
             <arg>--onto</arg>
             <arg>${s3BucketLoc}/${tableName}/master2</arg>
             <arg>--target-dir</arg>
             <arg>${s3BucketLoc}/${tableName}/master1</arg>
             <arg>--jar-file</arg>
             <arg>${tableJarLoc}/${tableName}.jar</arg>
             <arg>--class-name</arg>
             <arg>${tableName}</arg>
             <arg>--merge-key</arg>
             <arg>ROW_ID</arg>
         </sqoop>
         <ok to="hive-master1"/>
         <error to="fail"/>
   </action>

Oozie Hive 操作

您可以配置 Hive 操作(如清單 13 所示)來執行 HDFS 上的檔案和目錄中的任何 Hive 指令碼。該操作啟動了一個 MapReduce 作業來完成這些任務。您需要在 Oozie 上配置 Hive 操作,使用 Hive 配置檔案 hive-default.xml 或 hive-site.xml 作為 job-xml 元素。對於支援 Hive 訪問 Oozie 環境而言,這一點是必需的。您可以配置 Hive 操作來建立或刪除 HDFS 檔案和目錄,然後再啟動 Hive 作業。configuration 元素中指定的屬性值將會覆蓋 job-xml 檔案中指定的值。您可以新增其他的檔案和歸檔檔案,讓它們可用於 Hive 作業。Oozie 執行了由指令碼元素中的路徑指定的 Hive 指令碼。您可以通過 Oozie 工作流,將引數作為輸入引數分配給 Hive 指令碼。

清單 13. Oozie Hive 操作
<action name="hiveSwitch">
     <shell xmlns="uri:oozie:shell-action:0.1">
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
          <configuration>
          <property>
              <name>mapred.job.queue.name</name>
              <value>${queueName}</value>
          </property>
          </configuration>
          <exec>${hiveSwitchScript}</exec>
          <argument>${tableName}</argument>
          <file>${hiveSwitchScriptPath}#${hiveSwitchScript}</file>
       <capture-output/>
       </shell>
       <ok to="master-decision"/>
       <error to="fail"/>
   </action>

Oozie 電子郵件操作

Oozie 電子郵件操作(如清單 14 所示)可以從工作流應用程式傳送電子郵件。除了主題和訊息正文之外,電子郵件操作還必須擁有 to 和 cc(可選)地址。您可以使用逗號分割的電子郵件地址向多個收件人傳送電子郵件。電子郵件操作可同步執行,而工作流作業一直要等到傳送電子郵件之後才會觸發下一個操作。您可以使用 Hadoop EL 表示式將引數分配給電子郵件操作。

清單 14. Oozie 電子郵件操作
<action name="email">
  	<email xmlns="uri:oozie:email-action:0.1">
            <to>[email protected]</to>
            <subject>Oozie workflow finished successfully!</subject>
            <body>${wf:actionData('jProperties')['name']} | 
            ${wf:actionData('jProperties')['address']}</body>
        </email>
        <ok to="end"/>
        <error to="fail"/>
   </action>

結束語

當多個相互依賴的作業與資料流捆綁在一起的時候,Oozie 工作流就會變成一個數據管道應用程式。Apache Oozie 工作流促進了資料邏輯流程的設計、錯誤處理、故障轉移機制等。您可以配置 Oozie 協調器或捆綁的應用程式來有效地管理工作流,不過,關於這些主題的討論已超出了本文的討論範圍。一些等效的 Hadoop 工作流引擎包括 Amazon Data Pipeline、Simple Workflow Engine、Azkaban、Cascading 和 Hamake。雖然 Hamake 和 Oozie 是基於 XML 的配置,但 Azkaban 是使用包含鍵值對的文字檔案進行配置的,而 Cascading 是使用 Java API 進行配置的。

http://www.ibm.com/developerworks/cn/data/library/bd-hadoopoozie/

相關推薦

HadoopOozie 工作管理引擎實際應用

在 IBM Bluemix 雲平臺上開發並部署您的下一個應用。 簡介 Apache Oozie 是用於 Hadoop 平臺的一種工作流排程引擎。該框架(如圖 1 所示)使用 Oozie 協調器促進了相互依賴的重複工作之間的協調,您可以使用預定的時間或資料可用性來觸發 Apache Oozie。您

工作Activiti5.13學習筆記

engines github上 mysql .get 字母 ash 官網 fat 發布 了解工作流 1、工作流(Workflow),就是“業務過程的部分或整體在計算機應用環境下的自動化”,它主要解決的是“使在多個參與者之間按照某種預定義

javascript常用設計模式介紹,實現及實際應用

javascript設計模式介紹,實現及實際應用(一) 本文將介紹javascript中常用的設計模式原理和實現,並結合例項講解其應用。 本篇文章先介紹單例模式,策略模式,代理模式,釋出訂閱模式和命令模式,其它幾種模式後續文章將繼續介紹。 1、單例模式 單例模式

python的簡單實際應用

初步需求:將一整個資料夾各個子目錄中所有目標檔案的所有對應元素的相同屬性名稱的不同欄位全部取出寫到excel表格以供下一步使用。 整體思路:(1)遞迴出所有目標檔案的地址                  (2)根據地址取出目標檔案的內容                   (3)匹配取出相同屬性的各個欄位 

oozie 工作排程引擎總結()

oozie是服務於hadoop生態系統的工作流排程工具,job執行平臺是區別於其他排程工具的最大的不同。但其實現的思路跟一般排程工具幾乎完全相同。 首先是作為排程系統兩大核心:依賴和觸發。依賴可以是條件依賴,比如,資源依賴,依賴於某些資料檔案的存在,也可以是任務依賴,比如

學習JBPM 工作引擎 安裝外掛

什麼是jbpm ?  工作流開源框架  jBpm是一個靈活可擴充套件的工作流管理系統。作為 jBpm執行時server輸入的業務流程使用簡單強大的語言表達並打包在流程檔案中。jBmp將工作流應用開發的便利性和傑出的企業應用整合(EAI)能力結合了起來。jBmp包括一個Web

工作引擎Activiti系列——初識

1、介紹     幾乎任何一個公司的軟體開發都會涉及到流程,以往我們可能是這麼實現的:業務表新增標誌位標識流程的節點狀態,關聯批註表實現稽核意見,根據一些業務資料分析處理邏輯,分配任務到使用者,節點的排程,審批等.....這其實是很繁瑣的,且不說開發起來比較混亂,維護起來更

工作Activiti的學習總結Activiti自動執行的應用

  工作流activiti自動執行任務的開發應用  工作流模擬某公司請假流程情景如下:    1.開發人員請假流程,如果開發人員請假,如果請假天數小於3天,組長批准,人事批准即可請假。    2.如果請假大約三天,需要專案經理或者產品經理批准,並且專案總監批

git 在windows下的應用 - 本地倉庫代碼管理

軟件研發 研發管理 訪問https://gitforwindows.org/? 下載一個安裝包,一路next下去git config --global user.name "Pcdog" git config --global user.email "[email protected]"cd d:

IO應用:實現檔案的複製

package com.bjpowernode.demo03; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; /** 使用FileReader/FileW

Java IO應用

IO流 IO流概述及FileWriter類的使用 FileReader類使用 緩衝流介紹和使用 IO流相關案例 NO.one IO流概述及FileWriter類使用   1.1 IO流概述及分類    IO流用來處理裝置之間的資料傳輸  

小飛魚通達二開 OA工作列印次數統計控制程式圖文

每個工作流工作列印了多少次了,在OA裡不知道,如何能夠控制呢,今天小飛魚帶給大家的就是這個工作量列印次數統計控制程式,使用起來是不是會方便很多。 可以查詢列印日誌明細。 列印頁面上多了一個列印按鈕和流水號、列印次數的資訊。   點選列印按鈕後,彈出列印

Mahout推薦演算法的實際應用

為Wikipedia的連結關係做推薦 資料量:130,160,392 links from 5,706,070 articles, to 3,773,865 無評分值(連結關係僅表示相關所以可以使用LogLikelihoodSimilarity) 因為分散式推薦系統(map

Storm概念、原理詳解及其應用BaseStorm

when 結構 tails 並發數 vm 虛擬機 cif 異步 優勢 name 本文借鑒官文,添加了一些解釋和看法,其中有些理解,寫的比較粗糙,有問題的地方希望大家指出。寫這篇文章,是想把一些官文和資料中基礎、重點拿出來,能總結出便於大家理解的話語。與大多數“wordc

WCF學習——構建一個簡單的WCF應用

frame port app 位置 協定 enc splay msdn ons 本文的WCF服務應用功能很簡單,卻涵蓋了一個完整WCF應用的基本結構。希望本文能對那些準備開始學習WCF的初學者提供一些幫助。 在這個例子中,我們將實現一個簡單的計算器和傳統的分布式通信框架一樣

vuex實踐之路——筆記本應用

time 中大 -- this 隔離 思想 一個表 環境搭建 一定的 首先使用vue-cli把環境搭建好。 介紹一下應用的界面。 App.vue根組件,就是整個應用的最外層 Toolbar.vue:最左邊紅色的區域,包括三個按鈕,添加、收藏、刪除。 NoteList.vu

OPENCV----在APP性能測試中的應用

核心 color frame pan ems span urn sqrt || 應用項目: APP的性能測試 應用場景: APP啟動速度 視頻開播速度 加載速度 等~~ 緣來: 基於APP日誌和UiAutomator的測試方案,測試結果不能直白且精確的

談談源碼管理那點事兒——源碼管理十誡

我不 evel .html 文件夾 jetbrains enable thum XML 構建 引言: 若是還有能夠毫無偏見地涉及各個編程語言。比源碼管理軟件更必要的工具。我倒是非常想見識一下。源碼管理軟件是我們工作的必備工具,是很多開發團隊的血液。那為什麽我們都

活動目錄的綜合應用

windows 侯良金 活動 一、活動目錄相關概念1、使用活動目錄的優點:集中管理、便捷的訪問網絡資源、可擴展性。2、域的概念:活動目錄的一種實現形式,也是活動目錄最核心的管理單位。3、域控制器:就是安裝了活動目錄服務的一臺計算機。活動目錄的數據都儲存在域控制器內!4、名稱空間:是一個區域的名字,

java內存管理與GC機制

大於 一個棧 es2017 記錄 高速 工作 限制 fin 不存在 計算機cpu運轉速度越來越快,硬盤遠遠跟不上cpu的讀寫速度,就設計可內存。隨著cpu的發展,內存的讀寫速度也跟不上cpu處理速度,就在每顆cpu上加入了高速緩存。在多處理器系統中,每個處理