1. 程式人生 > 其它 >airflow sample to pass metadata to task. -----XCOM

airflow sample to pass metadata to task. -----XCOM

1. At first, let us take a look at one sample which one we do not need to pass the metadata

import random
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow import DAG

default_args = {
    'owner': 'Jasmine Qian
', 'start_date': days_ago(0), 'email': ['[email protected]'], 'retries': 0, 'retry_delay': timedelta(minutes=2), } dag = DAG( 'Pass_Metadata', default_args=default_args, tags=['meta', 'params'], schedule_interval=None, catchup=False, ) def run_this_fun(**context):
print("Hi") def check_state(**context): if random.random() > 0.7: raise Exception('Exception') print("I am OK") run_this_task1 = PythonOperator( task_id="run_this", python_callable=check_state, provide_context=True, dag=dag, ) run_this_task2 = PythonOperator( task_id
="run_this2", python_callable=run_this_fun, provide_context=True, dag=dag, ) run_this_task1 >> run_this_task2 # run_this_task2 if __name__ == "__main__": dag.cli()

2. how about task1 passing any metadata to task2?

import random
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow import DAG

default_args = {
    'owner': 'Jasmine Qian',
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
}

dag = DAG(
    'Pass_Metadata_v2',
    default_args=default_args,
    tags=['meta', 'params'],
    schedule_interval=None,
    catchup=False,
)


def run_this_fun(**context):
    received_value = context['ti'].xcom_pull(key='random_value')
    print("Hi, I received the value {}".format(str(received_value)))


def push_to_xcom(**context):
    random_value = random.random()
    context['ti'].xcom_push(key='random_value', value=random_value)
    print("I am OK")


run_this_task1 = PythonOperator(
    task_id="run_this",
    python_callable=push_to_xcom,
    provide_context=True,
    dag=dag,
)

run_this_task2 = PythonOperator(
    task_id="run_this2",
    python_callable=run_this_fun,
    provide_context=True,
    dag=dag,
)

run_this_task1 >> run_this_task2

# run_this_task2
if __name__ == "__main__":
    dag.cli()
------------------------- A little Progress a day makes you a big success... ----------------------------