Python例項(持續更新中)
阿新 • • 發佈:2022-05-19
目錄
- 一、資料篩選:篩選出某列含有特定值的記錄
- 二、資料處理:某列資料補0
- 三、資料處理:統計工單派發和質檢資訊
- 四、資料處理:四層六域月粒度檔案處理
- 五、資料處理:四量七費日粒度檔案轉化成月粒度
- 六、資料處理:計算長期在庫業務量TOP5記錄
- 七、資料處理:資料工單生成與附件遷移
- 八、資料處理:按行分割檔案並保留表頭
一、資料篩選:篩選出某列含有特定值的記錄
1、DPI資料
# Filter app/ip indicator/label data down to records that mention a key city.
import pandas as pd
from pathlib import Path
import sys
import os
import chardet
import datetime
from dateutil.relativedelta import relativedelta


def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings.

    Uses chardet to detect the current encoding; rewrites the file only when
    the encoding is not utf-8/ascii or the line endings are CRLF.
    """
    with open(file, 'rb') as f:
        data = f.read()
    # NOTE(review): chardet may return None for undetectable encodings — decode would then raise.
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")


# Key cities; joined with '|' so str.contains() matches any of them (regex alternation).
citys = ["北京市","廣州市","上海市","天津市","重慶市","瀋陽市","南京市","武漢市",
         "成都市","西安市","石家莊市","太原市","鄭州市","長春市","哈爾濱市","呼和浩特市","濟南市","合肥市","杭州市",
         "福州市","長沙市","南寧市","南昌市","貴陽市","昆明市","拉薩市","海口市","蘭州市","銀川市","西寧市","烏魯木齊市",
         "深圳市","蘇州市","東莞市","寧波市","青島市","溫州市","佛山市","無錫市","金華市","泉州市","大連市","廈門市","台州市"]
address = '|'.join(citys)
# The input directory is partitioned by date; process the partition from six days ago.
day6 = (datetime.datetime.now() + relativedelta(days=-6)).strftime("%Y%m%d")
print(day6)
for appType in ['bad_app_mark', 'bad_app']:
    intputApp = '/data/mytest/indicator/collection/dpi/sysk_test/' + day6 + '/' + appType
    for p in Path(intputApp).iterdir():
        for s in p.rglob('*.csv'):
            df = pd.read_csv(s, header=None, index_col=False,
                             names=['date','prov','city','big','small','a','b','c','d','e','f','g','h','i','j','k','l','m'],
                             sep='|')
            # Keep only rows whose city column mentions one of the key cities; rewrite in place.
            df_new = df[df['city'].str.contains(address)]
            df_new.to_csv(s, index=False, header=False, sep='|')
            turn(s)
print("app done")
for ipType in ['bad_ip_mark', 'bad_ip']:
    intputIp = '/data/mytest/indicator/collection/dpi/sysk_test/' + day6 + '/' + ipType
    for p in Path(intputIp).iterdir():
        for s in p.rglob('*.csv'):
            # The ip files carry two extra trailing columns (n, o) compared to the app files.
            df = pd.read_csv(s, header=None, index_col=False,
                             names=['date','prov','city','big','small','a','b','c','d','e','f','g','h','i','j','k','l','m','n','o'],
                             sep='|')
            df_new = df[df['city'].str.contains(address)]
            df_new.to_csv(s, index=False, header=False, sep='|')
            turn(s)
print("done")
2、中間表資料
# Recompute business-library in/out records: keep only Beijing/Tianjin/Shanghai
# rows from the intermediate tables and the in/out tables.
import pandas as pd
from functools import reduce
import chardet


def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings."""
    with open(file, 'rb') as f:
        data = f.read()
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")


pd.set_option('expand_frame_repr', False)
# Input/output path pairs for each table variant (only middle_ip is processed below).
middle = "D:\\cmdi\\sysk\\data_filter\\20220330\\intermediate_data\\partition=質差業務庫\\part-00000-317d2028-fe98-423e-a639-dbc1853b0807.c000.txt"
middle_out = "C:\\Users\\Dell\\Desktop\\20220330_new\\middle\\part-00000-317d2028-fe98-423e-a639-dbc1853b0807.c000.txt"
middle_app = "D:\\cmdi\\sysk\\data_filter\\20220330\\intermediate_data_business\\app\\part-00000-8c683085-b6dd-42f9-8073-8cf9f9387852.c000.txt"
middle_out_app = "C:\\Users\\Dell\\Desktop\\20220330_new\\middle_app\\part-00000-8c683085-b6dd-42f9-8073-8cf9f9387852.c000.txt"
middle_ip = "D:\\cmdi\\sysk\\data_filter\\20220330\\intermediate_data_business\\ip\\part-00000-8c683085-b6dd-42f9-8073-8cf9f9387852.c000.txt"
middle_out_ip = "C:\\Users\\Dell\\Desktop\\20220330_new\\middle_ip\\part-00000-8c683085-b6dd-42f9-8073-8cf9f9387852.c000.txt"
outin = "D:\\cmdi\\sysk\\data_filter\\20220330\\outin_data\\partition=質差業務庫\\part-00000-9c9cdf0f-cf95-4703-ad54-6f0b7ffe667e.c000.txt"
outin_out = "C:\\Users\\Dell\\Desktop\\20220330_new\\outin\\part-00000-9c9cdf0f-cf95-4703-ad54-6f0b7ffe667e.c000.txt"
outin_app = "D:\\cmdi\\sysk\\data_filter\\20220330\\outin_data_business\\app\\part-00000-b50d8aa8-b04d-4da5-ae34-f8cf9d2cb760.c000.txt"
outin_out_app = "C:\\Users\\Dell\\Desktop\\20220330_new\\outin_app\\part-00000-b50d8aa8-b04d-4da5-ae34-f8cf9d2cb760.c000.txt"
outin_ip = "D:\\cmdi\\sysk\\data_filter\\20220330\\outin_data_business\\ip\\part-00000-b50d8aa8-b04d-4da5-ae34-f8cf9d2cb760.c000.txt"
outin_out_ip = "C:\\Users\\Dell\\Desktop\\20220330_new\\outin_ip\\part-00000-b50d8aa8-b04d-4da5-ae34-f8cf9d2cb760.c000.txt"
# Column layout: OUTINTYPE|KEY|province|city|BUSINESSTYPE|in-time|latest-in-time|out-time|in-lib|long-in-lib|frequent-in-out|star|ARPU|indicator
df = pd.read_csv(middle_ip, header=None, index_col=False, sep="|",
                 names=["type","key","prov","city","business","inTime","newinTime","outTime","in","longIn","frequentIn","star","arpu","indicator"])
