1. 程式人生 > 其它 >上傳下載程式,支援動態更換IP和埠, 上傳下載, 進度條顯示, 正則校驗檔案格式, hash校驗檔案完整性

上傳下載程式,支援動態更換IP和埠, 上傳下載, 進度條顯示, 正則校驗檔案格式, hash校驗檔案完整性

技術標籤:Linuxpythonexesocketpythonexeftp開發工具

支援動態更換IP和埠, 上傳下載, 進度條顯示, 正則校驗檔案格式, hash校驗檔案完整性

另有已封裝好的exe檔案

客戶端

# coding=utf-8
from socket import *
import json
import struct
import os,re
import hashlib

ip = 0
port = 0

# 列印進度條
def progress(percent, symbol='█', width=40):
    if percent > 1:  # 超過 100% 的時候讓其停在 1
percent = 1 # 可以避免進度條溢位 show_progress = ("▌%%-%ds▌" % width) % (int(percent * width) * symbol) print("\r%s %.2f%%" % (show_progress, percent * 100), end='') # hash 校驗 def Hash_md5(file_path:str): m = hashlib.md5() m.update(str(os.path.getsize(file_path)).
encode("utf-8")) return m.hexdigest() # 連線 def connection(): client = socket(AF_INET,SOCK_STREAM) client.connect((ip,port)) return client # 下載 def download(client): client.send("1".encode("utf-8")) while 1: try: file_path =
input("Please enter the file path(q/exit)>>").strip() if file_path.lower() == "q":break if len(file_path) == 0:continue to_path = input("Please enter the save directory(q/back)>>").strip() if to_path.lower() == "q":continue if not os.path.isdir(to_path): print("not find");continue else: file_name = input("Please enter filename(q/back)>>").strip() if file_name.lower() == "q":continue if re.search(r'[/|:*"<>\\]',file_name): print(r'Filenames cannot have these symbols:/|:*"<>\\');continue goal_path = os.path.join(to_path,file_name) client.send(file_path.encode("utf-8")) bytes_4 = client.recv(4) if bytes_4.decode("utf-8") == "4044": print("not find");continue else: header_bytes_len = struct.unpack("i",bytes_4)[0] header_bytes = client.recv(header_bytes_len) header_dic = json.loads(header_bytes.decode("utf-8")) date_len = header_dic["file_size"] hash_md5 = header_dic["hash"] recv_len = 0 with open(goal_path,"wb")as f: while 1: date = client.recv(1024) recv_len += len(date) percent = recv_len / date_len # 接收的比例 progress(percent, width=40) # 進度條的寬度40 f.write(date) if recv_len == date_len: break if hash_md5 == Hash_md5(goal_path): # hash 值校驗 print("\nHash auth succeed\nFile saved...") continue else: os.remove(goal_path) # 校驗失敗內容刪除 print("Hash auth failed!!") except Exception as E: print(E);break # 上傳 def uploading(client): client.send("2".encode("utf-8")) while 1: try: file_path = input("Please enter the path of the uploaded file(q/exit)>>").strip() if file_path.lower() == "q":break file_path = os.path.normpath(file_path) if not os.path.isfile(file_path): print("not find");continue goal_path = input("Please enter the destination path(q/back)>>").strip() if goal_path.lower() == "q":continue goal_path = os.path.normpath(goal_path) client.send(goal_path.encode("utf-8")) bytes_4 = client.recv(4) if bytes_4.decode("utf-8") == "4044": print("not find");continue else: file_name = input("Please name the new file(q/back)>>").strip() if file_name.lower() == "q":continue if re.search(r'[/|:*"<>\\]',file_name): print(r'Filenames cannot have these symbols:/|:*"<>\\');continue goal_file_path = os.path.join(goal_path,file_name) file_size = os.path.getsize(file_path) file_name = os.path.basename(file_path) md5 = Hash_md5(file_path) header_dic = {"file_name": file_name, "file_size": file_size, "hash": md5,"file_path":goal_file_path} header_json = json.dumps(header_dic) header_bytes = header_json.encode("utf-8") header_bytes_len = struct.pack("i", len(header_bytes)) client.send(header_bytes_len) client.send(header_bytes) send_len = 0 with open(file_path, "rb")as f: for line in f: send_len += len(line) percent = send_len / file_size # 接收的比例 progress(percent, width=40) # 進度條的寬度40 client.send(line) print("\nsuccessfully upload!") except Exception as E: print(E);break func_dic = { "1":["download ",download], "2":["upload",uploading], "3":["IP Settings",download], } # 連線服務端ip和埠並選擇功能 def run(): while True: print("Please enter the correct IP and port") global ip,port ip = input("Please enter IP(q/exit)>>").strip() if ip.lower() == "q":break port = input("Please enter PORT(q/exit)>>").strip() if port.lower() == "q":break if port.isdigit(): port = int(port) else: print("Please enter the number") continue try: client = connection() # 檢測連線是否建立成功 except Exception as E: print(E);continue while 1: for k,v in func_dic.items(): print(f"{k} : {v[0]}") select = input("Please select function>>") if select == "3":break if select in func_dic: func_dic[select][1](client) run()

