5分鐘試作一個最簡單的airflow plugin « Terrence的宅宅幻想
因為想要讓我的使用者去可以更容易的跟既有的平臺做排程工作
最近想試著製作airflow的operator
airflow本身提供了operator, menu_link, hook等等的介面
照著官方網站,簡單試做了一個簡單版的myoperator plugin
要製作自己的plugin方法很單純
去plugin目錄下塞自己的python檔就好了
假設我做了一個plugin檔案$AIRFLOW_HOMR/plugins/my.py
內容如下
這邊重點是要製作一個class去繼承AirflowPlugin
他宣告了這個python檔要涵蓋哪些type的plugin跟這個plugin的名字
name這個屬性會影響之後要import時候的路徑,官方說明如下
因為我這邊只想做一個myoperator,所以只做了一個MyOperator去繼承BaseOperator
當實做一個Operator的時候有兩個要點
- 使用
@apply_defaults
去包住__init__
然後用super
去傳遞初始化引數 - 實作
execute
的內容
一個Operator可以覆寫的函式有
- pre_execute
- execute
- post_execute
- on_kill
可以照自己的需求去實作
之後只要重啟airflow的webserver/scheduler/worker等就行了
這邊再給一個範例dag $AIRFLOW_HOMR/dags/hello_world.py
這邊的重點就只有import的方式
我用from airflow.operators.myplugin import MyOperator
去import自己做的MyOperator
當初一直不知道怎麼import才正確,因為一些古早範例都是寫from airflow.operators import xxxx
讓我卡了一小段時間
.....
題外話之前把airflow從1.8升級到1.10之後遇到timezone被鎖死在UTC的狀況
現階段只能用降版回1.8的方式解決
這幾天airflow試用下來他的好處我還沒體會到他的坑倒是踩了不少
現在想想自己還是習慣用Azkaban
但是客戶想用airflow,也只能硬著頭皮去踩坑