1. 程式人生 > >AirFlow 如何傳引數給DAG

AirFlow 如何傳引數給DAG

題外話(可以直接跳過進入正題):

    最近需要使用airflow去排程任務,平常使用airflow都是排程的定時任務,這次不需要定時任務,只需要條件滿足時主動觸發airflow執行任務排程即可,同時將任務必要的外界引數傳給DAG。查了好多資料發現都沒有介紹如何將引數傳給DAG的,於是就去看airflow文件,找到了使用命令列可以將引數傳給DAG。但這種方式不適合我的任務需求,我需要呼叫airflow的API去觸發airflow執行,可是官方文件中這塊介紹的很少很少。也請教了一些人,得到的結果是airflow可能不支援自定義的引數傳入。那怎麼辦呢?難道只能改原始碼?不科學呀,命令列支援,API怎麼會不支援呢?於是本著不行就改原始碼的想法檢視,一路查下去最後發現API和命令列呼叫的是同一個方法(trigger_dag),既然是同一個方法,肯定支援啊,於是有重新看了一下文件和原始碼,終於找到了二者之間的聯絡,哈哈哈,搞定~~

目錄

1、通過API與AirFlow互動

AirFlow提供了兩種API互動方式:

(1)  GET方法獲取指定<DAG_ID>下的指定<TASK_ID>資訊

http://<WEBSERVER>:<PORT>/api/experimental/dags/<DAG_ID>/tasks/<TASK_ID>   #returns info for a task (GET).
  • 該介面返回的資訊主要包括:

{
    "adhoc": "False",
    "depends_on_past": "False",
    "email": "['
[email protected]
']", "email_on_failure": "True", "email_on_retry": "True", "end_date": "None", "execution_timeout": "None", "max_retry_delay": "None", "on_failure_callback": "None", "on_retry_callback": "None", "on_success_callback": "None", "op_args": "[]", "op_kwargs": "{}", "owner": "XXX", "params": "{}", "pool": "None", "priority_weight": "1", "provide_context": "True", "python_callable": "<function get_params at 0x7f2e156691b8>", "queue": "3708", "resources": "{'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}", "retries": "1", "retry_delay": "0:05:00", "retry_exponential_backoff": "False", "run_as_user": "XXX", "sla": "None", "start_date": "2018-07-09 00:00:00", "task_id": "get_params", "templates_dict": "None", "trigger_rule": "all_success", "wait_for_downstream": "False" }

(2)  POST方法執行指定的<DAG_ID>

http://<WEBSERVER>:<PORT>/api/experimental/dags/<DAG_ID>/dag_runs   #creates a dag_run for a given dag id (POST).
  • 傳入自定義引數給DAG

header: Content-Type:application/json

{
    "conf": 自定義引數(json格式字串)
}

注:可傳入的引數包括run_id、conf、execution_date

例如:

{
    "conf": "{\"id\":1, \"start_time\":\"2018-01-01 00:00:00\", \"end_time\":\"2018-10-31 23:59:59\"}"
}
  • 通過以上方式傳入的引數會存放在 ‘dag_run’的conf屬性中,因此在DAG中獲取引數方式:
def get_params(**kwargs):
    id=kwargs.get('dag_run').conf.get('id')
    start_time=kwargs.get('dag_run').conf.get('start_time')
    end_time=kwargs.get('dag_run').conf.get('end_time')

注:同一個DAG中的每個task只要傳入**kwargs,都可以通過該方式獲取

2、通過命令列啟動DAG時傳參

$ airflow trigger_dag <dag_id> -c <json格式的字串引數>

例如

$ airflow trigger_dag <dag_id> -c '{"id":1, "start_time":"2018-01-01 00:00:00", "end_time":"2018-10-31 23:59:59"}'

引數獲取方式同上

參考:

相關推薦

AirFlow 如何引數DAG

題外話(可以直接跳過進入正題):     最近需要使用airflow去排程任務,平常使用airflow都是排程的定時任務,這次不需要定時任務,只需要條件滿足時主動觸發airflow執行任務排程即可,同時將任務必要的外界引數傳給DAG。查了好多資料發現都沒有介紹如何將引數傳給

ajax非同步引數後臺