# Alternative layout for the outin tables, kept for reference:
# OUTINTYPE|KEY|ADDR|TYPE|IPADDR|PORT|province|city|STAR|ARPU|BUSINESSTYPE|...
# df = pd.read_csv(outin, header=None, index_col=False, sep="|", names=["outintype","key","addr","type","ip","port","prov","city","star","arpu","businessType","inTime","newinTime","outTime","in","longIn","frequentIn","indicator"])
s = middle_out_ip
shDf = df[df['prov'] == '上海']
bjDf = df[df['prov'] == '北京']
tjDf = df[df['prov'] == '天津']
print(len(shDf))
print(len(bjDf))
print(len(tjDf))
# One concat replaces the original two-step chained concat; row order is identical.
resDf = pd.concat([shDf, bjDf, tjDf], ignore_index=True)
resDf.to_csv(s, index=False, header=False, sep='|')
turn(s)
print(len(resDf))
print(resDf.head())
print("done")
二、資料處理:某列資料補0
1、中間表資料
# Zero-pad the business sub-category code inside the intermediate table's app column.
import pandas as pd
import numpy as np
from pathlib import Path
import sys
import os
import chardet


def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings."""
    with open(file, 'rb') as f:
        data = f.read()
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")


pd.set_option('expand_frame_repr', False)
# Earlier runs used these inputs, kept for reference:
# inputPath = "C:\\Users\\Dell\\Desktop\\intermediate_data\\20220225\\partition=質差業務庫\\part-00000-c51b5c05-6be8-4996-917f-8b25de278967.c000.txt"
# inputPath = "C:\\Users\\Dell\\Desktop\\intermediate_data_business\\20220225\\partition=移動上網-BAD_APP\\part-00000-911a4850-4699-44ea-a4f4-5d6697582c13.c000.txt"
inputPath = "C:\\Users\\Dell\\Desktop\\intermediate_data_business\\20220225\\partition=移動上網-BAD_IP\\part-00000-911a4850-4699-44ea-a4f4-5d6697582c13.c000.txt"
df = pd.read_csv(inputPath, header=None, index_col=False,
                 names=['business','app','prov','city','type','a','b','c','d','e','f','g','h','i'],
                 sep='|')
# Split the composite '_'-separated app code into six temporary columns.
business_name = ['one','two','three','four','five','six']
business_col = df['app'].str.split('_', expand=True)
business_col.columns = business_name
df = df.join(business_col)
# Left-pad the sub-category code (part six) to five digits.
df['six'] = df['six'].str.zfill(5)
# Reassemble the app code. Parts 'one' and 'four' are presumably empty in the
# source data (leading '_' and double '__') — TODO confirm against a sample row.
df['app'] = "_"+df['two'].map(str)+"_"+df['three'].map(str)+"__"+df['five'].map(str)+"_"+df['six'].map(str)
# Drop the temporary split columns in one call (the original dropped them one by one).
df.drop(columns=business_name, inplace=True)
print(df.head())
outputPath = "./output/ip_part-00000-911a4850-4699-44ea-a4f4-5d6697582c13.c000.txt"
df.to_csv(outputPath, index=False, header=False, sep='|')
turn(outputPath)
2、DPI資料
# app/ip資料業務小類補0
import pandas as pd
from pathlib import Path
import sys
import os
import chardet
def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings.

    Detects the current encoding with chardet; rewrites the file only when the
    encoding is not utf-8/ascii or the line endings are CRLF.
    (Indentation restored — the pasted source had lost all block structure.)
    """
    with open(file, 'rb') as f:
        data = f.read()
    # NOTE(review): chardet may return None for undetectable content — decode would then raise.
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        # Success message only when a rewrite actually happened.
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")
# Partition date is hard-coded here (other scripts in this file compute it from today).
day6 = '20220401'
for appType in ['bad_app_mark', 'bad_app']:
    intputApp = '/data/mytest/indicator/collection/dpi/sysk_test/' + day6 + '/' + appType
    for p in Path(intputApp).iterdir():
        for s in p.rglob('*.csv'):
            # print(s)
            df = pd.read_csv(s, header=None, index_col=False,
                             names=['date','prov','city','big','small','a','b','c','d','e','f','g','h','i','j','k','l','m'],
                             sep='|')
            # Left-pad the sub-category code to five digits and rewrite in place.
            df['small'] = df['small'].astype('str')
            df['small'] = df['small'].str.zfill(5)
            df.to_csv(s, index=False, header=False, sep='|')
            turn(s)
for ipType in ['bad_ip_mark', 'bad_ip']:
    intputIp = '/data/mytest/indicator/collection/dpi/sysk_test/' + day6 + '/' + ipType
    for p in Path(intputIp).iterdir():
        for s in p.rglob('*.csv'):
            # print(s)
            # ip files carry two extra trailing columns (n, o) compared to app files.
            df = pd.read_csv(s, header=None, index_col=False,
                             names=['date','prov','city','big','small','a','b','c','d','e','f','g','h','i','j','k','l','m','n','o'],
                             sep='|')
            df['small'] = df['small'].astype('str')
            df['small'] = df['small'].str.zfill(5)
            df.to_csv(s, index=False, header=False, sep='|')
            turn(s)
