1. 程式人生 > >airflow 文檔學習(一) 基本Operator

airflow 文檔學習(一) 基本Operator

來源 ask tag **kwargs 日期範圍 語句 reat mssql sub

1. Operator和task

簡單來說,Operator就是task的抽象類

2. BaseOperator

所有的功能性Operator的來源

2.1 參數:

task_id (string) :唯一標識task的id
owner (string)
retries (int):任務重試此時
retry_delay (timedelta) :重試間隔
start_date (datetime):任務開始時間,第一個任務實例的執行時間
end_date (datetime):如果指定的話,任務執行不會超過這個時間
depends_on_past (bool):如果指定的話,任務實例將以此運行並且依賴上一個任務的成功
wait_for_downstream (bool):在這個參數指定的任何地方,depends_on_past都將強制為true
queue(str):指定到隊列,CeleryExcutor指定特定隊列
dag(DAG):指定dag
pool(str):此任務運行的插槽池,限制任務的並發
execution_timeout (datetime.timedelta):執行此任務實例允許的最長時間,超過最長時間則任務失敗
sla(datetime.timedelta):作業預計成功時間
on_failure_callback(callable):當此任務實例失敗時調用的函數
on_retry_callback (callable) :與on_failure_callback相似,只是重試任務時觸發
on_success_callback (callable) :與on_failure_callback相似,任務成功時觸發
trigger_rule (str):定義依賴的觸發規則
  ·包括的選項有:{all_success | all_failed | all_done | one_success | one_failed | dummy},默認為all_success
run_as_user(str):在運行任務時使用unix用戶名進行模擬
executor_config (dict):特定的執行程序解釋其任務參數
task_concurrency (int):設置後,任務將限制execution_dates之間的並發運行
resources (dict):資源參數名稱(Resources構造函數的參數名稱)與其值的映射。
priority_weight(int):此任務的優先級權重
weight_rule(str):用於任務的有效總優先級權重的加權方法
max_retry_delay (timedelta):重試最大延遲間隔
retry_exponential_backoff (bool) 

2.2 方法

set_downstream(task_or_task_list)
將任務或者任務列表設置為當前任務的下遊
set_upstream(task_or_task_list)
將任務或者任務列表設置為當前任務的上遊

clear(start_date=None, end_date=None, upstream=False, downstream=False, session=None)
根據指定參數清除與任務關聯的任務實例的狀態
execute(context)
創建operator時派生的主要方法
get_direct_relative_ids(upstream=False)
獲取當前任務上遊或者下遊的直接相對id
get_direct_relatives(upstream=False)
獲取當前任務的上遊或者下遊直接關聯對象
get_flat_relative_ids(upstream=False, found_descendants=None)
獲取上遊或者下遊的關聯對象的id列表
get_flat_relatives(upstream=False)
獲取上遊或者下遊的關聯對象列表
get_task_instances(session, start_date=None, end_date=None)
獲取特定時間範圍的與此任務相關的任務實例
has_dag()
是否設置dag
on_kill()
當任務實例被殺死時,重寫方法以清除子進程
run(start_date=None, end_date=None, ignore_first_depends_on_past=False, ignore_ti_state=False, mark_success=False)
為日期範圍運行一組任務實例
post_execute(context, result=None)
調用self.execute()後立即觸發,它傳遞上下文和operator返回的結果
pre_execute(context)
調用self.execute()之前觸發
prepare_template()
模板化字段被其內容替換之後觸發
render_template(attr, content, context)
從文件或直接在字段中呈現模板,並返回呈現結果
render_template_from_field(attr, content, context, jinja_env)
從字段中呈現模板
xcom_pull(context, task_ids=None, dag_id=None, key=‘return_value‘, include_prior_dates=None)
xcom_push(context, key, value, execution_date=None) 

2.3 可調用值

dag:如果設置了則返回dag,否則報錯
deps:返回依賴任務列表
downstream_list:下遊任務列表
schedule_interval:任務排列
upstream_list:上遊任務列表

3. BaseSensorOperator

基於 airflow.models.BaseOperator, airflow.models.SkipMixin

3.1 參數:

