使用Airflow來排程Data Lake Analytics的任務
今天我們來介紹一下使用Airflow來排程 Data Lake Analytics(後面簡稱DLA)的任務執行。DLA作為一個數據湖的解決方案,
客戶有每天週期性的排程一些任務從DLA查詢資料迴流到業務系統的需求。因為DLA相容
MySQL的協議,因此所有支援MySQL的協議的排程框架都天然支援DLA,今天就來介紹一下使用業界著名的
大致步驟如下:
- 購買一個ECS用來執行Airflow
- 安裝Airflow
- 新增DLA的DB Connection
- 開發任務指令碼
購買ECS並進行配置
購買ECS的詳細流程這裡就不一一羅列了,非常的簡單,按照官方的購買流程可以分分鐘完成,需要注意的幾點這裡說一下:
- 購買的ECS的Region要和你的資料所在Region(其實也就是你開通DLA的 Region 保持一致)。
- 購買的ECS需要開通外網訪問許可權,因為Airflow的一些網頁控制檯需要通過外網來訪問。
- ECS購買好之後記得在安全組裡面放行入方向的80埠,因為下面要安裝的Airflow有web頁面,我們需要通過80埠進行訪問,如下圖:
同時記錄下這個ECS的外網地址:
安裝Airflow
Airflow是一個Python寫的軟體,因此我們是通過Python的Package Manager:pip來安裝的,因為我們要使用MySQL(而不是預設的SQLite) 來作為Airflow的元資料庫, 因此我們還要安裝MySQL相關的包:
# 安裝Airflow本身 sudo pip install apache-airflow[mysql] # 安裝MySQL相關的依賴 sudo apt-get install mysql-sever sudo apt-get install libmysqlclient-dev sudo pip install mysql-python
預設安裝的MySQL有一個配置需要調整:
# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1
修改完成之後重啟MySQL:
root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.
Airflow 安裝完成之後會在你的本地使用者目錄下產生 ~/airflow
目錄, 它裡面的內容大致如下:
root@hello:~/airflow# ll
total 4168
drwxr-xr-x 4 root root 4096 Oct 19 10:40 ./
drwx------ 10 root root 4096 Oct 19 10:40 ../
-rw-r--r-- 1 root root 11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x 2 root root 4096 Oct 18 19:32 dags/
drwxr-xr-x 6 root root 4096 Oct 18 17:52 logs/
-rw-r--r-- 1 root root 1509 Oct 18 11:38 unittests.cfg
其中 airflow.cfg
是 Airflow叢集的配置檔案,各種配置都是在這裡改的,dags
目錄儲存我們寫的任務,後面我們要寫的任務都是放在這個資料夾裡面。
初始化Airflow元資料庫
前面我們已經安裝了 MySQL 資料庫,現在我們來建立一個數據庫給Airflow來儲存元資料:
$ mysql \
-uroot \
-proot \
-e "CREATE DATABASE airflow
DEFAULT CHARACTER SET utf8
DEFAULT COLLATE utf8_general_ci;
GRANT ALL PRIVILEGES
ON airflow.*
TO 'airflow'@'localhost'
IDENTIFIED BY 'airflow';
FLUSH PRIVILEGES;"
$ airflow initdb
到之類為止,元資料庫就初始化好了。
安裝 Dask
Airflow本身是一個排程工具,任務的具體執行是交給一個叫做Executor的概念來做的,預設配置的executor是 SequentialExecutor
, 不適合生產環境使用,分散式的Executor有 Celery
和 Dask
, 但是筆者嘗試過 Celery
之後發現坑有點多,這裡推薦使用 Dask:
安裝Dask:
pip install dask
執行 dask scheduler:
# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786
dask-scheduler --host $DASK_HOST --port $DASK_PORT
執行 dask worker:
dask-worker $DASK_HOST:$DASK_PORT
配置 airflow.cfg
因為使用的不是預設的配置:我們選擇了使用MySQL來作為元資料庫,使用Dask來執行任務,因此需要對配置檔案: ~/airflow/airflow.cfg
進行修改:
[core]
# 使用Dask來執行任務
executor = DaskExecutor
# 元資料庫的連線方式
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
[dask]
# Dask的排程地址
cluster_address = 127.0.0.1:8786
啟動
到這裡位置所有準備工作做完了,我們可以啟動Airflow了,我們需要啟動 Airflow 的三個模組:
webserver: 用來承載Airflow的管理控制頁面:
airflow webserver -p 80 -D
scheduler: 任務排程器, 它會監控 ~/airflow/dags
下面我們定義的任務檔案的變化,這樣我們才能通過管理控制檯及時看到我們新開發的任務:
airflow scheduler -D
worker: 跟Dask進行互動真正執行任務的:
airflow worker -D
如果一切順利的話,一個Airflow的叢集就已經Ready了,可以在上面執行任務了。預設安裝裡面已經一些示例的任務, 瀏覽器裡面輸入 http://<你ECS的外網IP>
就可以看到Airflow的控制頁面了:
開發我們自己的任務
我們的目的是要用Airflow來排程DLA的任務,首先我們要新增一個連線串, Airflow裡面通過Connection來儲存連線串的具體資訊, 開啟頁面: http://<你ECS的外網IP>/admin/connection/
你會看到如下的頁面:
我們新增一下DLA的連線資訊:
這裡比較重要的兩個點:
- 連線型別選擇: MySQL (DLA相容MySQL的協議)
- Conn Id很關鍵,後面我們任務裡面是通過這個Conn Id來訪問資料來源的。
開發我們的任務程式碼
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'dlademo', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='echo hello-airflow',
dag=dag)
def step2(ds, **kargs):
mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
print items
t2 = PythonOperator(
task_id='execute_dla_sql',
provide_context=True,
python_callable=step2,
dag=dag)
t2.set_upstream(t1)
這個任務裡面定義了一個DAG, 一個DAG表示一個任務流程,一個流程裡面會執行有依賴關係的多個任務,DAG的第一個引數是DAG的名字, 這裡我們叫 dlademo
,它的第三個引數是排程的週期,這裡是每天排程一次: timedelta(1)
。
第一個任務是執行一個bash命令: echo hello-airflow
, 第二個任務則是我們的SQL任務,這裡寫的比較簡單,通過SQL把DLA資料庫裡面的一張表查詢並打印出來,最後 t2.set_upstream(t1)
設定兩個任務之間的依賴關係。
現在我們開啟 http://<你的ECS公網IP>/admin/airflow/tree?dag_id=dlademo
就可以看到這個任務的詳情了:
在這個圖中我們可以看到我們定義的兩個任務,以及它們之間的依賴關係。Airflow的功能非常的豐富,更多的功能就留給大家自己去體驗了。
總結
Airflow是Apache的頂級專案,從專案的成熟度和功能的豐富度來說都很不錯,入門也很簡單,很容易就可以搭建自己的叢集,並且它有自己的Connection機制,使得我們不需要把資料庫的使用者名稱密碼暴露在任務腳本里面,使用DLA的同學們可以試試Airflow來排程自己的任務。
本文為雲棲社群原創內容,未經