1. 程式人生 > >使用Airflow來排程Data Lake Analytics的任務

使用Airflow來排程Data Lake Analytics的任務

開發十年,就只剩下這套架構體系了! >>>   

今天我們來介紹一下使用Airflow來排程 Data Lake Analytics(後面簡稱DLA)的任務執行。DLA作為一個數據湖的解決方案,
客戶有每天週期性的排程一些任務從DLA查詢資料迴流到業務系統的需求。因為DLA相容
MySQL的協議,因此所有支援MySQL的協議的排程框架都天然支援DLA,今天就來介紹一下使用業界著名的

Apache Airflow 來排程DLA的作業。

大致步驟如下:

  1. 購買一個ECS用來執行Airflow
  2. 安裝Airflow
  3. 新增DLA的DB Connection
  4. 開發任務指令碼

購買ECS並進行配置

購買ECS的詳細流程這裡就不一一羅列了,非常的簡單,按照官方的購買流程可以分分鐘完成,需要注意的幾點這裡說一下:

  • 購買的ECS的Region要和你的資料所在Region(其實也就是你開通DLA的 Region 保持一致)。
  • 購買的ECS需要開通外網訪問許可權,因為Airflow的一些網頁控制檯需要通過外網來訪問。
  • ECS購買好之後記得在安全組裡面放行入方向的80埠,因為下面要安裝的Airflow有web頁面,我們需要通過80埠進行訪問,如下圖:

 

 

同時記錄下這個ECS的外網地址:

 

 

安裝Airflow

Airflow是一個Python寫的軟體,因此我們是通過Python的Package Manager:pip來安裝的,因為我們要使用MySQL(而不是預設的SQLite) 來作為Airflow的元資料庫, 因此我們還要安裝MySQL相關的包:

# 安裝Airflow本身
sudo pip install apache-airflow[mysql]

# 安裝MySQL相關的依賴
sudo apt-get install mysql-sever
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python

預設安裝的MySQL有一個配置需要調整:

# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1

修改完成之後重啟MySQL:

root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.

Airflow 安裝完成之後會在你的本地使用者目錄下產生 ~/airflow 目錄, 它裡面的內容大致如下:

root@hello:~/airflow# ll
total 4168
drwxr-xr-x  4 root root    4096 Oct 19 10:40 ./
drwx------ 10 root root    4096 Oct 19 10:40 ../
-rw-r--r--  1 root root   11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x  2 root root    4096 Oct 18 19:32 dags/
drwxr-xr-x  6 root root    4096 Oct 18 17:52 logs/
-rw-r--r--  1 root root    1509 Oct 18 11:38 unittests.cfg

其中 airflow.cfg 是 Airflow叢集的配置檔案,各種配置都是在這裡改的,dags 目錄儲存我們寫的任務,後面我們要寫的任務都是放在這個資料夾裡面。

初始化Airflow元資料庫

前面我們已經安裝了 MySQL 資料庫,現在我們來建立一個數據庫給Airflow來儲存元資料:

$ mysql \
    -uroot \
    -proot \
    -e "CREATE DATABASE airflow
        DEFAULT CHARACTER SET utf8
        DEFAULT COLLATE utf8_general_ci;

        GRANT ALL PRIVILEGES
        ON airflow.*
        TO 'airflow'@'localhost'
        IDENTIFIED BY 'airflow';

        FLUSH PRIVILEGES;"
        
$ airflow initdb

到之類為止,元資料庫就初始化好了。

安裝 Dask

Airflow本身是一個排程工具,任務的具體執行是交給一個叫做Executor的概念來做的,預設配置的executor是 SequentialExecutor, 不適合生產環境使用,分散式的Executor有 Celery 和 Dask, 但是筆者嘗試過 Celery 之後發現坑有點多,這裡推薦使用 Dask:

安裝Dask:

pip install dask

執行 dask scheduler:

# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786

dask-scheduler --host $DASK_HOST --port $DASK_PORT

執行 dask worker:

dask-worker $DASK_HOST:$DASK_PORT

配置 airflow.cfg

因為使用的不是預設的配置:我們選擇了使用MySQL來作為元資料庫,使用Dask來執行任務,因此需要對配置檔案: ~/airflow/airflow.cfg 進行修改:

[core]
# 使用Dask來執行任務
executor = DaskExecutor
# 元資料庫的連線方式
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

[dask]
# Dask的排程地址
cluster_address = 127.0.0.1:8786

啟動

到這裡位置所有準備工作做完了,我們可以啟動Airflow了,我們需要啟動 Airflow 的三個模組:

webserver: 用來承載Airflow的管理控制頁面:

airflow webserver -p 80 -D

scheduler: 任務排程器, 它會監控 ~/airflow/dags 下面我們定義的任務檔案的變化,這樣我們才能通過管理控制檯及時看到我們新開發的任務:

airflow scheduler -D

worker: 跟Dask進行互動真正執行任務的:

airflow worker -D

如果一切順利的話,一個Airflow的叢集就已經Ready了,可以在上面執行任務了。預設安裝裡面已經一些示例的任務, 瀏覽器裡面輸入 http://<你ECS的外網IP> 就可以看到Airflow的控制頁面了:

 

 

開發我們自己的任務

我們的目的是要用Airflow來排程DLA的任務,首先我們要新增一個連線串, Airflow裡面通過Connection來儲存連線串的具體資訊, 開啟頁面: http://<你ECS的外網IP>/admin/connection/ 你會看到如下的頁面:

 

 

我們新增一下DLA的連線資訊:

 

 

這裡比較重要的兩個點:

  1. 連線型別選擇: MySQL (DLA相容MySQL的協議)
  2. Conn Id很關鍵,後面我們任務裡面是通過這個Conn Id來訪問資料來源的。

開發我們的任務程式碼

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dlademo', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='echo hello-airflow',
    dag=dag)

def step2(ds, **kargs):
    mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
    for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
        print items

t2 = PythonOperator(
    task_id='execute_dla_sql',
    provide_context=True,
    python_callable=step2,
    dag=dag)

t2.set_upstream(t1)

這個任務裡面定義了一個DAG, 一個DAG表示一個任務流程,一個流程裡面會執行有依賴關係的多個任務,DAG的第一個引數是DAG的名字, 這裡我們叫 dlademo ,它的第三個引數是排程的週期,這裡是每天排程一次: timedelta(1) 。

第一個任務是執行一個bash命令: echo hello-airflow, 第二個任務則是我們的SQL任務,這裡寫的比較簡單,通過SQL把DLA資料庫裡面的一張表查詢並打印出來,最後 t2.set_upstream(t1)設定兩個任務之間的依賴關係。

現在我們開啟 http://<你的ECS公網IP>/admin/airflow/tree?dag_id=dlademo 就可以看到這個任務的詳情了:

 

 

在這個圖中我們可以看到我們定義的兩個任務,以及它們之間的依賴關係。Airflow的功能非常的豐富,更多的功能就留給大家自己去體驗了。

總結

Airflow是Apache的頂級專案,從專案的成熟度和功能的豐富度來說都很不錯,入門也很簡單,很容易就可以搭建自己的叢集,並且它有自己的Connection機制,使得我們不需要把資料庫的使用者名稱密碼暴露在任務腳本里面,使用DLA的同學們可以試試Airflow來排程自己的任務。

 

原文連結

本文為雲棲社群原創內容,未經