soft_fail (bool):設置為true以將任務標記為失敗時的skipped
poke_interval (int):作業在每次嘗試之間應等待的時間(單位:秒)
timeout (int):超時時間

3.2 方法:

execute(context)
poke(context)

4. Core Operators

4.1 airflow.sensors.base_sensor_operator.BaseSensorOperator(poke_interval=60, timeout=604800, soft_fail=False, *args, **kwargs)

基於 airflow.models.BaseOperator

4.1.1 參數:

bash_command (string):bash命令
xcom_push (bool):設置為true,則當bash命令完成之後,寫入stdout的最後一行也會被推送到xcom
env (dict):如果不為None的話,則它定義新進程的環境變量的映射,用於替代當前的進程環境

4.1.2 方法:

execute(context)
會在臨時目錄中執行bash,並且之後會對其進行清理
on_kill()
同BaseOperator

4.2 airflow.operators.bash_operator.BashOperator(bash_command, xcom_push=False, env=None, output_encoding=‘utf-8‘, *args, **kwargs)

基於 airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

4.3 airflow.operators.python_operator.BranchPythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

基於 airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

4.4 airflow.operators.check_operator.CheckOperator(sql, conn_id=None, *args, **kwargs)

基於 airflow.models.BaseOperator

CheckOperator需要返回一個單行的sql查詢,第一行的每個值都需要使用python bool cast進行計算,如果任何返回值false,則檢查失敗並輸出錯誤

4.4.1 註意:

1)python bool cast 會將一下視作false:

?False

?0

?Empty string ("")

?Empty list ([])

?Empty dictionary or set ({})

2)這是一個抽象類,需要定義get_db_hook,而get_db_hook是鉤子,從外部獲取單個記錄

4.4.2 參數:

sql (string):執行的sql語句   

4.5 airflow.operators.email_operator.EmailOperator(to, subject, html_content, files=None, cc=None, bcc=None, mime_subtype=‘mixed‘, mime_charset=‘us_ascii‘, *args, **kwargs)

基於 airflow.models.BaseOperator

功能:發送一封郵件

4.5.1 參數:

to (list or string (comma or semicolon delimited)):發送電子郵件的郵件列表
subject (string):郵件主題
html_content (string):郵件內容(允許html標記)
files (list):附件名稱
cc (list or string (comma or semicolon delimited)):抄送列表
bcc (list or string (comma or semicolon delimited)):bcc列表
mime_subtype (string):MIME字內容類型
mime_charset (string):字符集參數添加到 Content-Type 頭部

4.6 airflow.operators.mysql_operator.MySqlOperator(sql, mysql_conn_id=‘mysql_default‘, parameters=None, autocommit=False, database=None, *args, **kwargs)

基於 airflow.models.BaseOperator

功能:在指定的mysql數據庫中執行sql代碼

4.6.1 參數:

mysql_conn_id (string):數據庫名
sql:執行的sql語句
database (string):覆蓋連接中定義的數據庫名稱

4.7 airflow.operators.presto_check_operator.PrestoValueCheckOperator(sql, pass_value, tolerance=None, presto_conn_id=‘presto_default‘, *args, **kwargs)

基於:airflow.models.BaseOperator

4.7.1 參數:

python_callable (python callable):可調用對象的引用
op_kwargs (dict):關鍵字參數字典,將在調用函數中解壓
op_args (list):調用callable時將解壓縮的位置參數列表
provide_context (bool):如果設置為true,Airflow將傳遞一組可在函數中使用的關鍵字參數。 這組kwargs完全對應於你在jinja模板中可以使用的內容,為此,您需要在函數頭中定義** kwargs
templates_dict (dict of str):一個字典,其中的值是模板,這些模板將在__init__和執行之間的某個時間由Airflow引擎進行模板化,並在應用模板後在可調用的上下文中可用
templates_exts (list(str)):處理模板化字段時要解析的文件擴展名列表

4.8 airflow.operators.python_operator.PythonVirtualenvOperator(python_callable, requirements=None, python_version=None, use_dill=False, system_site_packages=True, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, *args, **kwargs)

4.9 airflow.operators.python_operator.ShortCircuitOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

基於:airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