三、資料處理:統計工單派發和質檢資訊
# 統計工單派發和質檢情況
from pathlib import Path
import pandas as pd
import json
import time
from pandas.core.frame import DataFrame
# Province area code -> province name.
prov_dict = {551: '安徽', 100: '北京', 230: '重慶', 591: '福建', 200: '廣東', 931: '甘肅', 771: '廣西',
             851: '貴州', 371: '河南', 270: '湖北', 311: '河北', 898: '海南', 451: '黑龍江', 731: '湖南',
             431: '吉林', 250: '江蘇', 791: '江西', 240: '遼寧', 471: '內蒙古', 951: '寧夏', 971: '青海',
             280: '四川', 531: '山東', 210: '上海', 290: '陝西', 351: '山西', 220: '天津', 991: '新疆',
             891: '西藏', 871: '雲南', 571: '浙江'}
# Work-order type code -> label. The octal literals 0o1..0o4 are simply 1..4.
type_dict = {0o1: '使用者', 0o2: '業務', 0o3: '位置', 0o4: '網元'}
# Collect dispatch info parsed out of attachment file names.
order = []
for p in Path('/home/liuge/laizhengyang/calc_inspect/attach').iterdir():
    if p.name.startswith("附件1") and p.name.endswith("T1.csv"):
        # File name segment 1 looks like 'date-type-prov-serial'; the serial is dropped.
        qdWsid = p.name.split("_")[1]
        s = p.name.split("_")[1].split("-")
        del s[3]
        s.insert(0, qdWsid)
        order.append(s)
orderDF = DataFrame(order, columns=['qdWsid', 'date', 'type', 'prov'])
orderDF['type'] = orderDF['type'].apply(pd.to_numeric)
orderDF['prov'] = orderDF['prov'].apply(pd.to_numeric)
orderDF['type'] = orderDF['type'].map(lambda x: type_dict[x])
orderDF['prov'] = orderDF['prov'].map(lambda x: prov_dict[x])
# Collect inspection info from the JSON result files.
inspect = []
for p in Path('/home/liuge/laizhengyang/calc_inspect/inspect').iterdir():
    with open(p, 'r', encoding='utf-8', errors='ignore') as f:
        rows = json.load(f)
    # The original special-cased len(rows) == 1 with logic identical to the
    # general loop, so a single loop covers both cases.
    for row in rows:
        qdWsid = row['qdWsid']
        second = '空'
        if 'issueSolvedSecond' in row:
            second = row['issueSolvedSecond']
            if len(second) == 0:
                second = '空'
        first = row['issueSolvedFirst']
        inspect.append([qdWsid, first, second])
