PySpark tips and notes from day-to-day work
1、df.na.fill({'col_name1':'default','col_name2':'default'}) replaces null values in the given columns with the specified defaults (see the sketch below).
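A minimal sketch of the dict form of na.fill (the toy DataFrame and column names are made up for illustration):
# toy DataFrame with nulls; fill nulls in name with 'unknown' and nulls in age with 0
df = spark.createDataFrame([('make', None), (None, 23)], ['name', 'age'])
df.na.fill({'name': 'unknown', 'age': 0}).show()
# row 1 -> (make, 0), row 2 -> (unknown, 23)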
2、df.dropDuplicates(['col_name', ...]) deduplicates by the given column names; with no arguments it deduplicates across all columns.
3、df.subtract(df1) returns the rows that appear in the current df but not in df1; in Spark this is equivalent to EXCEPT DISTINCT in SQL, so the result is deduplicated (see the sketch below).
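A minimal sketch of both calls on toy data (the DataFrames and column names are made up for illustration):
a = spark.createDataFrame([(1, 'x'), (1, 'x'), (2, 'y')], ['id', 'val'])
b = spark.createDataFrame([(2, 'y')], ['id', 'val'])
a.dropDuplicates(['id']).show()   # keeps one row per id
a.subtract(b).show()              # rows of a that are not in b: only (1, 'x') remains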
4、print time.localtime([timestamp]) produces output in the following format:
time.struct_time(tm_year=2018, tm_mon=10, tm_mday=9, tm_hour=16, tm_min=52, tm_sec=10, tm_wday=1, tm_yday=282, tm_isdst=0)
5、print time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S') produces output in the following format:
time.struct_time(tm_year=2018, tm_mon=12, tm_mday=14, tm_hour=15, tm_min=45, tm_sec=12, tm_wday=4, tm_yday=348, tm_isdst=-1)
6、Converting a timestamp to a formatted time string
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
# formats as 2016-03-20 11:45:39
print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# formats as Sat Mar 28 22:24:24 2016
print time.strftime("%a %b %d %H:%M:%S %Y", time.localtime())
7、Converting a formatted time string to a timestamp
print time.mktime(time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S'))
8、Getting the date N days before or after a given day (a usage sketch follows the function)
from datetime import datetime, timedelta

def getPreDate(cp, days):
    # parse the cp string (%Y%m%d%H) into a datetime, shift it by `days`, then format it back
    cp_from = (datetime.strptime(str(cp), '%Y%m%d%H') + timedelta(days)).strftime('%Y%m%d%H')
    return cp_from
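A quick usage sketch (the cp values below are made up for illustration):
print getPreDate(2018121410, -3)    # '2018121110'
print getPreDate('2018121410', 5)   # '2018121910'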
9、Usage of os.path.join(path_prefix, 'parquet', 'path')
print os.path.join('/user/bbd/dev/xxxx/xxxx/', 'parquet/', 'ccccc.txt')
/user/bbd/dev/xxxx/xxxx/parquet/ccccc.txt
10、Computing the difference between two dates, i.e. how many days apart they are
from datetime import datetime, timedelta

def get_history_date():
    start_date = datetime(2018,10,10)
    end_date = datetime(2018,10,31)
    print (end_date - start_date).days   # the number of days between the two dates
    print end_date - start_date          # the full timedelta
    for i in range(0,(end_date - start_date).days):
        cp_from = (start_date + timedelta(days=i)).strftime('%Y%m%d00')
        cp_end = (end_date + timedelta(days=i + 1)).strftime('%Y%m%d00')
        print str(cp_from) + '\t' + str(cp_end)
Output:
21
21 days, 0:00:00
2018101000 2018110100
2018101100 2018110200
2018101200 2018110300
...
11、not negates truthiness: an empty string, None, the number 0, and an empty object are all falsy, so applying not to them returns True
def not_ceshi():
    a = ''
    b = 'as'
    c = None
    d = 0
    e = -1
    if not a:
        print 'a is falsy'
    if b:
        print 'b is truthy'
    if not c:
        print 'c is falsy'
    if not d:
        print 'd is 0'
    if e:
        print 'e is -1'
Output:
a is falsy
b is truthy
c is falsy
d is 0
e is -1
12、DataFrame drop: you can first rank rows inside window partitions, then filter on that rank and drop it; num is the row_number() column produced by the window (see the sketch below)
df.where("num = 1").drop("num")
This keeps only the rows where num is 1 and then removes the num column.
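A minimal sketch of the same pattern with the DataFrame API (df1 and the name/hight columns are borrowed from the example in point 17):
from pyspark.sql import Window
from pyspark.sql import functions as F

# rank rows inside each name partition by hight, keep the first one, drop the helper column
w = Window.partitionBy('name').orderBy(F.col('hight').desc())
deduped = (df1.withColumn('num', F.row_number().over(w))
              .where('num = 1')
              .drop('num'))
deduped.show()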
13、Using sorted as a higher-order function, as follows
def cmp_ignore_case(s1, s2):
    if s1.lower()[:1] > s2.lower()[:1]:
        return 1
    elif s1.lower()[:1] < s2.lower()[:1]:
        return -1
    else:
        return 0

# sorted can also take a custom key function instead of a comparator (key=function that extracts the sort key)
cmp_ignore_case2 = lambda s: s[:1].lower()
print sorted(['bob', 'about', 'Zoo', 'Credit'], key=cmp_ignore_case2)
print sorted(['bob', 'about', 'Zoo', 'Credit'], cmp_ignore_case)   # Python 2 cmp-style comparator
Both return the same result:
['about', 'bob', 'Credit', 'Zoo']
['about', 'bob', 'Credit', 'Zoo']
14、When combining DataFrames with unionAll, if the column counts differ you can pad the missing columns with empty string literals so both sides have the same schema (see the sketch below), e.g.
sql = "SELECT '' AS matching_col_name, '' AS matching_col_name, ..."
which effectively fills in the missing columns.
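A minimal sketch, assuming two DataFrames of string columns where df_b is missing the address column (df_a, df_b and the column names are made up for illustration):
# df_a: (email, name, address); df_b: (email, name) -- pad the missing column with an empty string
df_b_padded = df_b.selectExpr("email", "name", "'' as address")
df_all = df_a.select("email", "name", "address").unionAll(df_b_padded)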
15、Validating whether a string is a personal email address
import re

email_regx = '([a-zA-Z0-9]{1}[a-zA-Z0-9\.\+\-.]{0,63}@{1}[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+){1,4})'
def udf_format_email_string(data):
    try:
        data = data.strip()
        email = re.findall(email_regx, data)
        print email
        if email:
            res = email[0][0]    # the outer-group match of the first hit
            # addresses starting with any of these placeholder/system prefixes are rejected
            except_email = ['[email protected]','[email protected]','[email protected]','[email protected]','[email protected]','[email protected]'
                ,'[email protected]','[email protected]','VOICE=','[email protected]','[email protected]','[email protected]','[email protected]']
            for item in except_email:
                if res.startswith(item):
                    return ''
            return res
    except:
        pass
    return ''
If it is a valid email, the function returns the address; otherwise it returns an empty string.
One point worth noting is re.findall(regx, data): what findall returns depends on whether regx uses nested groups, a single group, or no group at all, as the example below shows.
import re
string = "abcdefg acbdgef abcdgfe cadbgfe"
# difference between nested groups, a single group, and no group
# nested groups (an outer group wrapping an inner group)
regex = re.compile("((\w+)\s+\w+)")
print(regex.findall(string))
# output: [('abcdefg acbdgef', 'abcdefg'), ('abcdgfe cadbgfe', 'abcdgfe')]
# a single group
regex1 = re.compile("(\w+)\s+\w+")
print(regex1.findall(string))
# output: ['abcdefg', 'abcdgfe']
# no group
regex2 = re.compile("\w+\s+\w+")
print(regex2.findall(string))
# output: ['abcdefg acbdgef', 'abcdgfe cadbgfe']
In summary:
regex: returns tuples of (outer-group match, inner-group match)
regex1: returns only what the single group matches
regex2: returns the whole match
16、In PySpark, when operating on a DataFrame you can take max(col) over a Chinese-text column and keep one value per group, e.g. deduplicating names by email so that each email maps to exactly one name (see the sketch below):
select email, format_data(max(name)) from email_detal group by email
This groups by email and deduplicates the names, though the surviving name may still differ from the other values in the group (max simply picks one).
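A minimal sketch of the same grouping with the DataFrame API (df_email and its email/name columns are made up; format_data is the author's UDF and is omitted here):
from pyspark.sql import functions as F

df_dedup = df_email.groupBy('email').agg(F.max('name').alias('name'))
df_dedup.show()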
17、Some DataFrame API examples
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

def get_df():
    conf = SparkConf()
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("get_df") \
        .config(conf=conf) \
        .enableHiveSupport() \
        .getOrCreate()
    rdd1 = spark.sparkContext.parallelize([
        ('make',24,198),
        ('make',23,198),
        ('tubu',24,198),
        ('tubu',23,198),
        ('mark',24,198),
        ('mark',23,198),
        ('uzi',24,198),
        ('uzi',23,197)
    ])
    rdd2 = spark.sparkContext.parallelize([
        ('make',24,198),
        ('tubu',24,198)
    ])
    schema = StructType([
        StructField('name',StringType(), True),
        StructField('age',LongType(), True),
        StructField('hight',LongType(), True)
    ])
    df1 = spark.createDataFrame(rdd1,schema)
    df2 = spark.createDataFrame(rdd2,schema)
    df1.createTempView('tmp')
    sql = 'select *, row_number() over (partition by name,age order by hight) as num from tmp'
    sql2 = 'select `name`,age, case when age=24 then age else age+2 end re_age, hight,"table_name" tablename from tmp'
    spark.sql(sql).where('num=1').drop("num").show()    # drop a column after window ranking
    df1.subtract(df2).show()                            # rows of df1 that do not appear in df2
    df1.dropDuplicates(['name','age']).show()           # dedupe on the two given columns; no args means all columns
    df1.drop_duplicates(['name','age']).show()          # same as above
    df1.join(df2,df1.name == df2.name,'left').show()    # joins of various types
    spark.sql(sql2).drop('age').show()                  # derive one column's value from other columns
18、Setting up logging in a .py file, as follows
import logging,os
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
Used to check that the relevant information gets logged (a usage sketch follows).
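For instance, a log call like the following prints a line in the configured format (the message text is made up):
logger.info('start etl')   # e.g. 2018-12-14 15:45:12,000 - INFO - start etl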
19、Cleaning the columns we need from multiple tables into a parquet file; downstream dimensions can then read it and apply whatever filters they need. First get familiar with the base tables, then map each source field into the corresponding target column according to its meaning. For example, to write the data of several source tables into one combined table, you can do it as follows:
from collections import OrderedDict
from functools import reduce

def creat_dataframe(source):
    '''Load the required columns of one source table for the combined source-info table'''
    df = spark.sql("select * from {tablename}".format(tablename=source["table_name"]))
    if source.get("before_where"):
        df = df.where(source["before_where"])
    column_source = OrderedDict(zip(source["column"], source["source"]))
    columns = ["%s as %s" % (table_file, table_name) for table_name, table_file in column_source.items()]
    df = df.selectExpr(*columns)
    if source.get("after_where"):
        df = df.where(source["after_where"])
    return df
def etl_school():
    columns = ["NAME", "SSQX", "XYMC", "TABLENAME", "table_order"]
    where = "verify_school(NAME) = 1"
    data_source = [
        {
            "table_name":"tb_gaw_dzxxyryxsjfmxx",
            "column":columns,
            "source":["DWMC", "SSQX", "''", "'tb_gaw_dzxxyryxsjfmxx'", 5],
            "after_where":where
        },
        {
            "table_name":"tb_gaw_jw_xxjzygxx",
            "column":columns,
            "source":["DWMC", "SSQX", "''", "'tb_gaw_jw_xxjzygxx'", 1],
            "after_where":where
        },
        {
            "table_name":"tb_gaw_jw_zxyjs_new",
            "column":columns,
            "source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxyjs_new'", 2],
            "after_where":where
        },
        {
            "table_name":"tb_gaw_jw_zxdxs_new",
            "column":columns,
            "source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxdxs_new'", 3],
            "after_where":where
        }
    ]
    dfs = map(creat_dataframe, data_source)
    df_union = reduce(lambda x, y: x.union(y), dfs)
    df = df_union.selectExpr("*", "row_number() over (partition by NAME,XYMC order by table_order asc) as num")
    df_tmp = df.where("num=1").drop("num")
    df_tmp = df_tmp.where("NAME != XYMC")
    write_parquet(df_tmp, "tmp/etl_school")
    logger.info("save to /tmp/etl_school success!!")