通過使用ajax技術,將客戶端資料傳遞給服務端,服務端處理後返回資料給客戶端進一步處理,客戶端瀏覽器不必重新整理整個頁面,只需要重新整理部分內容。function ajaxFun(){  //js函式    $.ajax({url : '../servlet/priceSer

Flask框架——檢視引數

如何實現資料互動(通過瀏覽器給介面傳入引數) 要想傳入引數,可以通過一個語法,這個語法就是<>(尖括號) from flask import Flask app = Flask(__name__) # 傳入引數:通過url地址給檢視函式傳遞引數 # 使用語法:&

C-函式陣列引數另一個函式,使用malloc

// // main.c // // // Copyright © 2018 [email protected] // #include <stdio.h> #include<stdio.h> void fun(int *array){ int sum=

C語言-陣列引數另一個函式,使用malloc

// // main.c // // // Created by myhaspl on 2018/10/26. // Copyright © 2018 [email protec

POST方法@RequestBody引數失敗原因分析

通過ajax給springMVC傳遞引數時,通過post方法傳遞json字串時常用的方式,這時後端應該通過@RequestBody註解配合springMVC中配置的訊息轉換器來進行json字串的解析。 因為post方法中的json字串通常是一個前端的json物件轉化而成的字串,所以後端@Requ

bash:如何function引數

給shell裡的函式(function)傳遞引數有2種方式: 第一種方式: 在function裡直接通過$1,$2,etc.來獲取引數: 例: 建立一個檔案“file.txt”在shell裡建立一個函式並呼叫: file.txt: chen china engineer

[makefile] 命令列make直譯器引數

例如: build: [tab] mvn clean package 原本是這樣既可以跑 UT又可以打不jar 包來,但後來UT總是不過。怎麼避免對檔案的修改呢? 本著這樣目的,修改如下: 1 maven.test.skip = false 2 build: 3

oracle 的SQL指令碼引數

SQL指令碼內容: conn &1/&2selectcount(*) from user_tables;     exit 呼叫: SQL> exit從Oracle9i Enterprise Edition Release 9.2.0.1.0 - P

java看看我是怎麼利用陣列Runnable執行緒引數的1

我的文章只給有耐心的人看,所以先寫程式碼 import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Sem

python檔案引數

python檔案內部接收引數用:sys.argv sys.argv[0]:是python指令碼名 sys.arg[1]及以後各項:是接收的引數 看下面的例子: (para.py) #! /bin/

$emit 子

ren turn v-on something som msg rom mouse emit parent.vue <template>  <div class="wapper"> <p>child tells me:{{childWo

HTML5+js頁面Java後臺的小技巧

頁面傳值 bsp 發送 設置 這樣的 htm html5 分享 .com 頁面傳值小技巧

react navigation上一頁面

reac 組件 .get value cto oba info state getpara 使用新的導航組件react navigation,傳值方式略微發生了一些改變 A頁面到B頁面 pushaddremark(){ let _this=this;

vue父組件字組件

類型 type ima 數據 src nbsp div http vue 轉自https://www.cnblogs.com/padding1015/p/7878710.html 父組件通過綁定 傳入 數據的名稱 值 子組件接收 type為數據類型 vue父

vue中子組件父組件

ima bubuko TP vue nbsp 技術分享 技術 組件 src index.js 子組件 父組件 vue中子組件傳值給父組件

【轉】Vue組件一-父組件子組件

字符 container 實例 pro += 掌握 follow ofo https Vue組件一-父組件傳值給子組件 開始 Vue組件是學習Vue框架最比較難的部分,而這部分難點我認為可以分為三個部分學習,即 組件的傳值 - 父組件向子組件中傳值 事件

Vue.js中子部件及觸發動作的問題

computed ops .... flag null spa code 經驗 觸發 最近研究一個用vue.js做的程序並修改增加功能。其中用到傳值給子部件等問題。 template中有個子部件: <template> ...... <child

postman傳遞list<Integer>引數Springboot的controller

post在raw中選擇,傳遞json格式的資料,資料格式,如下: [ 30101,30103,30104 ] Springboot的controller接收的程式碼如下: public Object getModelByIds(@RequestBody List<Integ

Wpf Page間跳轉引數 And Window To Page

  這段時間用到Wpf,頁面間的跳轉網上有不少的示例,但是有些已經不能用了,尤其是頁面間的傳參問題更是一大堆,但正確的解決方案卻沒有幾個,或者說寫的不清楚,讓人走了很多彎路,檢視官方文件後發現了正確的姿勢,現記錄如下 Page頁間的跳轉      private void Button_Click(