1. 程式人生 > >靈活可擴充套件的工作流管理平臺Airflow

靈活可擴充套件的工作流管理平臺Airflow

1. 引言

Airflow是Airbnb開源的一個用Python寫就的工作流管理平臺(workflow management platform)。在前一篇文章中,介紹瞭如何用Crontab管理資料流,但是缺點也是顯而易見。針對於Crontab的缺點,靈活可擴充套件的Airflow具有以下特點:

  • 工作流依賴關係的視覺化;
  • 日誌追蹤;
  • (Python指令碼)易於擴充套件

對比Java系的Oozie,Airflow奉行“Configuration as code”哲學,對於描述工作流、判斷觸發條件等全部採用Python,使得你編寫工作流就像在寫指令碼一樣;能debug工作流(test backfill命令),更好地判別是否有錯誤;能更快捷地在線上做功能擴充套件。Airflow充分利用Python的靈巧輕便,相比之下Oozie則顯得笨重厚拙太多(其實我沒在黑Java~~)。《

What makes Airflow great?》介紹了更多關於Airflow的優良特性;其他有關於安裝、介紹的文件在這裡還有這裡

下表給出Airflow(基於1.7版本)與Oozie(基於4.0版本)對比情況:

功能 Airflow Oozie
工作流描述 Python xml
資料觸發 Sensor datasets, input-events
工作流節點 operator action
完整工作流 DAG workflow
定期排程 DAG schedule_interval coordinator frequency
任務依賴 >>, <<
<ok to>
內建函式、變數 template macros EL function, EL constants

之前我曾提及Oozie沒有能力表達複雜的DAG,是因為Oozie只能指定下流依賴(downstream)而不能指定上流依賴(upstream)。與之相比,Airflow就能表示複雜的DAG。Airflow沒有像Oozie一樣區分workflow與coordinator,而是把觸發條件、工作流節點都看作一個operator,operator組成一個DAG。

2. 實戰

Airflow常見命令如下:

  • initdb,初始化元資料DB,元資料包括了DAG本身的資訊、執行資訊等;
  • resetdb,清空元資料DB;
  • list_dags,列出所有DAG;
  • list_tasks,列出某DAG的所有task;
  • test,測試某task的執行狀況;
  • backfill,測試某DAG在設定的日期區間的執行狀況;
  • webserver,開啟webserver服務;
  • scheduler,用於監控與觸發DAG。

下面將給出如何用Airflow完成data pipeline任務。

首先簡要地介紹下背景:定時(每週)檢查Hive表的partition的任務是否有生成,若有則觸發Hive任務寫Elasticsearch;然後等Hive任務完後,執行Python指令碼查詢Elasticsearch傳送報表。但是,Airflow對Python3支援有問題(依賴包為Python2編寫);因此不得不自己寫HivePartitionSensor

# -*- coding: utf-8 -*-
# @Time    : 2016/11/29
# @Author  : rain
from airflow.operators import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from impala.dbapi import connect
import logging


class HivePartitionSensor(BaseSensorOperator):
    """
    Waits for a partition to show up in Hive.

    :param host, port: the host and port of hiveserver2
    :param table: The name of the table to wait for, supports the dot notation (my_database.my_table)
    :type table: string
    :param partition: The partition clause to wait for. This is passed as
        is to the metastore Thrift client,and apparently supports SQL like
        notation as in ``ds='2016-12-01'``.
    :type partition: string
    """
    template_fields = ('table', 'partition',)
    ui_color = '#2b2d42'

    @apply_defaults
    def __init__(
            self,
            conn_host, conn_port,
            table, partition="ds='{{ ds }}'",
            poke_interval=60 * 3,
            *args, **kwargs):
        super(HivePartitionSensor, self).__init__(
            poke_interval=poke_interval, *args, **kwargs)
        if not partition:
            partition = "ds='{{ ds }}'"
        self.table = table
        self.partition = partition
        self.conn_host = conn_host
        self.conn_port = conn_port
        self.conn = connect(host=self.conn_host, port=self.conn_port, auth_mechanism='PLAIN')

    def poke(self, context):
        logging.info(
            'Poking for table {self.table}, '
            'partition {self.partition}'.format(**locals()))
        cursor = self.conn.cursor()
        cursor.execute("show partitions {}".format(self.table))
        partitions = cursor.fetchall()
        partitions = [i[0] for i in partitions]
        if self.partition in partitions:
            return True
        else:
            return False

Python3連線Hive server2的採用的是impyla模組,HivePartitionSensor用於判斷Hive表的partition是否存在。寫自定義的operator,有點像寫Hive、Pig的UDF;寫好的operator需要放在目錄~/airflow/dags,以便於DAG呼叫。那麼,完整的工作流DAG如下:

# tag cover analysis, based on Airflow v1.7.1.3
from airflow.operators import BashOperator
from operatorUD.HivePartitionSensor import HivePartitionSensor
from airflow.models import DAG

from datetime import datetime, timedelta
from impala.dbapi import connect

conn = connect(host='192.168.72.18', port=10000, auth_mechanism='PLAIN')


def latest_hive_partition(table):
    cursor = conn.cursor()
    cursor.execute("show partitions {}".format(table))
    partitions = cursor.fetchall()
    partitions = [i[0] for i in partitions]
    return partitions[-1].split("=")[1]


log_partition_value = """{{ macros.ds_add(ds, -2)}}"""
tag_partition_value = latest_hive_partition('tag.dmp')

args = {
    'owner': 'jyzheng',
    'depends_on_past': False,
    'start_date': datetime.strptime('2016-12-06', '%Y-%m-%d')
}

# execute every Tuesday
dag = DAG(
    dag_id='tag_cover', default_args=args,
    schedule_interval='@weekly',
    dagrun_timeout=timedelta(minutes=10))

ad_sensor = HivePartitionSensor(
    task_id='ad_sensor',
    conn_host='192.168.72.18',
    conn_port=10000,
    table='ad.ad_log',
    partition="day_time={}".format(log_partition_value),
    dag=dag
)

ad_hive_task = BashOperator(
    task_id='ad_hive_task',
    bash_command='hive -f /path/to/cron/cover/ad_tag.hql --hivevar LOG_PARTITION={} '
                 '--hivevar TAG_PARTITION={}'.format(log_partition_value, tag_partition_value),
    dag=dag
)

ad2_hive_task = BashOperator(
    task_id='ad2_hive_task',
    bash_command='hive -f /path/to/cron/cover/ad2_tag.hql --hivevar LOG_PARTITION={} '
                 '--hivevar TAG_PARTITION={}'.format(log_partition_value, tag_partition_value),
    dag=dag
)

report_task = BashOperator(
    task_id='report_task',
    bash_command='sleep 5m; python3 /path/to/cron/report/tag_cover.py {}'.format(log_partition_value),
    dag=dag
)

ad_sensor >> ad_hive_task >> report_task
ad_sensor >> ad2_hive_task >> report_task

相關推薦

靈活擴充套件工作管理平臺Airflow

1. 引言 Airflow是Airbnb開源的一個用Python寫就的工作流管理平臺(workflow management platform)。在前一篇文章中,介紹瞭如何用Crontab管理資料流,但是缺點也是顯而易見。針對於Crontab的缺點,靈活可擴充套件的Airflow具有以下特點: 工作流依賴關

Learun FrameWork,強大ASP.NET工作管理平臺

工作流 工作流原理 工作流原理:是針對工作中具體固定程序的常規活動而提出的一個概念,通過將過工作活動分解定義良好的任務、角色、規則號過程來進行執行和監控,達到提高生產組織水平和工作效率的目的,工作技術為企業更好地實現經營目標提供了先進的手段。 什麽是工作流? 工作流就是

learun.framework7.0-工作管理

工作流 經過近半年的研發,learun.framework7.0-Workflow工作流管理平臺已成功更新。 在研發期間,我將所有業余時間和精力完全投入到learun.framework7.0-Workflow工作流研發中,研發過程實屬不易,因為耗時耗腦

企業內部應用的核心與靈魂:工作管理系統

  工作流是企業內部系統的核心和靈魂,而審批則是工作流中的最基礎的應用場景。在公司管理和運轉中引入審批工作流,替代原本的紙質申請和審批,以期提高公司的運轉效率公司管理制度規範化系統留檔,便於追溯環保。   總結了在企業在實際業務中需求,根據客戶反饋,構建出

一個實現瀏覽器網頁與本地程式之間進行雙向呼叫的輕量級、強相容、擴充套件的外掛開發平臺—本網通

       通過本網通外掛平臺可實現在網頁中的JavaScript指令碼無障礙訪問本地電腦的硬體、呼叫本地系統的API及相關元件,同時可徹底解決ActiveX元件在Chrome、FireFox、Opera、Edge、Safari等瀏覽器各版本的相容使用問題。 系統相容性:

web工作管理系統開發之三 視覺化流程設計器

      在工作流管理系統中,引擎的所有的活動,驅動,和流轉,都是以流程定義為基礎而展開的。流程定義檔案是流程能執行的先決條件,同時流程定義檔案又是工作流引擎的設計基礎,引擎必須要能生成,解釋和獲取到任意流程定義節點的資訊。業務流程建模就是將一個具體的業務流程系統用流程定義檔案來描述。而生成這個流程定

Atitit 微服務的優點和拆分 目錄 1. 微服務架構五大優勢 崛起勢頭不可擋 4 1 1.1. 1、複雜度控 6避免“盲人摸象” 7 2 1.2. 2、靈活擴充套件 7 2 1.3. 3、獨立部

