airflow-api 外掛的安裝與使用介紹
1. 安裝外掛airflow-rest-api
1)獲取
wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/master.zip
2)將plugin資料夾下的內容放入airflow/plugin/下,若不存在則新建
3)重啟airflow
2. api使用介紹
2.1 獲取airflow版本
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=version
2.2 獲取airflow-api-plugin的版本
GET - http://{HOST}
:{PORT}/admin/rest_api/api?api=rest_api_plugin_version
2.3 渲染任務例項模板
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=render
引數:
1)dag_id
2)task_id 任務id
3)execution_date dag的執行日期,例如:2017-01-02T03:04:05
4)subdir(可選) 從中查詢檔案位置或目錄
2.4 操作變數
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=variables
引數(均為可選引數):
1)set:設定變數,key value
2)get:獲取變數
3)json:布林值,反序列化json
4)default:如果變數不存在,則返回預設值
5)import:從json檔案匯入變數
6)export:從json檔案匯出變數
7)delete:刪除變數
2.5 操作連線
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=connections
引數(均為可選引數): 1)list:布林型,列出所有連線 2)add:增加連線 3)delete:布林型,刪除連線 4)conn_id:新增/刪除連線所需要的連線id 5)conn_id:新增連線所需要的連線url 6)conn_extra:連線的額外欄位,新增連線時可選引數
2.6 暫停DAG
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause
引數:
1)dag_id
2)subdir:可選引數,從中查詢dag的檔案位置或者目錄
2.7 繼續dag
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=unpause
引數:
1)dag_id
2)subdir:可選引數,從中查詢dag的檔案位置或者目錄
2.8 排程任務失敗的依賴
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=task_failed_deps
引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選
2.9 觸發dag執行
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag
引數:
1)dag_id
2)run_id:可選引數,用於dag執行的runid
3)conf:可選引數,傳遞給觸發dag執行的一些配置,(URL Encoded JSON)以{"key":"value"}的形式
2.10 測試任務例項,但是不檢查依賴或者在資料庫中記錄
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=test
引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選引數
5)dryrun:可選引數,布林型,是否進行幹執行
6)task_params:可選引數,將json字典傳給task
2.11 獲取dag執行狀態
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=dag_state
引數:
1)dag_id
2)execution_date
3)subdir:可選引數
2.12 執行單個task例項
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=run
引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選引數
5)mark_success:可選引數,布林型,不執行jobs但是標記為成功
6)force:可選引數,布林型,忽略先前的執行狀態,無論任務是否成功都重新執行
7)pool:可選引數,資源池
8)cfg_path:可選引數,使用配置檔案的路徑而不用原有的airflow.cfg
9)local:可選引數,布林值,使用localexecute執行任務
10)ignore_all_dependencies:可選引數,布林值,忽略所有的非關鍵依賴項,包括ignore_ti_state和ignore_task_depsstore_true
11)ignore_dependencies:可選,布林值,忽略特定於任務的依賴關係,例如upstream, depends_on_past還有retry delay dependencies
12)ignore_depends_on_past:可選,布林值,忽略depends_on_past依賴項,但是尊重上游
13)ship_dag:可選,布林值,序列化dag併發送給worker
14)pickle:可選:整個dag的序列化pickle物件
2.13 某個dag的所有tasks
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=list_tasks
引數:
1)dag_id
2)tree:可選引數,布林型
3)subdir:可選引數
2.14 在制定範圍內執行dag的子部分
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=backfill
引數:
1)dag_id
2)task_regex:可選引數,用於過濾特定的task_ids以進行回填正則表示式
3)start_date:覆蓋start_date YYYY-MM-DD
4)end_date:可選引數,同上覆蓋
5)mark_success:可選引數,布林型,不執行jobs但是標記為成功
6)local:可選引數,布林型,使用LocalExecutor執行任務
7)donot_pickle:可選引數,不進行嘗試將dag物件發給工作人員,只告訴他們dag的執行版本
8)include_adhoc:可選引數,布林值,包含帶有adhoc引數的dags
9)ignore_dependencies:可選引數,布林值,忽略特定於任務的依賴關係,例如 upstream, depends_on_past, and retry delay dependencies
10)ignore_first_depends_on_past:可選引數,布林值,僅忽略第一組任務的depends_on_past依賴關係
11)subdir:可選引數,
12)pool:可選引數,資源池
13)dry_run:可選引數,布林值,是否空執行
2.15 列出所有的dags
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=list_dags
引數:
1)subdir:可選引數
2)report:可選引數,布林型,顯示dagbag的載入報告
2.16 啟動kerberos票證續訂
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=kerberos
引數(均為可選引數):
1)principal
2)keytab
3)pid:pid檔案位置
4)daemon:布林值,是否為守護程序
5)stdout:將stdout重定向到檔案
6)stderr:將錯誤重定向到檔案
7)log-file:日誌檔案
2.17 啟動celery工作節點(worker)
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=worker
引數(均為可選引數):
1)do_pickle:布林型,嘗試picklg DAG物件傳送給worker,而不是讓worker執行他們的程式碼版本
2)queues:逗號分隔的佇列列表
3)concurrency:工作程序數
4)pid:pid檔案位置
5)daemon:布林值,是否為守護程序
6)stdout:將stdout重定向到檔案
7)stderr:將錯誤重定向到檔案
8)log-file:日誌檔案
2.18 啟動celery工作節點(flower)
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=flower
引數(均為可選引數):
1)hostname:設定執行伺服器的主機名
2)port:執行伺服器的埠
3)flower_conf:flower的配置檔案
4)broker_api
5)pid:pid檔案位置
6)daemon:布林值,是否為守護程序
7)stdout:將stdout重定向到檔案
8)stderr:將錯誤重定向到檔案
9)log-file:日誌檔案
2.19 啟動排程程式例項
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=scheduler
引數(均為可選引數):
1)dag_id
2)subdir:可選引數,從中查詢dag的檔案位置或者目錄
3)run-duration:設定退出前執行的秒數
4)num_runs:設定退出前執行的次數
5)do_pickle:布林型,嘗試pickle DAG物件傳送給worker,而不是讓worker執行他們的程式碼版本
6)pid:pid檔案位置
7)daemon:布林值,是否為守護程序
8)stdout:將stdout重定向到檔案
9)stderr:將錯誤重定向到檔案
10)log-file:日誌檔案
2.20 獲取task的執行狀態
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=task_state
引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選
2.21 CRUD 對pool的操作
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pool
引數:
1)set:分別設定池插槽計數和描述。 表單中的預期輸入:NAME SLOT_COUNT POOL_DESCRIPTION
2)get:獲取池資訊
3)delete:刪除池
2.22 由worker生成的日誌
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=serve_logs
引數:無
2.23 清除一組任務例項
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=clear
引數:
1)dag_id
2)task_regex:可選引數,用於過濾特定的task_ids以進行回填正則表示式
3)start_date:覆蓋start_date YYYY-MM-DD
4)end_date:可選引數,同上覆蓋
5)subdir:可選引數
6)upstream:可選,布林型,包含上游任務
7)downstream:可選,布林型,包含下游任務
8)no_confirm:可選,布林型,不要求確認
9)only_failed:可選,布林型,僅失敗的作業
10)only_running:可選,布林型,僅執行的作業
11)exclude_subdags:可選,布林型,排除子標記
2.24 釋出一個新的dag
POST - http://{HOST}:{PORT}/admin/rest_api/api?api=deploy_dag
post引數:
1)dag_file:file-上傳部署的py檔案
2)force:可選,布林型,檔案已存在,是否強制上傳
3)pause:可選,布林型,在建立時強制暫停dag並覆蓋'dags_are_paused_at_creation'配置
4)unpause(可選) - 布林值 - DAG在建立時將被強制取消暫停,並覆蓋'dags_are_paused_at_creation'配置
2.25 重新整理一個dag
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=refresh_dag
引數:
1)dag_id
3. api響應引數:
airflow_cmd - 字串 - 在本地計算機上執行Airflow CLI命令
arguments - Dict - 包含您傳入的引數的字典及其值
post_arguments - Dict - 包含您傳入的帖子身體引數及其值的字典
call_time - 時間戳 - 伺服器接收請求的時間
output - String - 呼叫CLI函式的文字輸出
response_time - 時間戳 - 伺服器發回響應的時間
status - 字串 - 響應呼叫的狀態。 (可能的值:OK,ERROR)
warning - String - 從API發回的警告訊息
http_response_code - 整數 - HTTP響應程式碼