第一章 Airflow基本原理
阿新 • • 發佈:2021-06-28
一、Airflow簡介
airflow是Airbnb開源的一個用python編寫的排程工具,專案於2014年啟動,2015年春季開源,2016年加入Apache軟體基金會的孵化計劃,使用Python編寫實現的任務管理、排程、監控工作流平臺。
Airflow 是基於DAG(有向無環圖)的任務管理系統,可以簡單理解為是高階版的crontab,但是它解決了crontab無法解決的任務依賴問題。與crontab相比Airflow可以方便檢視任務的執行狀況(執行是否成功、執行時間、執行依 賴等),可追蹤任務歷史執行情況,任務執行失敗時可以收到郵件通知,檢視錯誤日誌。
二、Airflow使用場景
在實際專案中,我們經常遇到以下場景: 1.運維人員,定時對伺服器執行指令碼某些指令碼,最簡單的方式是新增一些crond任務,但如果想追溯各個任務的執行結果時? 2.在大資料場景下,每隔一段時間需匯出線上資料、匯入到大資料平臺、觸發資料處理等多個子操作,且各個子操作含有依賴關係時? 3.在管理大量主機時,想要一個統一的作業管理平臺,能在上面定義各種任務來管理下面的裝置? airflow通過DAG配置檔案,能輕鬆定義各種任務及任務之間的依賴關係和排程執行,並一個視覺化的操作web介面。
三、Airflow優勢
#1.自帶web管理介面,易上手;
#2.業務程式碼和排程程式碼完全解耦;
#3.通過python程式碼定義子任務,並支援各種Operate操作器,靈活性大,能滿足使用者的各種需求;
#4.python開源專案,支援擴充套件operate等外掛,便於二次開發;
#5.類似的工具有akzban,quart等;
四、Airflow基本架構
在一個可擴充套件的生產環境中,Airflow 含有以下元件: #1.元資料庫: 這個資料庫儲存有關任務狀態的資訊。 #2.排程器: Scheduler 是一種使用 DAG 定義結合元資料中的任務狀態來決定哪些任務需要被執行以及任務執行優先順序的過程。排程器通常作為服務執行。 #3.執行器: Executor 是一個訊息佇列程序,它被繫結到排程器中,用於確定實際執行每個任務計劃的工作程序。有不同型別的執行器,每個執行器都使用一個指定工作程序的類來執行任務。 最關鍵的執行器就有四種選擇: SequentialExecutor:單程序順序執行任務,預設執行器,通常只用於測試 LocalExecutor:多程序本地執行任務,使用與排程器程序在同一臺機器上執行的並行程序執行任務 CeleryExecutor:分散式排程,生產常用,使用存在於獨立的工作機器叢集中的工作程序執行任務 DaskExecutor :動態任務排程,主要用於資料分析 #4.Workers: 這些是實際執行任務邏輯的程序,由正在使用的執行器確定。
五、Airflow 基本概念
1.DAG
DAG(Directed Acyclic Graph)是Airflow的核心概念之一,DAG體現的是你的工作流,它由Python指令碼定義,其中包含了你想要執行的一系列task,同時其中還定義了這些task的依賴關係。 DAG代表了一個供排程的工作流,它的主要配置項包括owner,schedule。DAG支援多種排程方式,你可以指定該DAG定時排程,如每天的5am,也可以指定它週期性排程,如每二十分鐘排程一次。 DAG由其中的task組成。例如,一個簡單的DAG可以包括三個任務:A,B,C。我們可以讓B依賴於A,在A成功執行之後執行B,而C可以在任意時候執行。需要注意的是,DAG本身並不關注A,B,C三個任務的具體內容,它關注的是三個任務的執行順序以及依賴條件。
2.DAG Run
DAG run是指DAG的例項,DAG run通常由Airflow的scheduler建立,在特定時間執行,幷包含了DAG中定義的task的例項。
3.execution_date
execution_date是logical datetime,是DAG指定執行的時間,它可以是過去或將來的某個時間儘管DAG實際上是正在執行的。
4.Task
Task是DAG中定義工作的基本單位,它的地位等同於工作流中的一個節點。Task和Operator是一枚硬幣的正反兩面,Task代表工作的抽象概念,Operator定義了對應Task要做的具體任務。 同一個DAG中的Task之間一般有先後順序和依賴關係,考慮如下程式碼:
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
task_1 = DummyOperator('task_1')
task_2 = DummyOperator('task_2')
task_1 >> task_2 # Define dependencies
這段程式碼定義了一個包含兩個task的DAG,task_1是task_2的上游,而task_2依賴於task_1,當DAG run被建立時,task_1首先執行,task_2在task_1成功執行後才會執行。
5.Task Instance
Task Instance是Task的例項,其關係等同於DAG和DAG run之間的關係。
6.Task的生命週期
Task的生命週期有一下八種:
Task的正常流程包括一下幾個階段
No status (scheduler 建立了空的task例項)
Scheduled (scheduler對該task例項進行了排程)
Queued (scheduler將task例項傳給executor,放入執行佇列)
Running (task開始執行)
Success (task成功結束)
7.Operators
DAG定義了一個工作流如何執行,而Operator定義了一個task執行的具體任務,是Airflow中編寫具體任務的類。Operator包括很多種類,BashOperator用來執行Bash命令,PythonOperator可以執行Python函式,MySOperator可以操作MySQL資料庫執行相關操作,當然你也可以從BaseOperator中繼承並開發自己的Operator。
8.Scheduler
Scheduler監控所有的task和DAG,同時觸發依賴已經滿足的task。Scheduler在後端開啟子程序,和DAG資料夾同步,並週期性(可配置時長)收集DAG的解析結果來找到可以條件滿足的task。Scheduler會將可以執行的task交給我們配置好的executor執行。 Scheduler是Airflow環境中的頂層服務,簡單地在命令列執行airflow scheduler可以開啟。
9.Executor
Executor是task的執行器,它有多種配置方式,如SequetialExecutor序列執行task(適用於開發環境,是預設的配置),LocalExecutor可以在本地併發執行task,CeleryExecutor可以分散式地執行task。
六、Airflow常用命令
4.1,常用命令
$ airflow webserver -D 守護程序執行webserver
$ airflow scheduler -D 守護程序執行排程器
$ airflow worker -D 守護程序執行排程器
$ airflow worker -c 1 -D 守護程序執行celery worker並指定任務併發數為1
$ airflow pause dag_id 暫停任務
$ airflow unpause dag_id 取消暫停,等同於在管理介面開啟off按鈕
$ airflow list_tasks dag_id 檢視task列表
$ airflow clear dag_id 清空任務例項
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 執行整個dag檔案
$ airflow run dag_id task_id execution_date 執行task
七、Airflow與同類產品的對比
系統名稱 | 介紹 |
---|---|
Apache Oozie | 使用XML配置, Oozie任務的資原始檔都必須存放在HDFS上. 配置不方便同時也只能用於Hadoop. |
Linkedin Azkaban | web介面尤其很贊, 使用java properties檔案維護任務依賴關係, 任務資原始檔需要打包成zip, 部署不是很方便. |
Airflow | 具有自己的web任務管理介面,dag任務建立通過python程式碼,可以保證其靈活性和適應性 |