網路併發程式設計
阿新 • • 發佈:2022-01-17
一.殭屍程序與孤兒程序
1.1 殭屍程序
程序程式碼執行結束之後並沒有直接結束而是需要等待回收子程序資源才會結束
1.2 孤兒程序
主程式已經死亡,子程式還在執行
二.守護程序
守護程序即守護者某個程序,一旦該程序結束那麼也隨之結束
from multiprocessing import Process import time def test(name): print('總管:%s is running' % name) time.sleep(3) print('總管:%s is over' % name) if __name__ == '__main__': p = Process(target=test, args=('jason',)) p.daemon = True # 設定為守護程序(一定要放在start語句上方) p.start() print("皇帝jason壽終正寢") time.sleep(0.1)
三.互斥鎖
問題:併發情況下操作同一份資料,極其容易造成資料錯亂
解決措施:將併發變成序列,雖然降低了效率,但是提升了資料安全。
鎖就可以實現將併發變成序列的效果
鎖分為:①行鎖 ②表鎖
import json import time import random from multiprocessing importProcess, Lock # 查票 def find(name): with open(r'data.txt', 'r', encoding='utf8') as f: data_dict = json.load(f) ticket_num = data_dict.get('ticket_num') print('您好,%s,目前剩餘票量為%s' % (name, ticket_num)) # 買票 def buy(name): with open(r'data.txt', 'r', encoding='utf8') as f: data_dict= json.load(f) ticket_num = data_dict.get('ticket_num') # 建立一個延遲 time.sleep(1) if ticket_num > 0: data_dict['ticket_num'] -= 1 with open(r'data.txt', 'w', encoding='utf8') as f: json.dump(data_dict, f) print('%s購票成功' % name) else: print('沒票了') def run(name): find(name) mutex.acquire() buy(name) mutex.release() if __name__ == '__main__': mutex = Lock() for i in range(1, 11): p = Process(target=run, args=('使用者%s' % i,)) p.start()
四.訊息佇列
佇列就是先進先出
from multiprocessing import Queue q = Queue(5) # put存放資料 q.put(3333) q.put(111) q.put(22) # get拿取資料 順序就是先進先出 print(q.get()) print(q.get()) print(q.get()) print(q.full()) # 判斷佇列中資料是否滿了 try: print(q.get_nowait()) # 判斷佇列中是否還有資料 沒有資料立即報錯 except Exception as e: print(e)
五.ipc機制
from multiprocessing import Queue, Process def producer(q): q.put("子程序p放的資料") def consumer(q): print("子程序c取的資料", q.get()) if __name__ == '__main__': q = Queue() # 開一個程序 p = Process(target=producer, args=(q,)) c = Process(target=consumer, args=(q,)) p.start() c.start() # p.join() # c.join()
六.生產者消費者模型
生產者
負責產生資料(做包子的)
消費者
負責處理資料(吃包子的)
該模型需要解決恭喜不平衡現象
""" 生產者 負責產生資料(做包子的) 消費者 負責處理資料(吃包子的) 該模型需要解決恭喜不平衡現象 """ from multiprocessing import Queue, Process, JoinableQueue import time import random def producer(name, food, q): for i in range(10): print('%s 生產了 %s' % (name, food)) q.put(food) time.sleep(random.random()) def consumer(name, q): while True: data = q.get() print('%s 吃了 %s' % (name, data)) q.task_done() if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producer, args=('大廚jason', '瑪莎拉', q)) p2 = Process(target=producer, args=('印度阿三', '飛餅', q)) p3 = Process(target=producer, args=('泰國阿人', '榴蓮', q)) c1 = Process(target=consumer, args=('班長阿飛', q)) p1.start() p2.start() p3.start() c1.daemon = True c1.start() p1.join() p2.join() p3.join() q.join() # 等待佇列中所有的資料被取乾淨 print('主')
七.執行緒理論
什麼是執行緒?
程序其實是一個資源單位 真正被CPU執行的其實是程序裡面的執行緒
"""
程序類似於是工廠 執行緒類似於是工廠裡面的一條條流水線
所有的程序肯定含有最少一個執行緒
"""
程序間資料預設是隔離的 但是同一個程序內的多個執行緒資料是共享的
八.開設執行緒的兩種方式
from threading import Thread import time # 第一種 def test(name): start_time = time.time() print('%s哈哈哈' % name) time.sleep(2) # io操作 print('%s哈哈哈,結束了' % name) t = Thread(target=test, args=('jason',)) t.start() print('主') # 第二種 class MyClass(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s哈哈哈' % self.name) time.sleep(2) # io操作 print('%s哈哈哈,結束了' % self.name) a = MyClass('jason') a.start() a.join() print('主')
九.守護執行緒
""" 主執行緒的結束意味著整個程序的結束 所以主執行緒需要等待裡面所有非守護執行緒的結束才能結束 """ from threading import Thread from multiprocessing import Process import time def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(1) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
十.執行緒資料共享
from threading import Thread money = 100 def test(): global money money = 999 t = Thread(target=test) t.start() t.join() print(money)
十一.執行緒互斥鎖
from threading import Thread, Lock from multiprocessing import Lock import time num = 100 def test(mutex): global num mutex.acquire() # 先獲取num的數值 tmp = num # 模擬延遲效果 time.sleep(0.1) # 修改數值 tmp -= 1 num = tmp mutex.release() t_list = [] mutex = Lock() for i in range(100): t = Thread(target=test, args=(mutex,)) t.start() t_list.append(t) # 確保所有的子執行緒全部結束 for t in t_list: t.join() print(num)
十二.tcp服務端實行多執行緒併發
服務端
import socket from threading import Thread from multiprocessing import Process server = socket.socket() server.bind(('127.0.0.1', 8081)) server.listen(5) # 半連線池 def talk(sock): while True: try: data = sock.recv(1024) # 接收訊息 if len(data) == 0: continue else: print(data.decode('utf8')) sock.send(data + b'jack, my_lover') except ConnectionResetError as e: print(e) break while True: sock, address = server.accept() print(address) t = Thread(target=talk, args=(sock,)) t.start()View Code
客戶端
import socket from threading import Thread from multiprocessing import Process client = socket.socket() client.connect(('127.0.0.1', 8081)) while True: msg = input('你輸入傳送內容>>>:').strip() if len(msg) == 0: print('格式錯誤,重新輸入') continue client.send(msg.encode('utf8')) # 傳送訊息 data = client.recv(1024) # 接受訊息 print(data.decode('utf8'))View Code