1. 程式人生 > >工作中 pyspark的小知識點

工作中 pyspark的小知識點

1、df.na.fill({'欄位名1':'default','欄位名2':'default'})   對空值進行替換

2、df.dropDuplicaates()    去重根據欄位名進行去重,空參為全部欄位

3、df.subtract(df1)     返回在當前df中出現,並且不在df1中出現的元素,不去重。

4、print time.localtime([timestamp])    如下格式

time.struct_time(tm_year=2018, tm_mon=10, tm_mday=9, tm_hour=16, tm_min=52, tm_sec=10, tm_wday=1, tm_yday=282, tm_isdst=0)

5、print time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S')   如下格式

time.struct_time(tm_year=2018, tm_mon=12, tm_mday=14, tm_hour=15, tm_min=45, tm_sec=12, tm_wday=4, tm_yday=348, tm_isdst=-1)

6、時間戳轉格式化時間

time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))

# 格式化成2016-03-20 11:45:39形式
print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

# 格式化成Sat Mar 28 22:24:24 2016形式
print time.strftime("%a %b %d %H:%M:%S %Y", time.localtime())

7、時間轉時間戳

print time.mktime(time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S'))

8、取某一天的前後多少天

def getPreDate(cp, days):
                #轉換為時間
    cp_from = (datetime.strptime(str(cp), '%Y%m%d%H') + timedelta(days)).strftime('%Y%m%d%H')
    return cp_from

9、os.path.join(path_prefix,'parquet','path')     的使用方法

print os.path.join('/user/bbd/dev/xxxx/xxxx/', 'parquet/', 'ccccc.txt')

/user/bbd/dev/xxxx/xxxx/parquet/ccccc.txt

10、時間取差值的方法,計算兩個時間相差多少天

def get_history_date():
    start_date = datetime(2018,10,10)
    end_date = datetime(2018,10,31)        
    print (end_date - start_date).days   #返回相差的天
    print end_date - start_date            #返回相差的
    for i in range(0,(end_date - start_date).days):
        cp_from = (start_date + timedelta(days=i)).strftime('%Y%m%d00')
        cp_end = (end_date + timedelta(days=i + 1)).strftime('%Y%m%d00')

        print str(cp_from) + '\t' + str(cp_end)


return: 如下
        21
        21 days, 0:00:00
        2018101000	2018110100
        2018101100	2018110200
        2018101200	2018110300
        。。。。

11、not  就是取反的意思   字串為空,none,數字為0, 物件為空, not後返回的都是真

def not_ceshi():
    a = ''
    b='as'
    c= None
    d=0
    e=-1
    if not a:
        print 'a為假'
    if b:
        print 'b為真'
    if not c:
        print 'c為假'
    if not d:
        print 'd為0'
    if e:
        print 'e為-1'


結果:
    a為假
    b為真
    c為假
    d為0
    e為-1

12、dataframe的drop,可以先分割槽排序,再對排序進行drop,num為視窗後分區排序的row_number()欄位

df.where(num=='1').drop(num)


返回選擇num為1的,並將num欄位去除

13、sorted 高階函式的使用    如下

def cmp_ignore_case(s1, s2):

    if s1.lower()[:1] > s2.lower()[:1]:
        return 1
    elif s1.lower()[:1] < s2.lower()[:1]:
        return -1
    else:
        return 0
#sorted 重新定義排序,可以只傳入一個引數,給定 (key=比較的引數的自定義函式)
cmp_ignore_case2 = lambda s: s[:1].lower()


print sorted(['bob', 'about', 'Zoo', 'Credit'], key=cmp_ignore_case2)
print sorted(['bob', 'about', 'Zoo', 'Credit'], cmp_ignore_case)


結果返回相同:
    ['about', 'bob', 'Credit', 'Zoo']
    ['about', 'bob', 'Credit', 'Zoo']

14、在我們進行dataframe的互操作時,需要用到unionall的時候,如果欄位數量不一致,可為其填充空白欄位,都為空

sql = 'SELECT '' 相同的欄位名, '' 相同的欄位名 。。。'

相當於補全欄位

15、驗證是否個人郵箱,

email_regx = '([a-zA-Z0-9]{1}[a-zA-Z0-9\.\+\-.]{0,63}@{1}[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+){1,4})'
def udf_format_email_string(data):
    try:
        data = data.strip()
        email = re.findall(email_regx, data)
        print email
        if email:
            res = email[0][0]
            except_email = ['[email protected]','[email protected]','[email protected]','[email protected]','[email protected]','[email protected]'
                            ,'[email protected]','[email protected]','VOICE=','[email protected]','[email protected]','[email protected]','[email protected]']
            
            for item in except_email:
                if res.startswith(item):
                    return ''
            return res
    except:
        pass
    return ''

是的話,返回郵箱,不是返回空

這裡有一個知識點,就是re.findall(regx,data),就是這裡的regx,有兩種情況,帶大括號和不帶括號,還有小括號

import re

string="abcdefg  acbdgef  abcdgfe  cadbgfe"

#帶括號與不帶括號的區別
#帶大括號和小括號
regex=re.compile("((\w+)\s+\w+)")
print(regex.findall(string))
#輸出:[('abcdefg  acbdgef', 'abcdefg'), ('abcdgfe  cadbgfe', 'abcdgfe')]

#只帶小括號
regex1=re.compile("(\w+)\s+\w+")
print(regex1.findall(string))
#輸出:['abcdefg', 'abcdgfe']

#不帶括號
regex2=re.compile("\w+\s+\w+")
print(regex2.findall(string))
#輸出:['abcdefg  acbdgef', 'abcdgfe  cadbgfe']

return :

    regex:返回的是tuple (大括號匹配的,小括號匹配的)
    regex1:返回的是小括號裡匹配的
    regex2:返回的是全部匹配的

16、在pyspark中,對dataframe進行操作,對某個中文欄位進行max(col)操作,分組後取出現的第一個,類似於根據姓名去重,一個郵箱只能對應一個姓名

select email format_data(max(name)) from email_detal group by email


根據email進行分組,對姓名進行去重,但是有可能姓名有差異

17、一些dataframe的api示例

def get_df():

    conf = SparkConf()

    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("get_df") \
        .config(conf=conf) \
        .enableHiveSupport() \
        .getOrCreate()

    rdd1 = spark.sparkContext.parallelize([
        ('make',24,198),
        ('make',23,198),
        ('tubu',24,198),
        ('tubu',23,198),
        ('mark',24,198),
        ('mark',23,198),
        ('uzi',24,198),
        ('uzi',23,197)
    ])
    rdd2 = spark.sparkContext.parallelize([
        ('make',24,198),
        ('tubu',24,198)
    ])

    schema = StructType([
        StructField('name',StringType(), True),
        StructField('age',LongType(), True),
        StructField('hight',LongType(), True)
    ])
    df1 = spark.createDataFrame(rdd1,schema)
    df2 = spark.createDataFrame(rdd2,schema)

    df1.createTempView('tmp')

    sql = 'select *, row_number() over (partition by name,age order by hight) as num from tmp'
    sql2 = 'select `name`,age, case when age=24 then age else age+2 end  re_age, hight,"table_name" tablename from tmp'
    spark.sql(sql).where('num=1').drop("num").show()   #刪除某列欄位
    df1.subtract(df2).show()                #df1中去掉df2中的完全相同的資料
    df1.dropDuplicates(['name','age']).show()  #去掉指定兩列的重複資料,不加參,為全部列
    df1.drop_duplicates(['name','age']).show()    #同上
    df1.join(df2,df1.name == df2.name,'left').show()   #各種join   
    spark.sql(sql2).drop('age').show()        #根據某些的欄位來確定另一個欄位的取值

18、註冊 py檔案  logging   如下

import logging,os
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


用來測試是否有相關資訊

19、將多張表我們需要的欄位清洗進parquet檔案,後面的維度需要哪些自己讀取進行相關過濾就可以了,首先對我們需要對基礎表的資訊瞭解,然後根據欄位含義,對應清洗進對應欄位,比如總共五張表的資料,要寫進總表,可按如下方式

def creat_dataframe(source):
    '''獲取每個源表的所有資訊到源資訊總表'''
    df = spark.sql("select * from {tablename}".format(tablename=source["table_name"]))
    if source.get("before_where"):
        df = df.where(source["before_where"])
    column_source = OrderedDict(zip(source["column"],source["source"]))
    columns = ["%s as %s"%(table_file,table_name) for table_name,table_file in column_source.items()]
    df = df.selectExpr(*columns)
    if source.get("after_where"):
        df = df.where(source["after_where"])
    return df

def etl_school():
    columns = ["NAME", "SSQX", "XYMC", "TABLENAME", "table_order"]
    where = "verify_school(NAME) = 1"

    data_source = [
        {
            "table_name":"tb_gaw_dzxxyryxsjfmxx",
            "column":columns,
            "source":["DWMC", "SSQX", "''", "'tb_gaw_dzxxyryxsjfmxx'", 5],
            "after_where":where
        },
        {
            "table_name":"tb_gaw_jw_xxjzygxx",
            "column":columns,
            "source":["DWMC", "SSQX", "''", "'tb_gaw_jw_xxjzygxx'", 1],
            "after_where":where
        },
        {
            "table_name":"tb_gaw_jw_zxyjs_new",
            "column":columns,
            "source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxyjs_new'", 2],
            "after_where":where
        },
        {
            "table_name":"tb_gaw_jw_zxdxs_new",
            "column":columns,
            "source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxdxs_new'", 3],
            "after_where":where
        }
    ]

    dfs = map(creat_dataframe, data_source)
    df_union = reduce(lambda x, y: x.union(y), dfs)
    df = df_union.selectExpr("*", "row_number() over (partition by NAME,XYMC order by table_order asc) as num")
    df_tmp = df.where("num=1").drop("num")
    df.tmp = df_tmp.where("NAME !=  XYMC")

    write_parquet(df_tmp, "tmp/etl_school")
    logger.info("save to /tmp/etl_school success!!")