程式人生 > 資料庫

【夢溪筆談】6. spark-sql 相關程式碼

import os
import sys
#import datetime
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Build the Hive-enabled Spark session for this job.
# Broadcast join is disabled (threshold -1) and speculative execution is on.
_builder = SparkSession.builder \
    .appName("app_test.py") \
    .enableHiveSupport()

# Static job-level configuration; keys are distinct, so apply order is irrelevant.
for _key, _value in (
        ("spark.dynamicAllocation.maxExecutors", "400"),
        ("spark.sql.autoBroadcastJoinThreshold", -1),
        ("spark.yarn.executor.memoryOverhead", 3702),
        ("spark.sql.adaptive.enabled", "true"),
        ("spark.sql.adaptive.repartition.enabled", "true"),
        ("spark.log.level", "ERROR"),
        ("spark.speculation", "true"),
        ("spark.sql.hive.convertMetastoreOrc", "true"),
):
    _builder = _builder.config(_key, _value)

spark = _builder.getOrCreate()

# Hive session settings: dynamic partitioning and the ORC ETL split strategy.
for _stmt in (
        "set hive.exec.dynamic.partition=true",
        "set hive.exec.orc.split.strategy=ETL",
        "set hive.exec.dynamic.partition.mode=nonstrict",
):
    spark.sql(_stmt)

from datetime import datetime, timedelta
def get_date(dt, time_delta=0):
    """Shift a date back by ``time_delta`` days and return it as 'YYYY-MM-DD'.

    :param dt: a date/datetime object, or a date string in either
               '%Y-%m-%d' or '%Y%m%d' format.
    :param time_delta: number of days to subtract (negative values add days).
    :return: the shifted date formatted as a 'YYYY-MM-DD' string.
    :raises ValueError: if ``dt`` is a string in neither supported format.
    """
    # Explicit type dispatch instead of the original bare `except:` chain,
    # which silently swallowed every error (including KeyboardInterrupt).
    if isinstance(dt, str):
        try:
            dt = datetime.strptime(dt, "%Y-%m-%d")
        except ValueError:
            # Fall back to the compact form; a bad string raises ValueError here.
            dt = datetime.strptime(dt, "%Y%m%d")
    result = dt - timedelta(days=time_delta)
    return result.strftime("%Y-%m-%d")

def insert_tab(df, tab, spark):
    """Align ``df`` to the schema of Hive table ``tab`` and overwrite into it.

    Columns that exist in the target table but are missing from ``df`` are
    backfilled as NULL literals, then the frame is projected into the target
    column order before the partition-overwriting insert.
    """
    # Probe the target table once to learn its column order.
    target_cols = spark.sql("""select * from {tab} limit 1""".format(tab=tab)).columns
    existing = df.columns
    # Backfill every target column the source frame lacks with NULLs.
    for missing in [c for c in target_cols if c not in existing]:
        df = df.withColumn(missing, F.lit(None))
    aligned = df.select(target_cols)
    aligned.repartition('dt', 'data_type').write.insertInto(tab, overwrite=True)

def search_dt(partitions_list, dt):
    """Resolve ``dt`` to an existing partition date.

    If ``dt`` is present in ``partitions_list`` it is returned as-is;
    otherwise the nearest partition strictly before ``dt`` is returned,
    falling back to the nearest partition after ``dt`` when no earlier
    one exists.

    :param partitions_list: list of partition values ('YYYY-MM-DD' strings,
                            possibly containing the sentinel 'ACTIVE');
                            note: 'ACTIVE' is removed from the list in place.
    :param dt: requested partition date (string or date/datetime).
    :return: the chosen partition date as a 'YYYY-MM-DD' string.
    :raises ValueError: if the list contains no usable partition.
    """
    dt = get_date(dt, 0)  # normalize dt to 'YYYY-MM-DD'
    if 'ACTIVE' in partitions_list:
        partitions_list.remove('ACTIVE')
    if dt in partitions_list:
        return dt
    dt_date = datetime.strptime(dt, '%Y-%m-%d').date()
    # Signed day-offset of every partition relative to the requested date.
    lags = [(datetime.strptime(p, '%Y-%m-%d').date() - dt_date).days
            for p in partitions_list]
    earlier = [x for x in lags if x < 0]
    later = [x for x in lags if x > 0]
    if earlier:
        offset = max(earlier)   # closest partition before dt
    elif later:
        offset = min(later)     # no earlier partition: closest one after dt
    else:
        # The old bare `except:` crashed here with an opaque
        # "min() arg is an empty sequence"; fail with a clear message instead.
        raise ValueError('no usable partitions found near %s' % dt)
    return datetime.strftime(dt_date + timedelta(offset), '%Y-%m-%d')

def get_nearest_dt(table_name, dt, spark):
    """Return ``dt`` if the table has that partition, else the nearest one.

    :param table_name: Hive table whose partitions are inspected.
    :param dt: requested partition date (string or date/datetime).
    :param spark: active SparkSession used to run ``show partitions``.
    :return: the resolved partition date as a 'YYYY-MM-DD' string.
    """
    rows = spark.sql("show partitions %s" % table_name).collect()
    # Each row's 'partition' field looks like 'dt=YYYY-MM-DD'; strip the
    # leading 'dt=' prefix.
    # NOTE(review): the [3:] slice assumes the partition column is named 'dt'
    # and the table is single-level partitioned — confirm for other tables.
    partitions_list = [row['partition'][3:] for row in rows]
    return search_dt(partitions_list, dt)