1. 程式人生 > 其它 > Airflow sample to start the Google Chrome browser

Airflow sample to start the Google Chrome browser

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By


# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'Jasmine Qian',
    # No 'start_date' here on purpose: the DAG below sets a static one.
    # A dynamic per-task start date (days_ago(0)) is later than the logical
    # date of each scheduled run, so scheduled runs would never execute the
    # tasks.
    # NOTE(review): the address below is the redaction placeholder left by the
    # page this sample was copied from -- replace it with a real recipient.
    'email': ['[email protected]'],
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
}

dag = DAG(
    'Python_selenium',
    default_args=default_args,
    tags=['python', 'selenium'],
    start_date=datetime(2021, 1, 1),
    catchup=False,
)


def _new_headless_chrome():
    """Return a Chrome WebDriver configured to run headless."""
    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")  # linux only
    chrome_options.add_argument("--headless")
    return webdriver.Chrome(options=chrome_options)


def _fetch_page(url):
    """Load *url* in headless Chrome and return ``(current_url, body_text)``.

    The browser session is always shut down with ``quit()``, even when loading
    the page or locating the body raises, so no chromedriver process is left
    behind on the worker.
    """
    driver = _new_headless_chrome()
    try:
        driver.get(url)
        # driver.get() returns None; the address actually loaded (after any
        # redirects) is exposed as driver.current_url.
        actual_url = driver.current_url
        body_content = driver.find_element(By.TAG_NAME, 'body').text
    finally:
        driver.quit()
    return actual_url, body_content


def login():
    """Open the sample 360doc article and print its final URL and body text."""
    url = "http://www.360doc.com/content/19/0217/09/33525635_815480537.shtml"
    print(url)
    actual_url, body_content = _fetch_page(url)
    print(actual_url)
    print(body_content)
    print("Succeed@@")


def connet_google():
    """Open the Google home page and print its body text."""
    url = "http://www.google.com"
    print(url)
    _, body_content = _fetch_page(url)
    print(body_content)
    print("Succeed@@")


user_login = PythonOperator(
    task_id='login',
    python_callable=login,
    dag=dag,
)

# The original script rebinds the module-level name ``connet_google`` from the
# function to its operator; this is kept so the module's final names are
# unchanged. The callable is captured by the operator before the rebind.
connet_google = PythonOperator(
    task_id='google',
    python_callable=connet_google,
    dag=dag,
)

# Run the login task first, then the Google task.
user_login >> connet_google

if __name__ == "__main__":
    # NOTE(review): DAG.cli() is the Airflow 2.x per-DAG command line; confirm
    # it is still available in the Airflow version you deploy on.
    dag.cli()
------------------------- A little progress each day adds up to a big success... ----------------------------