print(inspect)
inspectDF = DataFrame(inspect, columns=['qdWsid', '第一次質檢', '第二次質檢'])
inspectDF.loc[inspectDF['第一次質檢'] == '是', '第二次質檢'] = '歸檔'
# De-duplicate. Original author's note (translated): rows whose second inspection
# is '空' are treated as duplicates, but this also drops orders whose first
# inspection passed while the second has not happened yet — logic is suspect.
inspectDF.drop(inspectDF.index[(inspectDF['第二次質檢'] == '空')], inplace=True)
inspectDF = inspectDF.drop_duplicates(['qdWsid'])
# Left join dispatch info with inspection info on qdWsid.
outputDF = pd.merge(orderDF, inspectDF, how="left")
today = time.strftime("%Y%m%d", time.localtime())
writer = pd.ExcelWriter('/home/liuge/laizhengyang/calc_inspect/{}.xlsx'.format(today), engine='openpyxl')
outputDF.to_excel(writer, sheet_name='info', index=False)
orderDF.to_excel(writer, sheet_name='order', index=False)
inspectDF.to_excel(writer, sheet_name='inspect', index=False)
# close() saves and releases the handle; ExcelWriter.save() was removed in pandas 2.0.
writer.close()
print('Done')
四、資料處理:四層六域月粒度檔案處理
# 四層六域月粒度資料修改:檔名修改.列名由中文改為英文,增加兩列
import pandas as pd
import chardet
import datetime
from dateutil.relativedelta import relativedelta
def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings.

    Detects the current encoding with chardet; rewrites the file only when the
    encoding is not utf-8/ascii or the line endings are CRLF.
    (Indentation restored — the pasted source had lost all block structure.)
    """
    with open(file, 'rb') as f:
        data = f.read()
    # NOTE(review): chardet may return None for undetectable content — decode would then raise.
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        # Success message only when a rewrite actually happened.
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")
pd.set_option('expand_frame_repr', False)
# Chinese column header -> indicator code mapping used to rename the CSV columns.
scly = {'省份': 'PROVINCE', '城市': 'CITY', '日期': 'DATE_TIME', '日期型別': 'DATE_TYPE',
        'VoLTE全程呼叫成功率': 'B05D03S004I01100',
        'VoLTE語音呼叫建立時延': 'B05D03S109I00300',
        'VoLTE語音質差通話佔比': 'B05D03S004I01200',
        '5G語音回落接通率': 'B05D03S004I01300',
        '5G語音回落接通時延': 'B05D03S004I01400',
        '支付響應成功率': 'B05D03S116I00100',
        '支付業務響應總時延': 'B05D03S116I00300',
        '即時通訊響應總時延': 'B05D03S136I00500',
        '即時通訊訊息傳送成功率': 'B05D03S136I00300',
        '即時通訊訊息接收成功率': 'B05D03S136I00400',
        '視訊播放成功率': 'B05D03S107I00400',
        '視訊播放等待時長': 'B05D03S107I00500',
        '視訊播放卡頓時長佔比': 'B05D03S107I00600',
        '視訊播放平均卡頓次數': 'B05D03S028I00100',
        '視訊播放流暢度': 'B05D03S028I00200',
        '視訊業務響應成功率': 'B18D13S005I00300',
        '視訊業務響應總時延': 'B18D13S005I00900',
        '視訊上行RTT時延': 'B18D13S005I00500',
        '視訊下行RTT時延': 'B18D13S005I00600',
        '網頁瀏覽成功率': 'B05D03S107I00200',
        '網頁開啟時長': 'B05D03S107I00300',
        '遊戲響應成功率': 'B05D03S005I00700',
        '遊戲響應總時延': 'B05D03S036I00300',
        '遊戲載入時延': 'B05D03S036I00200',
        '遊戲上行RTT時延': 'B18D13S005I00700',
        '遊戲下行RTT時延': 'B18D13S005I00800',
        '5G訊息受理成功率': 'B05D03S135I00300',
        '5G訊息下發成功率': 'B05D03S135I00400',
        '家寬裝機及時率': 'B06D04S031I00300',
        '家寬投訴處理及時率': 'B06D04S006I00200',
        'VoLTE語音網路接通率': 'B05D03S004I02000',
        'TCP上行重傳率': 'B05D03S005I03300',
        'TCP下行重傳率': 'B05D03S005I03400',
        'TCP上行亂序率': 'B05D03S005I03500',
        'TCP下行亂序率': 'B05D03S005I03600',
        'HTTP響應成功率': 'B05D03S005I02400',
        'HTTP響應時延': 'B05D03S005I02500',
        'SA排除使用者原因的初始註冊成功率': 'B05D03S005I02900',
        'AMF業務請求成功率': 'B05D03S082I00200',
        'ToBAMF使用者鑑權成功率': 'B05D03S082I00500',
        'PDU會話建立成功率': 'B05D03S082I00300',
        '5G尋呼成功率': 'B05D03S082I00400',
        '5G流量分流比': 'B18D03S086I00100',
        }
# inputPath = "C:\\Users\\Dell\\Desktop\\ori\\scly-m-202201-001.csv"
# Process last month's file (YYYYMM).
month = datetime.date.strftime(datetime.date.today() - relativedelta(months=1), '%Y%m')
inputPath = '/data/mytest/indicator/collection/jzxn/scly-slqf/scly-m-'+month+'-001.csv'
outputPath = '/data/mytest/indicator/collection/ods_slsw/scly/slsw_scly_m_'+month+'.csv'
df = pd.read_csv(inputPath, index_col=False, sep='|')
# Rename the Chinese headers to their indicator codes.
df = df.rename(columns=scly)
# Add the two fixed ATTRIBUTE columns ('彙總' = "summary").
df.insert(4, 'ATTRIBUTE1', '彙總')
df.insert(5, 'ATTRIBUTE2', '彙總')
df.to_csv(outputPath, index=False, sep='|')
turn(outputPath)
五、資料處理:四量七費日粒度檔案轉化成月粒度
# 四量七費月粒度資料修改:日粒度檔案轉化成月粒度,涉及異常值處理,groupby使用
#!/usr/local/bin/python
# -*- coding:utf-8 -*-
import pandas as pd
from pathlib import Path
import chardet
import datetime
from dateutil.relativedelta import relativedelta
def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings.

    Detects the current encoding with chardet; rewrites the file only when the
    encoding is not utf-8/ascii or the line endings are CRLF.
    (Indentation restored — the pasted source had lost all block structure.)
    """
    with open(file, 'rb') as f:
        data = f.read()
    # NOTE(review): chardet may return None for undetectable content — decode would then raise.
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        # Success message only when a rewrite actually happened.
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")
# Indicator columns to convert to float before aggregation.
indicator = ['B18D17S086I00180', 'B18D17S086I00280', 'B18D17S086I04080', 'B18D17S086I03280', 'B13D17S086I02480',
             'B13D17S086I02980', 'B18D17S086I00680', 'B18D17S086I00780', 'B18D17S086I04180', 'B18D17S086I00980',
             'B18D17S086I01080', 'B18D17S086I03380', 'B18D17S086I03480', 'B18D17S088I01100', 'B18D17S088I01200']
# 'B18D17S086I00380', 'B18D17S086I00880'
pd.set_option('expand_frame_repr', False)
# month = datetime.date.strftime(datetime.date.today() - relativedelta(months=1), '%Y%m')
month = '202202'
# Collect all daily slqf files for the month into one frame.
df = pd.DataFrame()
# for p in Path('/data/mytest/indicator/collection/jzxn/scly-slqf').iterdir():
for p in Path("C:\\Users\\Dell\\Desktop\\需求梳理\\ori\\slqf").iterdir():
    if month in p.name and "slqf" in p.name:
        df_tmp = pd.read_csv(p, index_col=False, sep='|')
        # Drop the first two (non-data) rows of each daily file.
        sc = df_tmp.drop(df_tmp.index[[0, 1]])
        # DataFrame.append was removed in pandas 2.0; concat is the equivalent.
        df = pd.concat([df, sc])
df[indicator] = df[indicator].astype(float)
df = df.reset_index(drop=True)
# Per-city monthly aggregates; unit conversions: /1024/1024 = bytes->MB-ish scale,
# /10000 = to 萬 (tens of thousands).
B18D17S086I00180 = (df['B18D17S086I00180'].groupby(df['CITY']).mean()/1024/1024).round(6)
# For I00280 take the snapshot from the day holding the monthly maximum.
decideTime = df[df["B18D17S086I00280"] == df["B18D17S086I00280"].max()]['DATE_TIME']
timeStr = decideTime.values[0]
B18D17S086I00280 = df[df.DATE_TIME == timeStr][['CITY', 'B18D17S086I00280']]
B18D17S086I00280 = (pd.DataFrame(B18D17S086I00280).set_index('CITY')/1024/1024).round(6)
B18D17S086I03280 = (df['B18D17S086I03280'].groupby(df['CITY']).mean()/1024).round(6)
B13D17S086I02480 = df['B13D17S086I02480'].groupby(df['CITY']).mean().round(6)
B13D17S086I02980 = df['B13D17S086I02980'].groupby(df['CITY']).mean().round(6)
B18D17S086I00680 = (df['B18D17S086I00680'].groupby(df['CITY']).mean()/10000).round(6)
# Outlier handling for I00780: starting from the maximum, discard values that
# exceed the next-lower value by 5% or more, then use the surviving maximum's day.
accDf = df.sort_values(by='B18D17S086I00780', ascending=True)
accDf = accDf.reset_index(drop=True)
accLen = len(accDf)
while accLen > 1:  # > 1 (not > 0): accLen-2 would otherwise wrap to index -1
    maxVal = accDf['B18D17S086I00780'][accLen-1]
    secondMaxVal = accDf['B18D17S086I00780'][accLen-2]
    ratio = (maxVal-secondMaxVal)/maxVal
    if ratio < 0.05:
        break
    accLen = accLen-1
