1. 程式人生 > 其它 >python3 多程序複製檔案,多執行緒複製檔案加顯示進度條 Rev03

python3 多程序複製檔案,多執行緒複製檔案加顯示進度條 Rev03

@ BY Cool_Breeze
@ Rev 03
@ 2020/08/10
'''


#coding=utf-8

import shutil
import os
import time
from multiprocessing import Process,Pool,Queue,Manager
from threading import Thread
from concurrent.futures import ThreadPoolExecutor


class copyf_show:
    '''
    # 多程序複製檔案
    # 多程序複製檔案加顯示進度
    # despath 目標目錄
    # soupath 源目錄
    '''
    def __init__(self, despath, soupath):
        self.d = despath
        self.s = soupath
        self.st = os.path.dirname(soupath)
        self.q = Manager().Queue()
        self.show_d = {} #show
        self.count = 0 #show
        self.total = 0 #show
        if not os.path.isdir(self.d): os.mkdir(self.d)

    def pro_pool(self):
        '''
        # 實時複製檔案
        # 先啟動掃描目錄,向管道傳送資料
        # 開啟程序池,等待管道接受資料
        # 一個程序複製一個目錄層檔案
        '''
        print('正在從 {} 複製所有檔案到 {}'.format(self.s, self.d))
        time_node = time.time()
        count = 0
        ps = Process(target=self.scanfile)
        ps.start()
        p = Pool() #程序數量
        while True:
            if self.q.empty(): continue
            t = self.q.get()
            if not t: break
            count += len(list(t.values())[0])
            p.apply_async(self.pro_cp, (t,))
        ps.join()
        p.close()
        p.join()
        print('總計檔案數量:{}\n耗時:{} 秒'.format(count,round(time.time() - time_node, 2)))

    def scanfile(self):
        '''
        # 掃描一層目錄,生成字典
        # 絕對路徑為key,檔案列表為值
        # 向管道傳送字典
        '''
        for root,dirs,files in os.walk(self.s):
            self.q.put({root:files})
        self.q.put(False)

    def pro_cp(self, dif):
        '''
        # 判斷目標目錄是否存在
        # 切換路徑,複製檔案
        '''
        dpa = list(dif.keys())[0].replace(self.st, self.d)
        if not os.path.isdir(dpa): os.mkdir(dpa)
        os.chdir(dpa)
        for i,j in dif.items():
            for nn in [os.path.join(i,k) for k in j]:
                shutil.copy(nn, dpa)
        
    def show_fun(self):
        '''
        # 開啟執行緒城池複製檔案並列印進度條
        '''
        print('正在從 {} 複製所有檔案到 {}'.format(self.s, self.d))
        self.show_scanfile()
        self.count = 0
        print('總計檔案數量:{}'.format(self.total))
        pp = Thread(target=self.show_pgr)
        pp.setDaemon(True)# 主程式退出結束
        pp.start()
        for d,f in self.show_d.items():
            self.show_pool(d, f)
        pp.join()

    def show_pgr(self):
        '''
        # 進度條
        # 已複製數除以總數乘以100
        # 計時
        '''
        time_node = time.time()
        while True:
            cls = round(self.count/self.total*100)
            print('\r{:<100} {}%'.format('*'*cls, cls),end='')
            if cls == 100: 
                print('\n耗時:{} 秒'.format(round((time.time() - time_node), 2)))
                break
            time.sleep(0.1)

    def show_scanfile(self):
        '''
        # 掃描一層目錄,生成字典
        # 絕對路徑為key,檔案列表為值
        # 
        '''
        for root,dirs,files in os.walk(self.s):
            self.total += len(files)
            self.show_d.update({root:files})

    def show_pool(self, root, fl):
        '''
        # root = 目錄, fl = 檔案列表
        # 多執行緒
        '''
        dpa = root.replace(self.st, self.d)
        if not os.path.isdir(dpa): os.mkdir(dpa)
        os.chdir(dpa)
        with ThreadPoolExecutor(None, self.show_cp) as p:
            p.map(self.show_cp, [(root + os.sep + f, dpa) for f in fl])
        p.shutdown()
        self.count += len(fl)

    def show_cp(self, file):
        '''
        # 絕對路徑
        # 複製檔案file = (src, dst)
        '''
        shutil.copy(file[0], file[1])


if __name__ == '__main__':
    path = r'D:\GIN\py\pool_copy_file\t'
    soup = r'D:\GIN\py\photo\photo'
    t = copyf_show(path, soup)
    t.pro_pool()
    t.show_fun()
    print('Done!')