功能:僅在滿足條件時才允許工作流繼續。 否則,將跳過工作流程“短路”和下遊任務。
派生自PythonOperator,條件結果由python_callable決定

4.10 airflow.sensors.http_sensor.HttpSensor(endpoint, http_conn_id=‘http_default‘, method=‘GET‘, request_params=None, headers=None, response_check=None, extra_options=None, *args, **kwargs)

基於:airflow.sensors.base_sensor_operator.BaseSensorOperator

4.10.1 參數:

http_conn_id (string):連接
method (string):方法
endpoint (string):完整的url
request_params (a dictionary of string key/value pairs):添加到get的url參數
headers (a dictionary of string key/value pairs):添加到get請求的http頭部
response_check (A lambda or defined function.):檢查請求相應對象
extra_options(選項字典,其中key是字符串,值取決於要修改的選項。):“請求”庫的額外選項,請參閱“請求”文檔(修改超時,ssl等選項)

4.11 airflow.sensors.sql_sensor.SqlSensor(conn_id, sql, *args, **kwargs)

基於:airflow.sensors.base_sensor_operator.BaseSensorOperator

功能:運行sql語句,直到滿足條件。 它將繼續嘗試,而sql不返回任何行,或者如果第一個單元格返回(0,‘0‘,‘‘)

4.11.1 參數:

conn_id (string)
sql

4.12 airflow.sensors.time_sensor.TimeSensor(target_time, *args, **kwargs)

基於:airflow.sensors.base_sensor_operator.BaseSensorOperator

功能:等到當天的指定時間

4.12.1 參數:

target_time (datetime.time) 

其他