decideTime780 = accDf[accDf['B18D17S086I00780'] == accDf['B18D17S086I00780'][accLen-1]]['DATE_TIME']
timeStr780 = decideTime780.values[0]
B18D17S086I00780 = df[df.DATE_TIME == timeStr780][['CITY', 'B18D17S086I00780']]
B18D17S086I00780 = (pd.DataFrame(B18D17S086I00780).set_index('CITY')/10000).round(6)
B18D17S086I00980 = (df['B18D17S086I00980'].groupby(df['CITY']).mean()/10000).round(6)
B18D17S086I01080 = (df['B18D17S086I01080'].groupby(df['CITY']).mean()/10000).round(6)
B18D17S086I03380 = df['B18D17S086I03380'].groupby(df['CITY']).mean().round(6)
B18D17S086I03480 = df['B18D17S086I03480'].groupby(df['CITY']).mean().round(6)
B18D17S088I01100 = (df['B18D17S088I01100'].groupby(df['CITY']).mean()/10000).round(6)
B18D17S088I01200 = (df['B18D17S088I01200'].groupby(df['CITY']).mean()/10000).round(6)
# Assemble one column per indicator, indexed by CITY.
sc = pd.concat([B18D17S086I00180, B18D17S086I00280, B18D17S086I03280, B13D17S086I02480,
                B13D17S086I02980, B18D17S086I00680, B18D17S086I00780, B18D17S086I00980,
                B18D17S086I01080, B18D17S086I03380, B18D17S086I03480, B18D17S088I01100,
                B18D17S088I01200], axis=1, sort=False)
sc = sc.reset_index(drop=False)
# PROVINCE simply mirrors CITY. NOTE(review): original author flagged that this
# is wrong when CITY holds a prefecture rather than a province — unresolved.
sc['PROVINCE'] = sc['CITY']
sc.insert(0, 'PROVINCE', sc.pop('PROVINCE'))
sc.insert(2, 'DATE_TIME', month)
sc.insert(3, 'DATE_TYPE', '月')
sc.insert(4, 'ATTRIBUTE1', '彙總')
sc.insert(5, 'ATTRIBUTE2', '彙總')
# outputPath = '/data/mytest/indicator/collection/ods_slsw/slqf/slsw_slqf_m_'+month+'_001_001.csv'
outputPath = 'C:\\Users\\Dell\\Desktop\\slsw_slqf_m_'+month+'_001_001.csv'
sc.to_csv(outputPath, index=False, sep='|')
turn(outputPath)
print("done")
六、資料處理:計算長期在庫業務量TOP5記錄
DPI資料+中間表資料
# 質差APP、IP、POI篩選長期在庫業務量TOP5
import pandas as pd
import os
import numpy as np
import datetime
from dateutil.relativedelta import relativedelta
month = datetime.date.strftime(datetime.date.today() - relativedelta(months=1), '%Y%m') # previous month, YYYYMM format
#month = 202110
sysk_date = str(month)[4:] + '30'
#sysk_date = '1030'
dpi_path = 'D:\\TOP5\\DPI'
sysk_path = 'D:\\TOP5\\四域四庫'
def SearchFiles(path, fileType):
    """Return the full paths of all files under *path* whose name ends with *fileType*.

    Walks the tree recursively with os.walk (root, dirs, files triples).
    """
    fileList = []
    for root, dirs, files in os.walk(path):
        for fileName in files:
            if fileName.endswith(fileType):
                fileList.append(os.path.join(root, fileName))
    return fileList
# prov_dict = {551: '安徽', 100: '北京', 230: '重慶', 591: '福建', 200: '廣東', 931: '甘肅', 771: '廣西',
#              851: '貴州', 371: '河南', 270: '湖北', 311: '河北', 898: '海南', 451: '黑龍江', 731: '湖南',
#              431: '吉林', 250: '江蘇', 791: '江西', 240: '遼寧', 471: '內蒙古', 951: '寧夏', 971: '青海',
#              280: '四川', 531: '山東', 210: '上海', 290: '陝西', 351: '山西', 220: '天津', 991: '新疆',
#              891: '西藏', 871: '雲南', 571: '浙江'}
# ---- dpi app data -------------------------------------------------------------
# dtype=object keeps every column as strings so leading zeros survive.
app = pd.read_csv(open('{}\\{}\\質差APP-{}.log'.format(dpi_path, month, month), encoding='UTF8', errors='ignore'),
                  sep='|', usecols=[1, 3, 4, 5, 16, 17], header=None, low_memory=False, dtype=object)
# Columns read: province, business type, business subtype, user count, names.
app.columns = ['prov', 'type', 'subtype', 'cnt', '大類名稱', '小類名稱']
app['cnt'] = app['cnt'].apply(pd.to_numeric)
app['app_type'] = app['type'] + '_' + app['subtype']
app_cnt = app.groupby(by=['prov', 'app_type', '大類名稱', '小類名稱']).aggregate({'cnt': np.sum})
app_cnt.reset_index(inplace=True)
sysk_path_app = '{}\\四域四庫_{}\\app'.format(sysk_path, sysk_date)
fileType = '.txt'
fList = SearchFiles(sysk_path_app, fileType)
for file in fList:
    pd0 = pd.read_csv(open(file, encoding='UTF8', errors='ignore'),
                      sep='|', header=None, usecols=[1, 6, 15], low_memory=False, dtype=object)
    if file == fList[0]:
        sysk_app = pd0
    else:
        sysk_app = pd.concat([sysk_app, pd0], ignore_index=True)
