1. 程式人生 > 其它 >【Python網路程式設計】UDP聊天、TCP檔案下載、多執行緒UDP聊天器、多程序拷貝檔案

【Python網路程式設計】UDP聊天、TCP檔案下載、多執行緒UDP聊天器、多程序拷貝檔案

技術標籤:「 後端 」專欄# 【Python】# 【計算機網路】socket網路

一、UDP聊天

客戶端

import socket
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
#繫結地址
server_addr = ("",8080)
udp_socket.bind(server_addr)
for x in range(100):
   #列印接收的內容
   recv_content,recv_addr = udp_socket.recvfrom(1024)
   print("%s:%s"
%(recv_addr,(recv_content.decode("utf-8"))))

伺服器

import socket

#建立套間字
udp_broadcast = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

udp_broadcast.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)

for x in range(100):
   #傳送訊息
   send_content = input("傳送的廣播內容:")
   udp_broadcast.
sendto(send_content.encode("utf-8"), ("<broadcast>", 8080)) udp_broadcast.close()

二、TCP檔案下載

伺服器

import socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8059))
s.listen(5)
while True:
    conn,add = s.accept()
    print(add)
    while True:
        data =
conn.recv(1024) file_name = data.decode('utf-8') print("客戶端列印的檔名稱為:",file_name) try : with open("./"+file_name,"rb") as f: file_date = f.read() except: print("伺服器沒有該檔案:",file_name) if(file_date): print("伺服器存在該檔案") conn.send(file_date) file_date = ""

客戶端

import socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8059))
while True:
    send_data = input("請輸入要下載檔名稱")
    s.send(send_data.encode('utf-8'))
    recvdata = s.recv(1024)
    if(recvdata):
        print("存在該檔案並已經下載")
        with open(send_data,"wb") as f:
            f.write(recvdata)
    else:
        print("不存在該檔案")

三、多執行緒UDP聊天

伺服器

import socket
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.bind(('127.0.0.1',9527))
while True:
    data,add = s.recvfrom(1024)
    print("Message:",data.decode('utf-8'),"From Address:",add)
    send_data = input("伺服器傳送資料:")
    s.sendto(send_data.encode('utf-8'),add)

客戶端

import socket
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
print("Client is running")

def sendData():
    send_data = input("傳送資料:")
    s.sendto(send_data.encode('utf-8'), ('127.0.0.1', 9527))

def recvData():
    data,add = s.recvfrom(1024)
    print("Message:", data.decode('utf-8'), "From Address:", add)


while True:
   choose = input("請選擇傳送或接受資料(1表示傳送資料,其他表示接收資料):")
   if(choose == '1'):
       sendData()
   else:
       recvData()

四、多程序拷貝檔案

```javascript
import threading
import os
import shutil
import multiprocessing
import time

# 建立拷貝任務
def copy_work(q,source_dir, dest_dir, file_name):
    # 拼接檔名路徑
    source_file_path = source_dir + '/' + file_name
    dest_file_path = dest_dir + '/' + file_name
    # 開啟目標檔案
    with open(dest_file_path, 'wb') as dest_file:
        # 開啟原始檔
        with open(source_file_path, 'rb') as source_file:
            # 寫入資料
            while True:
                source_file_data = source_file.read(1024)
                if source_file_data:
                    dest_file.write(source_file_data)
                else:
                    break
    # 當結束該檔案複製時,把該檔名放入q的訊息佇列
    q.put(file_name)

if __name__ == '__main__':
    # 指定源目錄和目標目錄
    source_dir = input("輸入源目錄:")
    dest_dir = input("輸入目標目錄")
    if os.path.exists(source_dir):
        if os.path.exists(dest_dir):
            # shutil.rmtree(dest_dir)
            print("目標資料夾已存在,如果目錄記憶體在同名檔案,將覆蓋")
        else:
            # 建立目標資料夾
            os.mkdir(dest_dir)
        # 獲取源目錄檔案列表
        source_file_list = os.listdir(source_dir)
        print(source_file_list)
        # 建立訊息佇列用於程序間通訊,主要用於顯示已經完成的程序,來實現複製資料夾的進度
        q = multiprocessing.Queue()
        for file_name in source_file_list:
            copy_thread = multiprocessing.Process(target=copy_work, args=(q,source_dir, dest_dir, file_name))
            copy_thread.start()
        # 判斷拷貝百分比
        all_file_num = len(source_file_list)  # 計算所有檔案個數
        copy_ok = 0
        while True:
            file_name = q.get()
            # print("已經完成copy: %s" % file_name)
            copy_ok += 1
            # time.sleep(0.1)
            print("\r拷貝的進度為:%.2f %%" % (copy_ok * 100 / all_file_num), end="")

            if copy_ok >= all_file_num:
                break
    else:
        print("請確認源目錄是否存在或者是否拼寫錯誤")