上傳下載程式,支援動態更換IP和埠, 上傳下載, 進度條顯示, 正則校驗檔案格式, hash校驗檔案完整性
阿新 • • 發佈:2021-01-20
技術標籤:Linuxpythonexesocketpythonexeftp開發工具
支援動態更換IP和埠, 上傳下載, 進度條顯示, 正則校驗檔案格式, hash校驗檔案完整性
另有已封裝好的exe檔案
客戶端
# coding=utf-8
from socket import *
import json
import struct
import os,re
import hashlib
ip = 0
port = 0
# 列印進度條
def progress(percent, symbol='█', width=40):
if percent > 1: # 超過 100% 的時候讓其停在 1
percent = 1 # 可以避免進度條溢位
show_progress = ("▌%%-%ds▌" % width) % (int(percent * width) * symbol)
print("\r%s %.2f%%" % (show_progress, percent * 100), end='')
# hash 校驗
def Hash_md5(file_path:str):
m = hashlib.md5()
m.update(str(os.path.getsize(file_path)). encode("utf-8"))
return m.hexdigest()
# 連線
def connection():
client = socket(AF_INET,SOCK_STREAM)
client.connect((ip,port))
return client
# 下載
def download(client):
client.send("1".encode("utf-8"))
while 1:
try:
file_path = input("Please enter the file path(q/exit)>>").strip()
if file_path.lower() == "q":break
if len(file_path) == 0:continue
to_path = input("Please enter the save directory(q/back)>>").strip()
if to_path.lower() == "q":continue
if not os.path.isdir(to_path):
print("not find");continue
else:
file_name = input("Please enter filename(q/back)>>").strip()
if file_name.lower() == "q":continue
if re.search(r'[/|:*"<>\\]',file_name):
print(r'Filenames cannot have these symbols:/|:*"<>\\');continue
goal_path = os.path.join(to_path,file_name)
client.send(file_path.encode("utf-8"))
bytes_4 = client.recv(4)
if bytes_4.decode("utf-8") == "4044":
print("not find");continue
else:
header_bytes_len = struct.unpack("i",bytes_4)[0]
header_bytes = client.recv(header_bytes_len)
header_dic = json.loads(header_bytes.decode("utf-8"))
date_len = header_dic["file_size"]
hash_md5 = header_dic["hash"]
recv_len = 0
with open(goal_path,"wb")as f:
while 1:
date = client.recv(1024)
recv_len += len(date)
percent = recv_len / date_len # 接收的比例
progress(percent, width=40) # 進度條的寬度40
f.write(date)
if recv_len == date_len: break
if hash_md5 == Hash_md5(goal_path): # hash 值校驗
print("\nHash auth succeed\nFile saved...")
continue
else:
os.remove(goal_path) # 校驗失敗內容刪除
print("Hash auth failed!!")
except Exception as E:
print(E);break
# 上傳
def uploading(client):
client.send("2".encode("utf-8"))
while 1:
try:
file_path = input("Please enter the path of the uploaded file(q/exit)>>").strip()
if file_path.lower() == "q":break
file_path = os.path.normpath(file_path)
if not os.path.isfile(file_path):
print("not find");continue
goal_path = input("Please enter the destination path(q/back)>>").strip()
if goal_path.lower() == "q":continue
goal_path = os.path.normpath(goal_path)
client.send(goal_path.encode("utf-8"))
bytes_4 = client.recv(4)
if bytes_4.decode("utf-8") == "4044":
print("not find");continue
else:
file_name = input("Please name the new file(q/back)>>").strip()
if file_name.lower() == "q":continue
if re.search(r'[/|:*"<>\\]',file_name):
print(r'Filenames cannot have these symbols:/|:*"<>\\');continue
goal_file_path = os.path.join(goal_path,file_name)
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
md5 = Hash_md5(file_path)
header_dic = {"file_name": file_name, "file_size": file_size, "hash": md5,"file_path":goal_file_path}
header_json = json.dumps(header_dic)
header_bytes = header_json.encode("utf-8")
header_bytes_len = struct.pack("i", len(header_bytes))
client.send(header_bytes_len)
client.send(header_bytes)
send_len = 0
with open(file_path, "rb")as f:
for line in f:
send_len += len(line)
percent = send_len / file_size # 接收的比例
progress(percent, width=40) # 進度條的寬度40
client.send(line)
print("\nsuccessfully upload!")
except Exception as E:
print(E);break
func_dic = {
"1":["download ",download],
"2":["upload",uploading],
"3":["IP Settings",download],
}
# 連線服務端ip和埠並選擇功能
def run():
while True:
print("Please enter the correct IP and port")
global ip,port
ip = input("Please enter IP(q/exit)>>").strip()
if ip.lower() == "q":break
port = input("Please enter PORT(q/exit)>>").strip()
if port.lower() == "q":break
if port.isdigit():
port = int(port)
else:
print("Please enter the number")
continue
try:
client = connection() # 檢測連線是否建立成功
except Exception as E:
print(E);continue
while 1:
for k,v in func_dic.items():
print(f"{k} : {v[0]}")
select = input("Please select function>>")
if select == "3":break
if select in func_dic:
func_dic[select][1](client)
run()
服務端
# coding=utf-8
from socket import *
import json
import struct
import os,hashlib
# 繫結服務端地址
def connection():
server = socket(AF_INET,SOCK_STREAM)
server.bind((ip,port))
server.listen(5)
return server
# 建立連線選擇功能
def recv_send(server):
while 1:
print("connection...")
conn,addr = server.accept()
print(f"from {addr} conn")
select = conn.recv(1)
if select.decode("utf-8") == "1":
download(conn)
elif select.decode("utf-8") == "2":
uploading(conn)
# 客戶端下載
def download(conn):
while 1:
try:
file_path = conn.recv(1024)
file_path = os.path.normpath(file_path.decode("utf-8"))
if not os.path.isfile(file_path):
conn.send("4044".encode("utf-8"))
else:
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
m = hashlib.md5()
m.update(str(file_size).encode("utf-8"))
md5 = m.hexdigest()
header_dic = {"file_name":file_name,"file_size":file_size,"hash":md5}
header_json = json.dumps(header_dic)
header_bytes = header_json.encode("utf-8")
header_bytes_len = struct.pack("i",len(header_bytes))
conn.send(header_bytes_len)
conn.send(header_bytes)
with open(file_path,"rb")as f:
for line in f:
conn.send(line)
except Exception:
break
# 客戶端的上傳
def uploading(conn):
while 1:
try:
dir_path = conn.recv(1024)
dir_path = os.path.normpath(dir_path.decode("utf-8"))
if not os.path.isdir(dir_path):
conn.send("4044".encode("utf-8"));continue
else:
conn.send("4444".encode("utf-8"))
bytes_4 = conn.recv(4)
header_bytes_len = struct.unpack("i", bytes_4)[0]
header_bytes = conn.recv(header_bytes_len)
header_dic = json.loads(header_bytes.decode("utf-8"))
date_len = header_dic["file_size"]
goal_file_path = header_dic["file_path"]
recv_len = 0
with open(goal_file_path, "wb")as f:
while 1:
date = conn.recv(1024)
recv_len += len(date)
f.write(date)
if recv_len == date_len: break
continue
except Exception as E:
print(E);break
def run():
while True:
print("Please enter the correct IP and port")
global ip, port
ip = input("Please enter IP(q/exit)>>").strip()
if ip.lower() == "q": break
port = input("Please enter PORT(q/exit)>>").strip()
if port.lower() == "q": break
if port.isdigit():
port = int(port)
try:
server = connection()
except Exception as E:
print(E);continue
recv_send(server)
else:
print("Please enter the number")
continue
run()