1. 程式人生 > >airflow-api 外掛的安裝與使用介紹

airflow-api 外掛的安裝與使用介紹

1. 安裝外掛airflow-rest-api

1)獲取

wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/master.zip

2)將plugin資料夾下的內容放入airflow/plugin/下,若不存在則新建

3)重啟airflow

2. api使用介紹

2.1 獲取airflow版本

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=version

2.2 獲取airflow-api-plugin的版本

GET - http://{HOST}

:{PORT}/admin/rest_api/api?api=rest_api_plugin_version

2.3 渲染任務例項模板

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=render

引數:
1)dag_id
2)task_id   任務id
3)execution_date    dag的執行日期,例如:2017-01-02T03:04:05
4)subdir(可選)    從中查詢檔案位置或目錄

2.4 操作變數

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=variables

引數(均為可選引數):
1)set:設定變數,key value
2)get:獲取變數
3)json:布林值,反序列化json
4)default:如果變數不存在,則返回預設值
5)import:從json檔案匯入變數
6)export:從json檔案匯出變數
7)delete:刪除變數

2.5 操作連線

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=connections

引數(均為可選引數):
1)list:布林型,列出所有連線
2)add:增加連線
3)delete:布林型,刪除連線
4)conn_id:新增/刪除連線所需要的連線id
5)conn_id:新增連線所需要的連線url
6)conn_extra:連線的額外欄位,新增連線時可選引數

2.6 暫停DAG

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause

引數:
1)dag_id
2)subdir:可選引數,從中查詢dag的檔案位置或者目錄

2.7 繼續dag

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=unpause

引數:
1)dag_id
2)subdir:可選引數,從中查詢dag的檔案位置或者目錄

2.8 排程任務失敗的依賴

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=task_failed_deps

引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選

2.9 觸發dag執行

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag

引數:
1)dag_id
2)run_id:可選引數,用於dag執行的runid
3)conf:可選引數,傳遞給觸發dag執行的一些配置,(URL Encoded JSON)以{"key":"value"}的形式

2.10 測試任務例項,但是不檢查依賴或者在資料庫中記錄

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=test

引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選引數
5)dryrun:可選引數,布林型,是否進行幹執行
6)task_params:可選引數,將json字典傳給task

2.11 獲取dag執行狀態

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=dag_state

引數:
1)dag_id
2)execution_date
3)subdir:可選引數

2.12 執行單個task例項

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=run

引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選引數
5)mark_success:可選引數,布林型,不執行jobs但是標記為成功
6)force:可選引數,布林型,忽略先前的執行狀態,無論任務是否成功都重新執行
7)pool:可選引數,資源池
8)cfg_path:可選引數,使用配置檔案的路徑而不用原有的airflow.cfg
9)local:可選引數,布林值,使用localexecute執行任務
10)ignore_all_dependencies:可選引數,布林值,忽略所有的非關鍵依賴項,包括ignore_ti_state和ignore_task_depsstore_true
11)ignore_dependencies:可選,布林值,忽略特定於任務的依賴關係,例如upstream, depends_on_past還有retry delay dependencies
12)ignore_depends_on_past:可選,布林值,忽略depends_on_past依賴項,但是尊重上游
13)ship_dag:可選,布林值,序列化dag併發送給worker
14)pickle:可選:整個dag的序列化pickle物件

2.13 某個dag的所有tasks

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=list_tasks

引數:
1)dag_id
2)tree:可選引數,布林型
3)subdir:可選引數

2.14 在制定範圍內執行dag的子部分

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=backfill

引數:
1)dag_id
2)task_regex:可選引數,用於過濾特定的task_ids以進行回填正則表示式
3)start_date:覆蓋start_date YYYY-MM-DD
4)end_date:可選引數,同上覆蓋
5)mark_success:可選引數,布林型,不執行jobs但是標記為成功
6)local:可選引數,布林型,使用LocalExecutor執行任務
7)donot_pickle:可選引數,不進行嘗試將dag物件發給工作人員,只告訴他們dag的執行版本
8)include_adhoc:可選引數,布林值,包含帶有adhoc引數的dags
9)ignore_dependencies:可選引數,布林值,忽略特定於任務的依賴關係,例如 upstream, depends_on_past, and retry delay dependencies
10)ignore_first_depends_on_past:可選引數,布林值,僅忽略第一組任務的depends_on_past依賴關係
11)subdir:可選引數,
12)pool:可選引數,資源池
13)dry_run:可選引數,布林值,是否空執行