4.13 airflow.sensors.time_delta_sensor.TimeDeltaSensor(delta, *args, **kwargs)
4.14 airflow.sensors.web_hdfs_sensor.WebHdfsSensor(filepath, webhdfs_conn_id=‘webhdfs_default‘, *args, **kwargs)
4.15 airflow.operators.latest_only_operator.LatestOnlyOperator(task_id, owner=‘Airflow‘, email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, weight_rule=‘downstream‘, queue=‘default‘, pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=‘all_success‘, resources=None, run_as_user=None, task_concurrency=None, executor_config=None, inlets=None, outlets=None, *args, **kwargs)
4.16 airflow.operators.mssql_operator.MsSqlOperator(sql, mssql_conn_id=‘mssql_default‘, parameters=None, autocommit=False, database=None, *args, **kwargs)
4.17 airflow.operators.mssql_to_hive.MsSqlToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter=‘x01‘, mssql_conn_id=‘mssql_default‘, hive_cli_conn_id=‘hive_cli_default‘, tblproperties=None, *args, **kwargs)
4.18 airflow.operators.dummy_operator.DummyOperator(*args, **kwargs)
4.19 airflow.operators.mysql_to_hive.MySqlToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter=‘x01‘, mysql_conn_id=‘mysql_default‘, hive_cli_conn_id=‘hive_cli_default‘, tblproperties=None, *args, **kwargs)
4.20 airflow.operators.oracle_operator.OracleOperator(sql, oracle_conn_id=‘oracle_default‘, parameters=None, autocommit=False, *args, **kwargs)
4.21 airflow.operators.pig_operator.PigOperator(pig, pig_cli_conn_id=‘pig_cli_default‘, pigparams_jinja_translate=False, *args, **kwargs)
4.22 airflow.operators.postgres_operator.PostgresOperator(sql, postgres_conn_id=‘postgres_default‘, autocommit=False, parameters=None, database=None, *args, **kwargs)
4.23 airflow.operators.presto_check_operator.PrestoCheckOperator(sql, presto_conn_id=‘presto_default‘, *args, **kwargs)
4.24 airflow.operators.presto_check_operator.PrestoIntervalCheckOperator(table, metrics_thresholds, date_filter_column=‘ds‘, days_back=-7, presto_conn_id=‘presto_default‘, *args, **kwargs)
4.25 airflow.operators.presto_to_mysql.PrestoToMySqlTransfer(sql, mysql_table, presto_conn_id=‘presto_default‘, mysql_conn_id=‘mysql_default‘, mysql_preoperator=None, *args, **kwargs)
4.26 airflow.operators.druid_check_operator.DruidCheckOperator(sql, druid_broker_conn_id=‘druid_broker_default‘, *args, **kwargs)
4.27 airflow.operators.generic_transfer.GenericTransfer(sql, destination_table, source_conn_id, destination_conn_id, preoperator=None, *args, **kwargs)
4.28 airflow.operators.hive_to_druid.HiveToDruidTransfer(sql, druid_datasource, ts_dim, metric_spec=None, hive_cli_conn_id=‘hive_cli_default‘, druid_ingest_conn_id=‘druid_ingest_default‘, metastore_conn_id=‘metastore_default‘, hadoop_dependency_coordinates=None, intervals=None, num_shards=-1, target_partition_size=-1, query_granularity=‘NONE‘, segment_granularity=‘DAY‘, hive_tblproperties=None, *args, **kwargs)
4.29 airflow.operators.s3_file_transform_operator.S3FileTransformOperator(source_s3_key, dest_s3_key, transform_script=None, select_expression=None, source_aws_conn_id=‘aws_default‘, dest_aws_conn_id=‘aws_default‘, replace=False, *args, **kwargs)
4.30 airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(s3_key, field_dict, hive_table, delimiter=‘, ‘, create=True, recreate=False, partition=None, headers=False, check_headers=False, wildcard_match=False, aws_conn_id=‘aws_default‘, hive_cli_conn_id=‘hive_cli_default‘, input_compressed=False, tblproperties=None, select_expression=None, *args, **kwargs)
4.31 airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer(schema, table, s3_bucket, s3_key, redshift_conn_id=‘redshift_default‘, aws_conn_id=‘aws_default‘, copy_options=(), autocommit=False, parameters=None, *args, **kwargs)
4.32 airflow.operators.hive_to_mysql.HiveToMySqlTransfer(sql, mysql_table, hiveserver2_conn_id=‘hiveserver2_default‘, mysql_conn_id=‘mysql_default‘, mysql_preoperator=None, mysql_postoperator=None, bulk_load=False, *args, **kwargs)
4.33 airflow.operators.hive_to_samba_operator.Hive2SambaOperator(hql, destination_filepath, samba_conn_id=‘samba_default‘, hiveserver2_conn_id=‘hiveserver2_default‘, *args, **kwargs)
4.34 airflow.sensors.metastore_partition_sensor.MetastorePartitionSensor(table, partition_name, schema=‘default‘, mysql_conn_id=‘metastore_mysql‘, *args, **kwargs)
4.35 airflow.sensors.named_hive_partition_sensor.NamedHivePartitionSensor(partition_names, metastore_conn_id=‘metastore_default‘, poke_interval=180, hook=None, *args, **kwargs)
4.36 airflow.sensors.s3_key_sensor.S3KeySensor(bucket_key, bucket_name=None, wildcard_match=False, aws_conn_id=‘aws_default‘, *args, **kwargs)
4.37 airflow.sensors.s3_prefix_sensor.S3PrefixSensor(bucket_name, prefix, delimiter=‘/‘, aws_conn_id=‘aws_default‘, *args, **kwargs)
4.38 airflow.operators.hive_operator.HiveOperator(hql, hive_cli_conn_id=‘hive_cli_default‘, schema=‘default‘, hiveconfs=None, hiveconf_jinja_translate=False, script_begin_tag=None, run_as_owner=False, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, *args, **kwargs)
4.39 airflow.operators.hive_stats_operator.HiveStatsCollectionOperator(table, partition, extra_exprs=None, col_blacklist=None, assignment_func=None, metastore_conn_id=‘metastore_default‘, presto_conn_id=‘presto_default‘, mysql_conn_id=‘airflow_db‘, *args, **kwargs)
4.40 airflow.operators.check_operator.IntervalCheckOperator(table, metrics_thresholds, date_filter_column=‘ds‘, days_back=-7, conn_id=None, *args, **kwargs)
4.41 airflow.operators.jdbc_operator.JdbcOperator(sql, jdbc_conn_id=‘jdbc_default‘, autocommit=False, parameters=None, *args, **kwargs)

airflow 文檔學習(一) 基本Operator