pyspark spark客戶端設定
阿新 • • 發佈:2021-08-02
1,SparkContext和sparkSession的區別
1-1、#構建SparkContext,讀取檔案 from pyspark import * import os os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf" conf = SparkConf().setAppName("appname").setSparkHome("/opt/cloudera/parcels/SPARK2/lib/spark2").setMaster("yarn") sc = SparkContext(conf=conf) rdd= sc.textFile("hdfs://part*",10) 1-2、#構建SparkSession 執行hive語句(這個也能完成1中的功能,所以能用這個儘量用這個) from pyspark.sql import * spark = SparkSession.builder.appName("appName").master("yarn").enableHiveSupport().getOrCreate() get_duration_sql=''' select id,duration from *_db.da_* where day='20210126' and duration is not null''' rdd=spark.sql(get_duration_sql).rdd
SparkSession是Spark 2.0引如的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。
在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來建立和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向後相容,SQLContext和HiveContext也被儲存下來。
SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的
資料連結:https://blog.csdn.net/beautiful_huang/article/details/103820534
2,sparkSession的使用
if len(sys.argv) < 4: print('input error') directory = sys.argv[1] day = sys.argv[2] hour = sys.argv[3] if hour=="24": hour="*" # 配置環境 os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf" # 例項化spark spark = SparkSession.builder. \ appName("spark_slot_eva"). \ config("spark.sql.shuffle.partitions", 10). \ config("spark.default.parallelism", 1600). \ config("hive.warehouse.subdir.inherit.perms", "false"). \ config("spark.executor.cores", "4"). \ config("spark.executor.instances", "200"). \ config("spark.executor.memory", "4g"). \ config("spark.port.maxRetries","100").\ config("spark.yarn.executor.memoryOverhead","26g").\ enableHiveSupport(). \ getOrCreate() sc = spark.sparkContext # 讀取樣本資料集和slot.conf dir_list=[directory,day,hour,"part*"] final_dir="/".join(dir_list) print(final_dir) rdd = sc.textFile(final_dir) # print('------------------rdd----------------', rdd.first()) # 2.計算資料的總行數 line_num = rdd.count() print('-------------------------------------line_num--------------', line_num) # 計算特徵覆蓋率 dict1 = get_slot_cover(rdd,line_num) dict2 = get_slot_num(rdd) dict3 = get_slot_num_avg(rdd,line_num) res2hive(spark,day,hour,dict1,dict2,dict3)
spark.executor.memory + spark.yarn.executor.memoryOverhead < 30G 在執行的有這個限制(公司環境限制)