shell加python實現程式自動化控制
阿新 • • 發佈:2019-02-12
為做到資料的實時傳輸(實時:當前時間傳輸上一個小時的資料),shell用於控制整個流程,python用於處理資料。
shell程式碼如下:#bin/bash ######### ## 篤篤學車4G執行指令碼 ## 編寫者:zhangqm ## 日期:2018-04-04 ## 呼叫方式:nohup sh duduxueche.sh day/hour > ../log/duduxueche.log 2>&1 & ## type資料有兩種:1、day:按天跑 2、hour:按小時跑 ######## # 程式的日誌目錄 log='/data1/u_lx_data/zhangqm/yanjie/dudu/log' # 4G表的HDFS目錄 sourceDataHdfs='/user/db_lte/public/sada_lte_xdr03_103' # MR清單資料,HDFS目錄 mrDataListHdfs='/user/u_lx_data/private/zhangqm/dudu' # MR清單資料,本地目錄 dataList='/data1/u_lx_data/zhangqm/yanjie/dudu/data' # 結果資料 resultList='/data1/u_lx_data/zhangqm/yanjie/dudu/result' # 稽核語句 # 呼叫方式:Check_num 目錄 時間 小時 function Check_num() { if [ $3 == 'day' ];then num=$(hadoop fs -du -h -s $1/$2 |awk -F" " '{print $1}'|sed 's/ //g') echo $num else num=$(hadoop fs -du -h -s $1/$2/$3|awk -F" " '{print $1}'|sed 's/ //g') echo $num fi } # 得到清單資料 # 呼叫方式:Deal_data 時間 小時 function Deal_data() { if [ $2 == 'day' ];then # 拉清單資料 echo '開始處理'${dayTime_1}'天資料。。。' hadoop jar dudu4g_0514.jar ${sourceDataHdfs}/$1/ ${mrDataListHdfs}/$1 # 檢查天資料是否產生 size=$(Check_num ${mrDataListHdfs} $1 $2) if [ ${size} > 0 ] ; then hadoop fs -cat ${mrDataListHdfs}/$1/* > ${dataList}/$1.txt else echo '天資料沒有產生,請核查。。。' fi else # 拉清單資料 echo '開始處理'${dayTime}${hourTime_1}'小時資料。。。' hadoop jar dudu4g_0514.jar ${sourceDataHdfs}/$1/$2 ${mrDataListHdfs}/$1/$2 # 檢查小時資料是否產生 size=$(Check_num ${mrDataListHdfs} $1 $2) if [ ${size} > 0 ] ; then if [ ! -d ${dataList}/$1 ];then mkdir ${dataList}/$1 fi hadoop fs -cat ${mrDataListHdfs}/$1/$2/* > ${dataList}/$1/$2.txt else echo '當前小時資料沒有產生,請核查。。。' fi fi } #按小時跑 function Run_hour() { while true do # 獲取系統當前時間 dayTime=$(date +"%Y%m%d" ) # 獲取系統當前小時的前1小時的小時 hourTime_1=$(date -d "-1 hour" +"%H") if [ ${hourTime_1} -eq 23 ];then dayTime=$(date -d "-1 day " +"%Y%m%d") fi # 測試小時資料是否已經產生 hadoop fs -test -e ${mrDataListHdfs}/${dayTime}/${hourTime_1} #十五分鐘檢測一次,主要是解決跑程式所花費的時間不同問題 if [ $? -eq 0 ] ;then echo '******************************************' echo ${dayTime}${hourTime_1}'小時已經跑過。。。。' echo '等待十五分鐘。。。。。' sleep 900 echo '當前時間為:'$(date +"%Y-%m-%d %H:%M") else echo '******************************************' echo ${dayTime}${hourTime_1}'小時沒有跑過。。。。' echo '當前時間為:'$(date +"%Y-%m-%d %H:%M") Deal_data ${dayTime} ${hourTime_1} if [ ! -d ${resultList}/${dayTime} ];then mkdir ${resultList}/${dayTime} fi echo '得到結果資料,當前時間為:'$(date +"%Y-%m-%d %H:%M") python duduxueche.py ${dataList} ${dayTime} ${hourTime_1} ${resultList} echo ${dayTime}${hourTime_1}'小時全部結束,進入下一個小時的處理。。。。' #把70的資料結果放到231的HDFS上 ./duduxueche.exp ${resultList} ${dayTime} ${hourTime_1} fi # exit 2 done } #按天跑 function Run_day() { while true do # 獲取系統當前時間 dayTime=$(date +"%Y%m%d" ) # 獲取系統時間的前一天的時間 dayTime_1=$(date -d "-1 day $dayTime" +"%Y%m%d") # 測試天資料是否已經產生 hadoop fs -test -e ${mrDataListHdfs}/${dayTime_1} #8小時檢測一次,主要是解決跑程式所花費的時間不同問題 if [ $? -eq 0 ] ;then echo '******************************************' echo ${dayTime_1}'天已經跑過。。。。' echo '等待8個小時。。。。。' sleep 28800 echo '當前時間為:'$(date +"%Y-%m-%d %H:%M") else echo '******************************************' echo ${dayTime_1}'天沒有跑過。。。。' echo '當前時間為:'$(date +"%Y-%m-%d %H:%M") Deal_data ${dayTime_1} $1 #if [ ! -d ${resultList}/${dayTime} ];then # mkdir ${resultList}/${dayTime} #fi echo '得到結果資料,當前時間為:'$(date +"%Y-%m-%d %H:%M") python duduxueche.py ${dataList} ${dayTime} ${hourTime_1} ${resultList} #echo ${dayTime}${hourTime_1}'小時全部結束,進入下一個小時的處理。。。。' # 把70的資料結果放到231的HDFS上 #./duduxueche.exp ${resultList} ${dayTime} ${hourTime_1} fi # exit 2 done } if [ $1 == 'hour' ] ;then Run_hour else Run_day $1 fi
# -*- coding:utf-8 -*- from datetime import datetime import pandas as pd import sys #***************** ##按口徑出的phone結果 # 口徑:1)關鍵詞且官網 2)關鍵詞且app 3)關鍵詞訪問大於1 4)官網訪問大於1 #***************** def Main(): dataList = sys.argv[1]+"/"+ sys.argv[2]+"/"+sys.argv[3]+".txt" # 對照表清單資料(加密和不加密手機號對應關係) mapRuletxt = sys.argv[1]+"/"+"mapRuletxt.txt" targetTxt = sys.argv[4]+"/"+ sys.argv[2]+"/"+sys.argv[3]+".txt" # 儲存最終解果 phoneSet = set() # 儲存有關鍵詞行為的phone kwPhoneSet = set() # 儲存有官網行為的phone webPhoneSet = set() # app行為 appPhoneSet = set() #儲存對照表清單資料 dict={} uname = ['phone', 'time', 'name'] # 找官網的資料 def web(str): if str.find('app') == -1 and str.find('kw') == -1: return str print("開始。。。。。") print(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) with open(mapRuletxt,'r') as fr: for line in fr: line = line.strip().split("\t") dict[line[1]] = line[0] with open(dataList,'r') as fr: for line in fr: line = line.strip().split("\t") if line[2].find('app') != -1: # app的 appPhoneSet.add(line[0]) elif line[2].find('kw') != -1: #關鍵詞搜尋的 kwPhoneSet.add(line[0]) else: # 官網訪問行為的 webPhoneSet.add(line[0]) # 有過關鍵詞搜尋行為且有過官網訪問行為的phone for kwphone in kwPhoneSet: if kwphone in webPhoneSet: phoneSet.add(kwphone) # 有過關鍵詞搜尋行為且有過app行為的phone for kwphone in kwPhoneSet: if kwphone in appPhoneSet: phoneSet.add(kwphone) # 關鍵詞訪問大於1的phone df = pd.read_table(dataList, sep="\t", header=None, names=uname, index_col=False)[['phone', 'name']] kw = df[df.name.str.contains('kw')].groupby('phone')['name'].agg([('uv',pd.Series.nunique)]).reset_index() for phone in kw[kw.uv > 1]['phone']: phoneSet.add(phone) # 官網訪問大於1的phone web = df.applymap(web).dropna().groupby('phone')['name'].agg([('uv',pd.Series.nunique)]).reset_index() for phone in web[web.uv > 1]['phone']: phoneSet.add(phone) with open(targetTxt,'w+') as fw: for phone in phoneSet: if phone in dict: fw.write(dict[phone]+"\n") print("結束。。。。。") print(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) if __name__ == "__main__": Main()