Atitit 微服務的優點和拆分 目錄 微服務架構五大優勢 崛起勢頭不可擋4 1、複雜度可控6避免“盲人摸象”7 2、靈活可擴充套件7 3、獨立部署7

Mobx-簡單擴充套件的狀態管理

mobx mobx,作用類似於redux,相比於reduxmobx的學習成本更低,開發難度低,開發程式碼少,渲染效能好(狀態和元件是一對一的,比如你又三個轉檯對應三個元件,如果元件狀態發生改變之後,只會處理受影響的元件,不受影響的不做處理) 核心思想 狀態變化引起的副作

工作管理——模型、方法和系統》筆記2:Petri網對工作建模

Web 是一個生長著的、開放的、動態的分散式系統。 Web 始於1989 年,當時英國科學家 Tim Berners-Lee 和比利時人 Robert Cailliau 在歐洲粒子物理研究所(European Organization for Nuclear Research

一個完整的工作管理系統成部分

    一個完整的工作流管理系統通常由工作流引擎、工作流設計器、流程操作、工作流客戶端程式、流程監控、表單設計器、與表單的整合以及與應用程式的整合八個部分組成。       1. 工作流引擎 工作

讀書筆記《工作管理-模型、方法和系統》-2、工作建模

1、每個案例(工作)應該擁有的要素:唯一標識、生命週期、狀態、案例資料、案例檔案、條件 2、條件是一個任務開始及完結的前提。條件決定了任務是否被執行以及執行的順序。 3、任務是工作的一個不可分割的邏輯單元,它必需被完整執行,或發生錯誤就需要回滾(rollback). 4

工作管理--模型、方法和系統

    本人專業電腦科學與技術,目前還是小渣渣。在老師的推薦下,讓我學習工作流管理這本書,所以就給大家介紹一點工作流管理的小知識。也是一些自己的理解。     工作流的基本目的就是為每次活動選擇合適的執行的型別。什麼是活動呢 ?活動就是資源的真整合,即每次案例藉助資源來執行

擴充套件的後臺管理系統框架

需求 對於公司內部業務系統,最基本的功能就是部門管理、使用者管理、角色管理、許可權管理、登陸功能。但是公司內部系統一般情況下,不止是一個(如果是一個那就是巨無霸,就面臨著被拆分,也會遇到我們後面要討論的問題)。那麼多個系統,面對上面的功能我們需要開發多套嗎?這

Trello--工作管理工具

因為最近一直在給某組織尋找一些工作工具,很自然就想到,比起目前的郵件組溝通模式,工作團隊需要個更快速、高效的工作流(work-flow)管理平臺。 但通過一段時間的觀察,發現了一個奇怪的趨勢——凡是帶有工作流管理性質的應用或專案,都會扯出一堆專案管理、內容管理、日程管理的複雜功能。這對我目前的需求是相悖的,

10.工作搭建平臺

工作平臺搭建  需求分析  平臺搭建  電商業務建模 需求分析  基於Spring Boot2與Activiti6.0搭建通用的工作流引擎平臺  支援流程定義檔案線上設計及部署和維護  支援自定義表單與流程定義的整合  滿足流程執行的使用者及許可權管理功能  管理角度的監控流程執行過

AWS Flow Framework工作框架_工作管理

AWS Flow Framework 是一組方便易用的庫的集合,有了這些庫,您可以更加輕鬆快速地使用 Amazon Simple Workflow 構建應用程式。使用 AWS Flow Framework,您可以編寫簡單的程式碼,讓框架中預先構建的物件和類來處理 Amazon

AWS Data Pipeline資料處理_資料驅動型工作管理系統

AWS Data Pipeline 是一種 Web 服務,可幫助您可靠地處理資料並以指定的間隔在不同 AWS 計算與儲存服務以及本地資料來源之間移動資料。利用 AWS Data Pipeline,您可以定期在您儲存資料的位置訪問資料,大規模轉換和處理資料,並高效地將結果傳

基於中臺思想的物流系統設計(五):設計擴充套件的產品服務平臺

1、引言 在前面四篇文章中,我們把物流中臺的基礎能力層構建了起來,接下來,我們就可以在這些基礎能力之上構建我們的產品服務,從而支撐各條業務線。 基礎能力層主要關注的是穩定可用的原子介面,因此在設計的時候重點關注了很多高併發高可用的技術。產品服務層主要是為了支撐不斷創新的業務,因此在設計的時候需要關注

SharePoint 2010工作解決方案之將SharePoint Designer重用工作

在 SharePoint Designer 中建立的工作流(即,宣告性工作流)由 XML 語句組成,而非由程式碼組成。 SharePoint Designer 2010 引入了可重用工作流,它們是可由 SharePoint 站點中的不同列表使用的可移植的宣告性工作流。 在 Visual Studio 201