# Columns read: business type code, province, long-in-library flag.
sysk_app.columns = ['userid', 'prov', 'is_long']
sysk_app_long = sysk_app[sysk_app['is_long'] == '是']
# Join long-in-library records with their aggregated counts, rank per province, keep TOP5.
app_output = pd.merge(sysk_app_long, app_cnt, left_on=['prov', 'userid'], right_on=['prov', 'app_type'], how='left')
app_output.drop(['userid', 'is_long'], axis=1, inplace=True)
app_output.drop_duplicates(inplace=True)
app_output['rank'] = app_output.groupby(['prov'])['cnt'].rank(method='min', ascending=0)
app_output.sort_values(['prov', 'rank'], ascending=[1, 1], inplace=True)
app_output = app_output[app_output['rank'] <= 5]
app_order = ['prov', 'app_type', 'cnt', '大類名稱', '小類名稱', 'rank']
app_output = app_output[app_order]
# ---- dpi ip data --------------------------------------------------------------
ip = pd.read_csv(open('{}\\{}\\質差IP-{}.log'.format(dpi_path, month, month), encoding='UTF8', errors='ignore'),
                 sep='|', usecols=[1, 3, 4, 5, 6, 17, 18, 19], header=None, low_memory=False, dtype=object)
# Columns read: province, business type, business subtype, IP, user count, names, home location.
ip.columns = ['prov', 'type', 'subtype', 'ip', 'cnt', '大類名稱', '小類名稱', '歸屬地']
ip['cnt'] = ip['cnt'].apply(pd.to_numeric)
ip['app_type'] = ip['type'] + '_' + ip['subtype']
ip_cnt = ip.groupby(by=['prov', 'app_type', 'ip', '歸屬地', '大類名稱', '小類名稱']).aggregate({'cnt': np.sum})
ip_cnt.reset_index(inplace=True)
sysk_path_ip = '{}\\四域四庫_{}\\ip'.format(sysk_path, sysk_date)
fileType = '.txt'
fList = SearchFiles(sysk_path_ip, fileType)
for file in fList:
    pd0 = pd.read_csv(open(file, encoding='UTF8', errors='ignore'),
                      sep='|', header=None, usecols=[1, 4, 6, 15], low_memory=False, dtype=object)
    if file == fList[0]:
        sysk_ip = pd0
    else:
        sysk_ip = pd.concat([sysk_ip, pd0], ignore_index=True)
# Columns read: business type code, IP, province, long-in-library flag.
sysk_ip.columns = ['userid', 'ip', 'prov', 'is_long']
sysk_ip_long = sysk_ip[sysk_ip['is_long'] == '是']
ip_output = pd.merge(sysk_ip_long, ip_cnt, left_on=['prov', 'userid', 'ip'], right_on=['prov', 'app_type', 'ip'], how='left')
ip_output.drop(['userid', 'is_long'], axis=1, inplace=True)
ip_output.drop_duplicates(inplace=True)
ip_output['rank'] = ip_output.groupby(['prov'])['cnt'].rank(method='min', ascending=0)
ip_output.sort_values(['prov', 'rank'], ascending=[1, 1], inplace=True)
ip_output = ip_output[ip_output['rank'] <= 5]
order = ['prov', 'app_type', 'ip', 'cnt', '歸屬地', '大類名稱', '小類名稱', 'rank']
ip_output = ip_output[order]
# ---- dpi poi data -------------------------------------------------------------
poi = pd.read_csv(open('{}\\{}\\質差POI-{}.log'.format(dpi_path, month, month), encoding='UTF8', errors='ignore'),
                  sep='|', usecols=[1, 3, 4, 6], header=None, low_memory=False, dtype=object)
poi.columns = ['prov', 'longitude', 'latitude', 'cnt']
poi['cnt'] = poi['cnt'].apply(pd.to_numeric)
poi['poi'] = poi['longitude'] + '_' + poi['latitude']
poi_cnt = poi.groupby(by=['prov', 'poi']).aggregate({'cnt': np.sum})
poi_cnt.reset_index(inplace=True)
sysk_path_poi = '{}\\四域四庫_{}\\poi'.format(sysk_path, sysk_date)
fileType = '.txt'
fList = SearchFiles(sysk_path_poi, fileType)
for file in fList:
    pd0 = pd.read_csv(open(file, encoding='UTF8', errors='ignore'),
                      sep='|', header=None, usecols=[1, 6, 15], low_memory=False, dtype=object)
    if file == fList[0]:
        sysk_poi = pd0
    else:
        sysk_poi = pd.concat([sysk_poi, pd0], ignore_index=True)