服務端

# coding=utf-8
from socket import *
import json
import struct
import os,hashlib

# 繫結服務端地址
def connection():
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)
    return server

# 建立連線選擇功能
def recv_send(server):
    while 1:
        print("connection...")
        conn,addr = server.accept()
        print(f"from {addr} conn")
        select = conn.recv(1)
        if select.decode("utf-8") == "1":
            download(conn)
        elif select.decode("utf-8") == "2":
            uploading(conn)

# 客戶端下載
def download(conn):
    while 1:
        try:
            file_path = conn.recv(1024)
            file_path = os.path.normpath(file_path.decode("utf-8"))
            if not os.path.isfile(file_path):
                conn.send("4044".encode("utf-8"))
            else:
                file_size = os.path.getsize(file_path)
                file_name = os.path.basename(file_path)
                m = hashlib.md5()
                m.update(str(file_size).encode("utf-8"))
                md5 = m.hexdigest()
                header_dic = {"file_name":file_name,"file_size":file_size,"hash":md5}
                header_json = json.dumps(header_dic)
                header_bytes = header_json.encode("utf-8")
                header_bytes_len = struct.pack("i",len(header_bytes))
                conn.send(header_bytes_len)
                conn.send(header_bytes)
                with open(file_path,"rb")as f:
                    for line in f:
                        conn.send(line)
        except Exception:
            break

# 客戶端的上傳
def uploading(conn):
    while 1:
        try:
            dir_path = conn.recv(1024)
            dir_path = os.path.normpath(dir_path.decode("utf-8"))
            if not os.path.isdir(dir_path):
                conn.send("4044".encode("utf-8"));continue
            else:
                conn.send("4444".encode("utf-8"))
                bytes_4 = conn.recv(4)
                header_bytes_len = struct.unpack("i", bytes_4)[0]
                header_bytes = conn.recv(header_bytes_len)
                header_dic = json.loads(header_bytes.decode("utf-8"))
                date_len = header_dic["file_size"]
                goal_file_path = header_dic["file_path"]
                recv_len = 0
                with open(goal_file_path, "wb")as f:
                    while 1:
                        date = conn.recv(1024)
                        recv_len += len(date)
                        f.write(date)
                        if recv_len == date_len: break
                continue
        except Exception as E:
            print(E);break

def run():
    while True:
        print("Please enter the correct IP and port")
        global ip, port
        ip = input("Please enter IP(q/exit)>>").strip()
        if ip.lower() == "q": break
        port = input("Please enter PORT(q/exit)>>").strip()
        if port.lower() == "q": break
        if port.isdigit():
            port = int(port)
            try:
                server = connection()
            except Exception as E:
                print(E);continue
            recv_send(server)
        else:
            print("Please enter the number")
            continue

run()