python Event事件、程序池與執行緒池、協程解析
阿新 • • 發佈:2020-01-09
Event事件
用來控制執行緒的執行
出現e.wait(),就會把這個執行緒設定為False,就不能執行這個任務;
只要有一個執行緒出現e.set(),就會告訴Event物件,把有e.wait的使用者全部改為True,剩餘的任務就會立馬去執行。由一些執行緒去控制另一些執行緒,中間通過Event。
from threading import Event from threading import Thread import time # 呼叫Event例項化出物件 e = Event() # # # 若該方法出現在任務中,則為False,阻塞 # e.wait() # False # # 若該方法出現在任務中,則將其他執行緒的False改為True,進入就緒態和執行態 # e.set() # True def light(): print('紅燈亮...') time.sleep(5) # 應該發出訊號,告訴其他執行緒準備執行 e.set() # 將car中的False變為True print('綠燈亮...') def car(name): print('正在等紅燈...') # 讓所有汽車任務進入阻塞態 e.wait() # False print(f'{name}正在加速飄逸...') # 讓一個light執行緒控制多個car執行緒 t = Thread(target=light) t.start() for i in range(10): t = Thread(target=car,args=(f'汽車{i}號',)) t.start()
程序池與執行緒池
程序池與執行緒池是用來控制當前程式允許建立(程序/執行緒)的數量
作用:保證在硬體允許的範圍內建立(程序/執行緒)的數量
執行緒池使用一:
from concurrent.futures import ThreadPoolExecutor import time pool = ThreadPoolExecutor(5) # 5代表只能開啟5個程序,不加預設使用cpu的程序數 # ThreadPoolExecutor(5) # 5代表只能開啟5個執行緒 # pool.submit() #非同步提交任務, 括號裡傳函式地址 def task(): print('執行緒任務開始了...') time.sleep(1) print('執行緒任務結束了...') for line in range(5): pool.submit(task)
使用二:
from concurrent.futures import ThreadPoolExecutor import time pool = ThreadPoolExecutor(5) # 5代表只能開啟5個程序,不加預設使用cpu的程序數 # ThreadPoolExecutor(5) # 5代表只能開啟5個執行緒 # pool.submit() #非同步提交任務, 括號裡傳函式地址 def task(): print('執行緒任務開始了...') time.sleep(1) print('執行緒任務結束了...') return 123 # 回撥函式 def call_back(res): print(type(res)) res2 = res.result() # 注意:賦值操作不要與接收的res同名 print(res2) for line in range(5): pool.submit(task).add_done_callback(call_back)
pool.shutdown() 會讓所有執行緒池的任務結束後,才往下執行程式碼
多執行緒爬取梨視訊
利用requests模組,封裝底層socket套接字
- 主頁中獲取所有視訊id號,拼接視訊詳情頁url
- 在視訊詳情頁中獲取真實視訊url srcUrl=
- 往真實視訊url地址傳送請求獲取 視訊 二進位制資料
- 最後把視訊二進位制資料儲存到本地
協程
- 程序: 資源單位
- 執行緒: 執行單位
- 協程: 在單執行緒下實現併發
注意: 協程不是作業系統資源,目的是讓單執行緒實現併發
協程目的
- 作業系統:使用多道技術,切換 + 儲存狀態,一個是遇到IO, 另一個是CPU執行時間過長
- 協程:通過手動模擬作業系統 “多道計數”, 實現 切換 + 儲存狀態
- 手動實現,遇到IO切換,欺騙作業系統誤以為沒有IO操作
- 單執行緒時,遇到IO,就切換 + 儲存狀態
- 單執行緒時,對於計算密集型,來回切換 + 儲存狀態反而效率更低
優點:在IO密集型的情況下,會提高效率
缺點:若在計算密集型的情況下,來回切換,反而效率更低
import time def func1(): for i in range(10000000): i+1 def func2(): for i in range(10000000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start) # 1.0312113761901855 # 基於yield實現併發 在計算密集型的情況下效率更低 def func1(): while True: 10000000+1 yield def func2(): g = func1() for i in range(10000000): i+1 next(g) # 每次執行next相當於切換到func1下面 start = time.time() func2() stop = time.time() print(stop - start) # 1.3294126987457275
gevent
gevent是一個第三方模組,可以幫你監聽IO操作,並切換
使用gevent的目的:在單執行緒下實現,遇到IO就會 儲存狀態 + 切換
import time from gevent import monkey monkey.patch_all() # 可以監聽該程式下所有的IO操作 from gevent import spawn,joinall # 用於做切換 + 儲存狀態 def func1(): print('1') time.sleep(1) # IO操作 def func2(): print('2') time.sleep(3) def func3(): print('3') time.sleep(5) start = time.time() s1 = spawn(func1) s2 = spawn(func2) s3 = spawn(func3) s1.join() # 傳送訊號,相當於等待自己(在單執行緒的情況下) s2.join() s3.join() # joinall((s1,s2,s3)) # 一個個執行很麻煩,可以用joinall把這些全部裝進去 end = time.time() print(end - start) # 5.006161451339722
TCP服務端socket套接字實現協程
服務端:
from gevent import monkey from gevent import spawn import socket monkey.patch_all() server = socket.socket() server.bind(('127.0.0.1',9999)) server.listen(5) def task(conn): while True: try: data = conn.recv(1024) if len(data) == 0: break print(data.decode('utf-8')) send_data = data.upper() conn.send(send_data) except Exception: break conn.close() def server2(): while True: conn,addr = server.accept() print(addr) spawn(task,conn) if __name__ == '__main__': s = spawn(server2) s.join()
客戶端:
import socket from threading import Thread,current_thread def client(): client = socket.socket() client.connect(('127.0.0.1',9999)) number = 0 while True: send_data = f'{current_thread().name} {number}' client.send(send_data.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) number += 1 for i in range(400): t = Thread(target=client) t.start()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。