2.15 列出所有的dags

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=list_dags

引數: 
1)subdir:可選引數
2)report:可選引數,布林型,顯示dagbag的載入報告

2.16 啟動kerberos票證續訂

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=kerberos

引數(均為可選引數):
1)principal
2)keytab
3)pid:pid檔案位置
4)daemon:布林值,是否為守護程序
5)stdout:將stdout重定向到檔案
6)stderr:將錯誤重定向到檔案
7)log-file:日誌檔案

2.17 啟動celery工作節點(worker)

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=worker

引數(均為可選引數):
1)do_pickle:布林型,嘗試picklg DAG物件傳送給worker,而不是讓worker執行他們的程式碼版本
2)queues:逗號分隔的佇列列表
3)concurrency:工作程序數
4)pid:pid檔案位置
5)daemon:布林值,是否為守護程序
6)stdout:將stdout重定向到檔案
7)stderr:將錯誤重定向到檔案
8)log-file:日誌檔案

2.18 啟動celery工作節點(flower)

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=flower

引數(均為可選引數):
1)hostname:設定執行伺服器的主機名
2)port:執行伺服器的埠
3)flower_conf:flower的配置檔案
4)broker_api
5)pid:pid檔案位置
6)daemon:布林值,是否為守護程序
7)stdout:將stdout重定向到檔案
8)stderr:將錯誤重定向到檔案
9)log-file:日誌檔案

2.19 啟動排程程式例項

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=scheduler

引數(均為可選引數):
1)dag_id
2)subdir:可選引數,從中查詢dag的檔案位置或者目錄
3)run-duration:設定退出前執行的秒數
4)num_runs:設定退出前執行的次數
5)do_pickle:布林型,嘗試pickle DAG物件傳送給worker,而不是讓worker執行他們的程式碼版本
6)pid:pid檔案位置
7)daemon:布林值,是否為守護程序
8)stdout:將stdout重定向到檔案
9)stderr:將錯誤重定向到檔案
10)log-file:日誌檔案

2.20 獲取task的執行狀態

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=task_state

引數:
1)dag_id
2)task_id
3)execution_date
4)subdir:可選

2.21 CRUD 對pool的操作

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pool

引數:
1)set:分別設定池插槽計數和描述。 表單中的預期輸入:NAME SLOT_COUNT POOL_DESCRIPTION
2)get:獲取池資訊
3)delete:刪除池

2.22 由worker生成的日誌

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=serve_logs

引數:無

2.23 清除一組任務例項

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=clear

引數:
1)dag_id
2)task_regex:可選引數,用於過濾特定的task_ids以進行回填正則表示式
3)start_date:覆蓋start_date YYYY-MM-DD
4)end_date:可選引數,同上覆蓋
5)subdir:可選引數
6)upstream:可選,布林型,包含上游任務
7)downstream:可選,布林型,包含下游任務
8)no_confirm:可選,布林型,不要求確認
9)only_failed:可選,布林型,僅失敗的作業
10)only_running:可選,布林型,僅執行的作業
11)exclude_subdags:可選,布林型,排除子標記

2.24 釋出一個新的dag

POST - http://{HOST}:{PORT}/admin/rest_api/api?api=deploy_dag

post引數:
1)dag_file:file-上傳部署的py檔案
2)force:可選,布林型,檔案已存在,是否強制上傳
3)pause:可選,布林型,在建立時強制暫停dag並覆蓋'dags_are_paused_at_creation'配置
4)unpause(可選) - 布林值 -  DAG在建立時將被強制取消暫停,並覆蓋'dags_are_paused_at_creation'配置

2.25 重新整理一個dag

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=refresh_dag

引數: 
1)dag_id

3. api響應引數:

airflow_cmd  - 字串 - 在本地計算機上執行Airflow CLI命令
arguments  -  Dict  - 包含您傳入的引數的字典及其值
post_arguments  -  Dict  - 包含您傳入的帖子身體引數及其值的字典
call_time  - 時間戳 - 伺服器接收請求的時間
output  -  String  - 呼叫CLI函式的文字輸出
response_time  - 時間戳 - 伺服器發回響應的時間
status  - 字串 - 響應呼叫的狀態。 (可能的值:OK,ERROR)
warning  -  String  - 從API發回的警告訊息
http_response_code  - 整數 -  HTTP響應程式碼