1. 程式人生 > 其它 >pyspark spark客戶端設定

pyspark spark客戶端設定

1,SparkContext和sparkSession的區別

1-1、#構建SparkContext,讀取檔案

    from pyspark import *
    import os
    os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf"

    conf = SparkConf().setAppName("appname").setSparkHome("/opt/cloudera/parcels/SPARK2/lib/spark2").setMaster("yarn")
    sc = SparkContext(conf=conf)
    rdd 
= sc.textFile("hdfs://part*",10) 1-2、#構建SparkSession 執行hive語句(這個也能完成1中的功能,所以能用這個儘量用這個) from pyspark.sql import * spark = SparkSession.builder.appName("appName").master("yarn").enableHiveSupport().getOrCreate() get_duration_sql=''' select id,duration from *_db.da_* where day='20210126' and duration is not null
''' rdd=spark.sql(get_duration_sql).rdd

SparkSession是Spark 2.0引如的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。
 在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來建立和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向後相容,SQLContext和HiveContext也被儲存下來。
 SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的
資料連結:https://blog.csdn.net/beautiful_huang/article/details/103820534

2,sparkSession的使用

    if len(sys.argv) < 4:
        print('input error')
    directory = sys.argv[1]
    day = sys.argv[2]
    hour = sys.argv[3]
    if hour=="24":
        hour="*"
    # 配置環境
    os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf"
    # 例項化spark
    spark = SparkSession.builder. \
        appName("spark_slot_eva"). \
        config("spark.sql.shuffle.partitions", 10). \
        config("spark.default.parallelism", 1600). \
        config("hive.warehouse.subdir.inherit.perms", "false"). \
        config("spark.executor.cores", "4"). \
        config("spark.executor.instances", "200"). \
        config("spark.executor.memory", "4g"). \
        config("spark.port.maxRetries","100").\
        config("spark.yarn.executor.memoryOverhead","26g").\
        enableHiveSupport(). \
        getOrCreate()
    sc = spark.sparkContext
    # 讀取樣本資料集和slot.conf
    dir_list=[directory,day,hour,"part*"]
    final_dir="/".join(dir_list)
    print(final_dir)
    rdd = sc.textFile(final_dir)
    # print('------------------rdd----------------', rdd.first())
    # 2.計算資料的總行數
    line_num = rdd.count()
    print('-------------------------------------line_num--------------', line_num)
     # 計算特徵覆蓋率
    dict1 = get_slot_cover(rdd,line_num)
    dict2 = get_slot_num(rdd)
    dict3 = get_slot_num_avg(rdd,line_num)
    res2hive(spark,day,hour,dict1,dict2,dict3)
spark.executor.memory  + spark.yarn.executor.memoryOverhead  < 30G  在執行的有這個限制(公司環境限制)