1. 程式人生 > 實用技巧 >readzip_minute_data 多程序處理資料

readzip_minute_data 多程序處理資料

#!/usr/bin/env python
import os
import numpy as np
import py7zr
import shutil
import pandas as pd
import time
import multiprocessing
import re
def fun_time_l2(a,b):
    if float(a)<=float(b) :
        return 1
    else:
        return 0

def read_files(filename):#讀檔案內容
    df1 = pd.DataFrame()
    with open(filename, "r") as f:
        listT = []
        for line in f:
            listT.append(line)
        df1 = pd.DataFrame(listT)
    index = df1.loc[(df1[0].str.contains("find"))].index
    if index.isnull:
        df1 = df1.drop(index=index)
    df1 = pd.DataFrame(df1[0].str.strip())
    df1 = pd.DataFrame(df1[0].str.split("\t", expand=True))
    df1[3] = df1[1].astype("int") * df1[2].astype("int")
    df1.columns = ["time", "price", "vol", "amount"]
    vol_t = abs(df1["vol"].astype("long")).sum()
    amount_t = abs(df1["amount"].astype("long")).sum()

    df_f_xiao = df1[(df1["amount"].astype("int") < 0) & ((df1["amount"].astype("int") > -40000))]
    df_f_zhong = df1[(df1["amount"].astype("int") <= -40000) & ((df1["amount"].astype("int") > -200000))]
    df_f_da = df1[(df1["amount"].astype("int") <= - 200000) & ((df1["amount"].astype("int") > -1000000))]
    df_f_te_da = df1[(df1["amount"].astype("int") <= - 1000000)]

    f_xiao = df_f_xiao["amount"].astype("long").sum()
    f_zhong = df_f_zhong["amount"].astype("long").sum()
    f_da = df_f_da["amount"].astype("long").sum()
    f_te_da = df_f_te_da["amount"].astype("long").sum()

    df_z_xiao = df1[(df1["amount"].astype("int") > 0) & ((df1["amount"].astype("int") < 40000))]
    df_z_zhong = df1[(df1["amount"].astype("int") >= 40000) & ((df1["amount"].astype("int") < 200000))]
    df_z_da = df1[(df1["amount"].astype("int") >= 200000) & ((df1["amount"].astype("int") < 1000000))]
    df_z_te_da = df1[(df1["amount"].astype("int") >= 1000000)]

    z_xiao = df_z_xiao["amount"].astype("long").sum()
    z_zhong = df_z_zhong["amount"].astype("long").sum()
    z_da = df_z_da["amount"].astype("long").sum()
    z_te_da = df_z_te_da["amount"].astype("long").sum()

    # add 增加計算最小值
    min_L = df1["price"].astype("int").min()
    sum_V = abs(df1["vol"].astype("int")).sum()
    min_2 = min_L * 1.02
    df_min_2 = df1[(df1["price"].astype("int") < min_2)]
    sum_min_2_v = abs(df_min_2["vol"].astype("long")).sum()
    re_min_L2 = abs(sum_min_2_v) / sum_V * 100
    # add time
    df_min_3 = pd.DataFrame()
    df_min_3["time"] = df_min_2["time"].str[:-2]
    df_min_3 = df_min_3.drop_duplicates(subset = ['time'],keep = 'first',inplace = False)
    time_l2 = len(df_min_3)

    #add minute
    df_time_all = pd.DataFrame()
    df_time_all["time"] = df1["time"].str[:-2]

    df_time_all["price"] = df1["price"].astype('int')
    df_time_all["vol"] = df1["vol"].astype('long')
    df_time_all["abs_vol"] = abs(df1["vol"].astype('long'))

    df_time_all_only = df_time_all.drop_duplicates(subset=['time'], keep='first', inplace=False)
    df_time_all_only = df_time_all_only.reset_index(drop=True)

    df_list_return = pd.DataFrame()
    b = {"vol_t": vol_t, "amount_t": amount_t, "z_xiao": z_xiao, "z_zhong": z_zhong, "z_da": z_da, "z_te_da": z_te_da,
         "f_xiao": f_xiao, "f_zhong": f_zhong, "f_da": f_da, "f_te_da": f_te_da, "re_min_L2": re_min_L2,
         'time_l2': time_l2}
    df_list_return = df_list_return.append(b,ignore_index=True)

    pd_read = pd.pivot_table(df_time_all, index='time',values = ['price','vol',"abs_vol"] , aggfunc = { 'price':np.mean,'vol':np.sum ,"abs_vol":np.sum})

    pd_read = pd_read.reset_index()
    pd_read["time_abs_vol"] = pd_read["time"] +".2"
    pd_read["time_price"] = pd_read["time"] + ".1"
    pd_read["time_vol"] = pd_read["time"] + ".3"
    pd_r_1 = pd.DataFrame()
    pd_r_1["value"] = pd_read["price"]
    pd_r_1["time_price"] = pd_read["time_price"]
    pd_r_1 = pd_r_1.set_index("time_price",drop = True)
    pd_r_2 = pd.DataFrame()
    pd_r_2["value"] = pd_read["abs_vol"]
    pd_r_2["time_abs_vol"] = pd_read["time_abs_vol"]
    pd_r_2 = pd_r_2.set_index("time_abs_vol", drop=True)
    pd_r_3 = pd.DataFrame()
    pd_r_3["value"] = pd_read["vol"]
    pd_r_3["time_vol"] = pd_read["time_vol"]
    pd_r_3 = pd_r_3.set_index("time_vol", drop=True)

    pd_r_co = pd.DataFrame()
    pd_r_co = pd_r_co.append(pd_r_1)
    pd_r_co = pd_r_co.append(pd_r_2)
    pd_r_co = pd_r_co.append(pd_r_3)

    pd_r_co = pd_r_co.reset_index()
    pd_r_co["index"] = pd_r_co["index"].astype('float')
    pd_r_co = pd_r_co.sort_values(by = "index")
    pd_r_co = pd_r_co.set_index("index", drop=True)

    df_list_return = df_list_return.T
    df_list_return.columns = ['value']
    df_list_return = df_list_return.append(pd_r_co)
    df_list_return = df_list_return.T
    return df_list_return
  


