
Big Data: Simple Workflow Scheduling with Airflow

Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows.

With Airflow you author workflows as directed acyclic graphs (DAGs) of tasks. The scheduler executes the tasks on a set of workers while respecting the dependencies you specify. A rich web UI makes it easy to visualize the pipelines running in production, monitor their progress, and troubleshoot failures when they occur.
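For orientation, here is a minimal sketch of what such a DAG file looks like. The DAG and task names are made up for illustration, and the code targets the Airflow 1.10-style API used throughout this article:

# Minimal illustrative DAG (hypothetical names), Airflow 1.10-style API.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='hello_dag',                    # unique name shown in the web UI
    start_date=datetime(2020, 12, 15),     # first schedulable date
    schedule_interval=timedelta(days=1),   # run once a day
)

# Two tasks; extract must finish before load starts.
extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load', dag=dag)
extract >> load  # the single dependency edge of this DAG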

 

Part 1: Writing the DAG Task Script

1. Start the Alibaba Cloud servers and bring up the Hadoop cluster.

2. Set up passwordless SSH login between the cluster nodes.

[root@airflow airflow]# vim /etc/hosts
172.26.16.78 airflow airflow
172.26.16.41 hadoop101 hadoop101
172.26.16.39 hadoop102 hadoop102
172.26.16.40 hadoop103 hadoop103

[root@airflow ~]# ssh-keygen -t rsa
[root@airflow ~]# ssh-copy-id hadoop101
[root@airflow ~]# ssh-copy-id hadoop102
[root@airflow ~]# ssh-copy-id hadoop103

3. Create a work-py directory and write the Python (.py) script in it.

[root@airflow ~]# mkdir -p /opt/module/work-py
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['[email protected]'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)

Script walkthrough:

default_args: the default arguments applied to every task in the DAG

depends_on_past: whether a task run depends on the success of its previous scheduled run

schedule_interval: the scheduling frequency

retries: the number of retries

start_date: the start date

BashOperator: defines a task that executes a bash command; because depends_on_past is True, a run executes only if the previous run of that task completed successfully.

task_id: a unique identifier for the task (required)

bash_command: the actual command the task executes

set_upstream: declares an upstream dependency; here the ads task depends on the dws task, which in turn depends on the dwd task (an equivalent notation is shown below).
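For reference, the same dependency chain can be written inside test.py with Airflow's bit-shift composition; this is equivalent to the set_upstream calls above:

# Equivalent to t2.set_upstream(t1); t3.set_upstream(t2)
t1 >> t2 >> t3   # dwd runs first, then dws, then ads
# or, reading in the other direction:
# t3 << t2 << t1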

 

Note:

The following imports are required:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
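These import paths belong to the Airflow 1.10.x series used in this article; on Airflow 2.x the BashOperator has moved, roughly as follows:

# Airflow 2.x import path; the bash_operator path used above is deprecated there.
from airflow import DAG
from airflow.operators.bash import BashOperator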

 

4. Configure the JDK.

Note: the Java environment variables must be configured in /etc/bashrc on the SSH target machine (hadoop103, the node the tasks ssh into); after editing, source the file.

(python3) [root@airflow work-py]# vim /etc/bashrc
(python3) [root@airflow work-py]# source /etc/bashrc

 

 

5. Check the Airflow configuration to find the directory where DAG files are stored.

 

(python3) [root@airflow work-py]# vim ~/airflow/airflow.cfg

 

 

6. Create the DAG file directory at the path configured in airflow.cfg, and copy the .py script into it.

(python3) [root@airflow work-py]# mkdir ~/airflow/dags
(python3) [root@airflow work-py]# cp test.py ~/airflow/dags/
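To confirm that Airflow can import the copied script without errors (instead of waiting for the scheduler), a quick sketch using the DagBag model can be run in a Python shell on the Airflow host; the path below matches the dags_folder configured above:

# Load the dags folder the same way the scheduler does and report problems.
from airflow.models import DagBag

bag = DagBag('/root/airflow/dags')   # same directory as dags_folder in airflow.cfg
print(bag.import_errors)             # should print an empty dict: {}
print('test' in bag.dags)            # True once test.py has been picked up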

7. After a short wait, refresh the DAG list; the test DAG appears in it.

(python3) [root@airflow work-py]# airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial

8. Refresh the Airflow web UI; the test DAG now shows up there as well.

 

9. Click to trigger the test DAG.

 

 

10. Click a successful task to view its log.


11. View the DAG graph and the Gantt chart.


12. View the script code.

 

Part 2: DAG Task Operations

1. Delete the DAG task.

 

2. Running the following command re-adds the DAG task.

(python3) [root@airflow work-py]# airflow list_tasks test --tree
The 'list_tasks' command is deprecated and removed in Airflow 2.0, please use 'tasks list' instead
[2020-12-15 11:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
<Task(BashOperator): dwd>
    <Task(BashOperator): dws>
        <Task(BashOperator): ads>

3. List all current DAGs to confirm that the test DAG has been added back.

(python3) [root@airflow work-py]# airflow list_dags
The 'list_dags' command is deprecated and removed in Airflow 2.0, please use 'dags list' or 'dags report' instead
[2020-12-15 11:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial

 

4. The re-added DAG task.

 

Part 3: Configuring the Mail Server

1. First make sure that the SMTP service is enabled for every mailbox involved.

2. Modify the Airflow configuration file as follows:

(python3) [root@airflow work-py]# vim ~/airflow/airflow.cfg

smtp_host = smtp.qq.com
smtp_starttls = true
smtp_ssl = false
smtp_user = [email protected]
# smtp_user =
smtp_password = wjmfbxkfvypdebeg
# smtp_password =
smtp_port = 587
smtp_mail_from = [email protected]
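Before wiring these settings into a DAG, one way to sanity-check them is a short sketch using Airflow's built-in mail helper, run in a Python shell on the Airflow host. The recipient address below is a placeholder, not taken from the original article:

# Quick SMTP check using Airflow's mail helper (reads smtp_* from airflow.cfg).
# The recipient is a placeholder -- replace it with a real mailbox you control.
from airflow.utils.email import send_email

send_email(
    to='you@example.com',
    subject='airflow smtp test',
    html_content='<p>If you can read this, smtp_* in airflow.cfg works.</p>',
)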

3. Restart Airflow.

(python3) [root@airflow airflow]# ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
(python3) [root@airflow airflow]# ps -ef | grep airflow
root       745     1  0 09:50 ?        00:00:00 /sbin/dhclient -1 -q -lf /var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -H airflow eth0
root      7875  1851  0 12:51 pts/1    00:00:00 grep --color=auto airflow
(python3) [root@airflow airflow]# kill -15 745
(python3) [root@airflow airflow]# airflow webserver -p 8080 -D
(python3) [root@airflow airflow]# airflow scheduler -D

4. Edit the test.py script again and replace the old copy.

[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['[email protected]'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

email = EmailOperator(
    task_id="email",
    to="[email protected]",
    subject="test-subject",
    html_content="<h1>test-content</h1>",
    cc="[email protected]",
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)

(python3) [root@airflow work-py]# cp test.py ~/airflow/dags/
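As an aside, if the goal is only to be notified about failures rather than to send a mail after every successful run, a sketch of an alternative is to flip the notification flags already present in default_args instead of appending an EmailOperator task; the values below are illustrative:

# Alternative to a trailing EmailOperator: let Airflow mail the owner on failure/retry.
from datetime import timedelta

default_args = {
    'owner': 'test_owner',
    'email': ['[email protected]'],  # address(es) to notify, as in the script above
    'email_on_failure': True,           # send a mail when a task fails
    'email_on_retry': True,             # send a mail when a task is retried
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}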

5. Check the web UI to confirm the changes took effect.

6. Run a test and check the execution status and the notification email.