sysk_poi.columns = ['userid', 'prov', 'is_long']
sysk_poi_long = sysk_poi[sysk_poi['is_long'] == '是']
poi_output = pd.merge(sysk_poi_long, poi_cnt, left_on=['prov', 'userid'], right_on=['prov', 'poi'], how='left')
poi_output.drop(['userid', 'is_long'], axis=1, inplace=True)
poi_output['rank'] = poi_output.groupby(['prov'])['cnt'].rank(method='min', ascending=0)
poi_output.sort_values(['prov', 'rank'], ascending=[1, 1], inplace=True)
poi_output = poi_output[poi_output['rank'] <= 5]
# Write all three TOP5 sheets to one workbook.
writer = pd.ExcelWriter('{}\\{}\\質差top5_{}.xlsx'.format(dpi_path, month, month), engine='openpyxl')
app_output.to_excel(writer, sheet_name='app', index=False)
ip_output.to_excel(writer, sheet_name='ip', index=False)
poi_output.to_excel(writer, sheet_name='poi', index=False)
# close() saves and releases the handle; ExcelWriter.save() was removed in pandas 2.0.
writer.close()
print('Done')
七、資料處理:資料工單生成與附件遷移
# 附件1/2重新命名並遷移至attachment路徑,生成工單
import datetime
import json
import time
import os, shutil, sys
# Current date/time stamps (hard-coded for this run; the commented lines compute them live)
# file_today = datetime.date.strftime(datetime.date.today(), '%Y%m%d')
# file_min = time.strftime("%Y%m%d%H%M", time.localtime())
# file_sec = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
file_today = "20220519"
file_min = "202205191030"
file_sec = "2022-05-19 10:30:00"
data_month = '202204'
# Province name -> numeric area code, and province name -> two-letter code.
prov_dict = {"安徽": 551, "北京": 100, "重慶": 230, "福建": 591, "廣東": 200, "甘肅": 931, "廣西": 771,
             "貴州": 851, "河南": 371, "湖北": 270,"河北": 311, "海南": 898, "黑龍江": 451, "湖南": 731,
             "吉林": 431, "江蘇": 250, "江西": 791, "遼寧": 240, "內蒙古": 471,"寧夏": 951, "青海": 971,
             "四川": 280, "山東": 531, "上海": 210, "陝西": 290, "山西": 351, "天津": 220, "新疆": 991,
             "西藏": 891, "雲南": 871, "浙江": 571}
prov_code_dict = {"廣東": "GD", "北京": "BJ", "上海": "SH", "天津": "TJ", "江蘇": "JS", "浙江": "ZJ",
                  "安徽": "AH", "福建": "FJ", "湖北": "HB", "陝西": "SN", "河北": "HE", "山西": "SX",
                  "河南": "HA", "吉林": "JL", "湖南": "HN", "廣西": "GX", "江西": "JX", "雲南": "YN",
                  "海南": "HI", "甘肅": "GS", "青海": "QH", "重慶": "CQ", "遼寧": "LN", "四川": "SC",
                  "山東": "SD", "貴州": "GZ", "西藏": "XZ", "寧夏": "NX", "新疆": "XJ", "黑龍江": "HL",
                  "內蒙古": "NM"}
# Attachment handling: recursively flatten a directory tree into root_path.
def move_to_root_folder(root_path, cur_path):
    """Move every file under *cur_path* (recursively) into *root_path*,
    then remove the emptied sub-directories (never *root_path* itself)."""
    for filename in os.listdir(cur_path):
        src = os.path.join(cur_path, filename)
        if os.path.isfile(src):
            shutil.move(src, os.path.join(root_path, filename))
        elif os.path.isdir(src):
            move_to_root_folder(root_path, src)
        else:
            # Neither file nor directory (e.g. broken symlink) — abort.
            sys.exit("Should never reach here.")
    # remove empty folders
    if cur_path != root_path:
        os.rmdir(cur_path)
# ---- Attachment 1: rename per-province files -----------------------------------
key_word = 'no5g_eci_month'
ori_path = 'C:\\Users\\Dell\\Desktop\\'+key_word+'\\'+data_month+'\\userList\\'
# Each province has its own sub-directory under userList.
prov_path = os.listdir(ori_path)
for prov in prov_path:
    if "province" in prov:
        # Directory name looks like 'province=<name>' (9-char prefix) — TODO confirm.
        provName = prov[9:]
        provCode = prov_dict[provName]
        desPath = ori_path + '\\' + prov
        oldFile = os.listdir(desPath)[0]
        oldName = os.path.join(desPath + "\\" + oldFile)
        # TODO: check file size and auto-split into parts 0001..000n (original note).
        newFile = "附件1_"+file_today+"-05-"+str(provCode)+"-0001_"+key_word+".csv"
        newName = os.path.join(desPath + "\\" + newFile)
        os.rename(oldName, newName)
# ---- Attachment 2: same renaming for the second data set -----------------------
key_word2 = 'indoor_quality'
ori_path2 = 'C:\\Users\\Dell\\Desktop\\'+key_word2+'\\'+data_month+'\\userList\\'
prov_path2 = os.listdir(ori_path2)
for prov2 in prov_path2:
    if "province" in prov2:
        provName2 = prov2[9:]
        provCode2 = prov_dict[provName2]
        desPath2 = ori_path2 + '\\' + prov2
        oldFile2 = os.listdir(desPath2)[0]
        oldName2 = os.path.join(desPath2 + "\\" + oldFile2)
        # TODO: check file size and auto-split into parts 0001..000n (original note).
        newFile2 = "附件2_"+file_today+"-05-"+str(provCode2)+"-0001_"+key_word2+".csv"
        newName2 = os.path.join(desPath2 + "\\" + newFile2)
        os.rename(oldName2, newName2)
# Flatten both renamed trees into the attachment directories.
des_path1 = 'C:\\Users\\Dell\\Desktop\\'+data_month+'\\attachmentone'
move_to_root_folder(des_path1, ori_path)
des_path2 = 'C:\\Users\\Dell\\Desktop\\'+data_month+'\\attachmenttwo'
move_to_root_folder(des_path2, ori_path2)
# ---- Generate one work order per province --------------------------------------
province = ["安徽", "北京", "重慶", "福建", "廣東", "甘肅", "廣西", "貴州", "河南",
            "湖北", "河北", "海南", "黑龍江", "湖南", "吉林", "江蘇", "江西", "遼寧",
            "內蒙古", "寧夏", "青海", "四川", "山東", "上海", "陝西", "山西", "天津",
            "新疆", "西藏", "雲南", "浙江"]