def extract_files(filename):#提出7Z檔案
    with py7zr.SevenZipFile(filename, 'r') as archive:
        allfiles = archive.getnames()#獲取7Z檔案內的子檔名
        tempdir = allfiles[0].split("/")[0]#取7Z檔案內資料夾名稱
        savedir =pathsave + str(tempdir)
        if os.path.exists(savedir):
            shutil.rmtree(savedir)#刪除同名資料夾
        os.mkdir(savedir)#重建資料夾
        #archive.extract(pathsave,allfiles[0:3])#解壓到資料夾
        archive.extractall(pathsave)#解壓到資料夾
        #print(archive.extractall())
        return savedir
def read_dirs(savedir):#讀資料夾
    files=np.array(os.listdir(savedir))
    file_names = np.char.add(savedir + "\\",files)
    return file_names
def sub_process(df_only_name1,q):
    list_t1 = pd.DataFrame()
    n_count = 0
    for file in df_only_name1:
        n_count = n_count + 1
        #print("No. " ,n_count)
        (filepath, tempfilename) = os.path.split(file)
        (filename, extension) = os.path.splitext(tempfilename)

        if not os.path.getsize(file):  # 判斷檔案大小是否為0
            print("file siz = 0")
            print(file)
        else:
            list_t = pd.DataFrame()
            list_t = read_files(file)
            list_t.insert(0,"a_name",filename)
            list_t1 = list_t1.append(list_t)
    list_t1 = list_t1.reset_index(drop = True)#=============================================
    q.put(list_t1,block = False)
    exit(0)

if __name__ == '__main__':
    #path = r'G:\datas of status\tick-by-tick trade'  # 資料檔案存放位置
    path = r'G:\datas of status\2018-tick-by-tick trade'
    pathsave = 'G:\\datas of status\\python codes\\'  # 設定臨時檔案存放位置
    pathTemp = 'G:\\datas of status\\python codes\\everyday_data\\temp'
    listM = np.array(os.listdir(path))  # 獲取月資料夾
    print(listM)
    listM = np.char.add(path + "\\", listM)  # 獲取月資料夾路徑
    #====================start work
    m = 13  # 開始處理第幾個資料夾(1~16,16=202004,15=202003)
    do_num = 2
    for n in range(do_num):
        i = m - n #處理第幾個資料夾(1~16)
        print(listM[i])
        listD = np.array(os.listdir(listM[i]))#獲取一個資料夾下所有日檔案全路徑
        print(listD)
        listD = np.char.add(listM[i] + "\\",listD)#獲取日檔案全名
        print(listD)
        pdM_all = pd.DataFrame()
        
        for filename in listD:
        #for filename in listD:
            # filename = listD[0]
            print("=========")
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            npM = pd.DataFrame()
            savedir = extract_files(filename)    
            #savedir = "G:\\datas of status\\python codes\\20160315"#如果處理單個檔案(11111資料夾只存放一個檔案包,上一行註釋掉,不執行),則用此行,用完還原註釋此行
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
            savedir_t = re.sub("-", '', savedir)
            findt = re.search("\d+$", savedir_t)
            tempdir = findt.group()
            #====================
            file_names = read_dirs(savedir)
            all_nums = len(file_names)
            epochs = 3
            step = int(all_nums/epochs)
            process_list = []
            datelist = []
            q = multiprocessing.Queue(maxsize=epochs)

            for i in range(epochs):
                begin = i * step
                end = begin + step
                if i == epochs -1:
                    end = all_nums
                df_only_name1 = file_names[begin:end]
                tmp_process = multiprocessing.Process(target=sub_process, args=(df_only_name1, q))
                process_list.append(tmp_process)
            for process in process_list:
                process.start()
                #print("start",process)
            while(q.qsize() != epochs):
                #print(q.qsize(),"begin")
                if(q.qsize()>=1):
                    #print(q.qsize())
                    time.sleep(3)
                else:
                    time.sleep(5)
            count = 0
            time.sleep(1)
            #exit(0)
            while not q.empty():
                list_g = q.get()
                #print(list_g,"midle")
                #print("hhaa",count )
                count = count +1
                npM = npM.append(list_g)
                #print(npM)
            #=======================
            shutil.rmtree(savedir)
            #npM.columns = list_columns1
            #print(len(npM))
            pdD_t = npM
            pdD_t.insert(1, "date", tempdir, allow_duplicates=False)
            #===========
            #save_dfile = pathsave + "\\" + "everyday_data" + "\\" + pdD_t["date"][0] + ".csv"
            save_dfile = pathsave + "\\" + "everyday_data" + "\\" + tempdir + ".csv"
            # print(save_dfile)
            pdD_t = pdD_t.sort_values(by=['time_l2'], ascending=True)
            pdD_t.to_csv(save_dfile, sep=",", index=False, header=True)
            pdM_all = pdM_all.append(pdD_t)
            print(filename)
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        # print(pdM_all)
        save_file = pathsave + pdM_all["date"][0].str[0:6] + ".csv"
        save_file = save_file.reset_index(drop=True)
        print(save_file[0])
        # df.to_csv(‘/opt/births1880.csv’, index=False, header=False
        # pdM_all = pdM_all.sort_values(by=['re_min_L2'], ascending=True)
        pdM_all.to_csv(save_file[0], sep=",", index=False, header=True)
    exit(0)