【Python網路程式設計】UDP聊天、TCP檔案下載、多執行緒UDP聊天器、多程序拷貝檔案
阿新 • • 發佈:2021-01-14
技術標籤:「 後端 」專欄# 【Python】# 【計算機網路】socket網路
一、UDP聊天
客戶端
import socket
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
#繫結地址
server_addr = ("",8080)
udp_socket.bind(server_addr)
for x in range(100):
#列印接收的內容
recv_content,recv_addr = udp_socket.recvfrom(1024)
print("%s:%s" %(recv_addr,(recv_content.decode("utf-8"))))
伺服器
import socket
#建立套間字
udp_broadcast = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
udp_broadcast.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)
for x in range(100):
#傳送訊息
send_content = input("傳送的廣播內容:")
udp_broadcast. sendto(send_content.encode("utf-8"), ("<broadcast>", 8080))
udp_broadcast.close()
二、TCP檔案下載
伺服器
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8059))
s.listen(5)
while True:
conn,add = s.accept()
print(add)
while True:
data = conn.recv(1024)
file_name = data.decode('utf-8')
print("客戶端列印的檔名稱為:",file_name)
try :
with open("./"+file_name,"rb") as f:
file_date = f.read()
except:
print("伺服器沒有該檔案:",file_name)
if(file_date):
print("伺服器存在該檔案")
conn.send(file_date)
file_date = ""
客戶端
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8059))
while True:
send_data = input("請輸入要下載檔名稱")
s.send(send_data.encode('utf-8'))
recvdata = s.recv(1024)
if(recvdata):
print("存在該檔案並已經下載")
with open(send_data,"wb") as f:
f.write(recvdata)
else:
print("不存在該檔案")
三、多執行緒UDP聊天
伺服器
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.bind(('127.0.0.1',9527))
while True:
data,add = s.recvfrom(1024)
print("Message:",data.decode('utf-8'),"From Address:",add)
send_data = input("伺服器傳送資料:")
s.sendto(send_data.encode('utf-8'),add)
客戶端
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
print("Client is running")
def sendData():
send_data = input("傳送資料:")
s.sendto(send_data.encode('utf-8'), ('127.0.0.1', 9527))
def recvData():
data,add = s.recvfrom(1024)
print("Message:", data.decode('utf-8'), "From Address:", add)
while True:
choose = input("請選擇傳送或接受資料(1表示傳送資料,其他表示接收資料):")
if(choose == '1'):
sendData()
else:
recvData()
四、多程序拷貝檔案
```javascript
import threading
import os
import shutil
import multiprocessing
import time
# 建立拷貝任務
def copy_work(q,source_dir, dest_dir, file_name):
# 拼接檔名路徑
source_file_path = source_dir + '/' + file_name
dest_file_path = dest_dir + '/' + file_name
# 開啟目標檔案
with open(dest_file_path, 'wb') as dest_file:
# 開啟原始檔
with open(source_file_path, 'rb') as source_file:
# 寫入資料
while True:
source_file_data = source_file.read(1024)
if source_file_data:
dest_file.write(source_file_data)
else:
break
# 當結束該檔案複製時,把該檔名放入q的訊息佇列
q.put(file_name)
if __name__ == '__main__':
# 指定源目錄和目標目錄
source_dir = input("輸入源目錄:")
dest_dir = input("輸入目標目錄")
if os.path.exists(source_dir):
if os.path.exists(dest_dir):
# shutil.rmtree(dest_dir)
print("目標資料夾已存在,如果目錄記憶體在同名檔案,將覆蓋")
else:
# 建立目標資料夾
os.mkdir(dest_dir)
# 獲取源目錄檔案列表
source_file_list = os.listdir(source_dir)
print(source_file_list)
# 建立訊息佇列用於程序間通訊,主要用於顯示已經完成的程序,來實現複製資料夾的進度
q = multiprocessing.Queue()
for file_name in source_file_list:
copy_thread = multiprocessing.Process(target=copy_work, args=(q,source_dir, dest_dir, file_name))
copy_thread.start()
# 判斷拷貝百分比
all_file_num = len(source_file_list) # 計算所有檔案個數
copy_ok = 0
while True:
file_name = q.get()
# print("已經完成copy: %s" % file_name)
copy_ok += 1
# time.sleep(0.1)
print("\r拷貝的進度為:%.2f %%" % (copy_ok * 100 / all_file_num), end="")
if copy_ok >= all_file_num:
break
else:
print("請確認源目錄是否存在或者是否拼寫錯誤")