order_combine = []
for i in province:
    provCode = prov_code_dict[i]
    provNum = prov_dict[i]
    orderId = file_today + '-05-' + str(provNum) + '-0001'
    attach1_word = "no5g_eci_month"
    attach2_word = "indoor_quality"
    attach1 = "附件1_" + file_today + "-05-" + str(provNum) + "-0001_" + attach1_word + ".csv"
    attach2 = "附件2_" + file_today + "-05-" + str(provNum) + "-0001_" + attach2_word + ".csv"
    order = {
        "createUsername": "張思為",
        "createMobile": "13811784627;[email protected]",
        "qdWsid": orderId,
        "decideTime": file_sec,
        "provinceCode": provCode,
        "provinceName": i,
        "qdRegion": "",
        "qdType": "分析資料",
        "qdMajor": "移動業務",
        "wsHintInfo": "移動業務-分析資料工單-轉質量管理專業",
        "relatedCounts": "",
        "relatedValuableCounts": "",
        "taskLevel": "一般",
        "factoryHandleLimitTime1": "14",
        "decideRule": "具體分析要求,總部將與各省質量管理專業同事溝通,本工單僅派發分析資料,不作具體修復。",
        "repairRule": "核心規則見附件,請各省參考。請將本工單轉發省內質量管理負責同事。",
        "topCause": "",
        "relatedImportantCounts": "",
        "attachList1": [
            {
                "fileName": attach1,
                "fileUrl": "/ftpdata/user/eoms/attachments/"
            },
            {
                "fileName": attach2,
                "fileUrl": "/ftpdata/user/eoms/attachments/"
            },
            {
                "fileName": "資料統計規則.docx",
                "fileUrl": "/ftpdata/user/eoms/attachments/"
            }
        ]
    }
    order_combine.append(order)
order_name = 'sysk_workorder_'+file_min+'.txt'
order_path = 'C:\\Users\\Dell\\Desktop\\'+order_name
# with-statement closes the handle (the original json.dump(open(...)) leaked it).
with open(order_path, 'w') as f:
    json.dump(order_combine, f, ensure_ascii=False, indent=None)
print("done")
八、資料處理:按行分割檔案並保留表頭
# 按照行數分割附件,並保留表頭
import pandas as pd
import os
import chardet
def turn(file):
    """Normalize *file* in place to UTF-8 encoding with LF line endings.

    Detects the current encoding with chardet; rewrites the file only when the
    encoding is not utf-8/ascii or the line endings are CRLF.
    (Indentation restored — the pasted source had lost all block structure.)
    """
    with open(file, 'rb') as f:
        data = f.read()
    # NOTE(review): chardet may return None for undetectable content — decode would then raise.
    encoding = chardet.detect(data)['encoding']
    data_str = data.decode(encoding)
    tp = 'LF'
    if '\r\n' in data_str:
        tp = 'CRLF'
        data_str = data_str.replace('\r\n', '\n')
    if encoding not in ['utf-8', 'ascii'] or tp == 'CRLF':
        with open(file, 'w', newline='\n', encoding='utf-8') as f:
            f.write(data_str)
        # Success message only when a rewrite actually happened.
        print(f"{file}: ({tp},{encoding}) trun to (LF,utf-8) success!")
def SplitExcel(file, num):
    """Split the CSV *file* into chunks of *num* rows each, keeping the header
    row in every chunk. Chunks are written to file_dir as <name>_1.csv, ...

    Raises Exception when *num* is not smaller than the row count.
    """
    file_dir = 'C:\\Users\\Dell\\Desktop\\result'  # output directory
    # BUGFIX: the original rmdir'd an existing (empty) directory and never
    # recreated it, so every subsequent write failed. Just ensure it exists.
    os.makedirs(file_dir, exist_ok=True)
    n = 1
    row_list = []
    df = pd.DataFrame(pd.read_csv(file))
    row_num = int(df.shape[0])  # data rows (header already consumed by read_csv)
    if num >= row_num:  # splitting into one chunk makes no sense
        raise Exception('too much!!')
    # Chunk boundaries: num, 2*num, ..., then the final row count.
    for i in range(num, row_num, num):
        row_list.append(i)
    row_list.append(row_num)
    (name, ext) = os.path.splitext(file)
    # BUGFIX: use only the base name — os.path.join(file_dir, <absolute path>)
    # discards file_dir entirely and wrote chunks next to the source file.
    base = os.path.basename(name)
    for m in row_list:
        filename = os.path.join(file_dir, base + '_' + str(n) + '.csv')
        if m < row_num:
            df_handle = df.iloc[m - num:m]  # one full chunk
            print(df_handle)
            df_handle.to_csv(filename, index=False)
            turn(filename)
        elif m == int(row_num):
            remainder = int(int(row_num) % num)
            # BUGFIX: when row_num divides evenly, remainder == 0 produced an
            # empty final slice and silently dropped the last full chunk.
            if remainder == 0:
                remainder = num
            df_handle = df.iloc[m - remainder:m]  # the tail chunk
            df_handle.to_csv(filename, index=False)
            turn(filename)
        n = n + 1
# Province name -> numeric area code (used in attachment file names).
prov_dict = {"安徽": 551, "北京": 100, "重慶": 230, "福建": 591, "廣東": 200, "甘肅": 931, "廣西": 771,
             "貴州": 851, "河南": 371, "湖北": 270, "河北": 311, "海南": 898, "黑龍江": 451, "湖南": 731,
             "吉林": 431, "江蘇": 250, "江西": 791, "遼寧": 240, "內蒙古": 471, "寧夏": 951, "青海": 971,
             "四川": 280, "山東": 531, "上海": 210, "陝西": 290, "山西": 351, "天津": 220, "新疆": 991,
             "西藏": 891, "雲南": 871, "浙江": 571}

if __name__ == '__main__':
    # Split one province's attachment into 730000-row chunks.
    province = "湖北"
    provCode = prov_dict[province]
    fileName = "C:\\Users\\Dell\\Desktop\\202205\\attachmentone\\附件1_20220519-05-" + str(provCode) + "-0001_no5g_eci_month.csv"
    # file = 'result.xls'
    SplitExcel(fileName, num=730000)
    print("done")