day34 執行緒池 協程
執行緒的其他方法
Thread例項物件的方法 # isAlive(): 返回執行緒是否是活動的。 # getName(): 返回執行緒名。 # setName(): 設定執行緒名。 threading模組提供的一些方法: # threading.currentThread(): 返回當前的執行緒變數物件。 # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。 # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果其他方法
import threading import time from threading import Thread,current_thread def f1(n): time.sleep(1) print('子執行緒物件', current_thread()) # <Thread(Thread-1, started 123145336967168)> print('子執行緒名稱', current_thread().getName()) # 當前執行緒物件 Thread-1 print栗子('子執行緒ID', current_thread().ident) # 123145336967168 print('%s號執行緒任務'%n) if __name__ == '__main__': t1 = Thread(target=f1,args=(1,)) t1.start() print('主執行緒物件',current_thread()) # <_MainThread(MainThread, started 140734833878464)> print('主執行緒名稱',current_thread().getName()) #當前執行緒物件(是主執行緒物件) MainThread print('主執行緒ID',current_thread().ident) # 當前執行緒ID 140734833878464 print(threading.enumerate()) # [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>] print(threading.active_count()) # 2 """ 結果: 主執行緒物件 <_MainThread(MainThread, started 140734833878464)> 主執行緒名稱 MainThread 主執行緒ID 140734833878464 [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>] 2 子執行緒物件 <Thread(Thread-1, started 123145336967168)> 子執行緒名稱 Thread-1 子執行緒ID 123145336967168 1號執行緒任務 # 小結: threading.current_thread() <==等效於==> Thread(target=f1) #這兩個等效的前提是: 左邊 的位置要跟 右邊target(目標函式)所在位置 一樣,即左邊的是獲取當前位置的執行緒變數物件,右邊的是在target(目標函式)所在位置建立執行緒物件. """
執行緒佇列 (重點)
執行緒佇列,不需要從threading模組裡面匯入,直接import queue就可以了,這是python自帶的
queue佇列 :使用import queue,用法與程序佇列 multiprocessing.Queue 一樣,也有以下方法:
# put,put_nowait,get,get_nowait,full,empty,qsize q = Queue(5) # 5是size q.put() #放入資料,滿了會等待(阻塞) q.get() #獲取資料,沒有資料了會等待(阻塞) q.qsize() # 當前放進去的元素的個數 q.empty() # 是否為空,不可靠(因為多執行緒) q.full() # 是否滿了,不可靠(因為多執行緒) q.put_nowait() #新增資料,不等待,但是佇列滿了報錯 q.get_nowait() #獲取資料,不等待,但是佇列空了報錯
class queue.
Queue
(maxsize=0) #先進先出(FIFO: fisrt in fisrt out)
import queue # 執行緒中的佇列使用的是這個,等效於程序中的佇列 put,put_nowait,get,get_nowait,full,empty q = queue.Queue(4) # FIFO先進先出 first in first out q.put(1) q.put(2) print(q.full()) # 不滿 # print('當前佇列內容的長度',q.qsize()) q.put(3) print(q.full()) # 滿 # q.put(4) # 不報錯,會阻塞 print(q.qsize()) try: q.put_nowait(4) # 報錯queue.Full except Exception: print('queue full') print(q.get()) print(q.get()) print(q.empty()) # 不空 print(q.get()) print(q.empty()) # 空 print(q.get()) # 不報錯,會阻塞 try: print(q.get_nowait()) # 報錯queue.Empty except Exception: print('queue empty')先進先出佇列
class queue.
LifoQueue
(maxsize=0) #先進後出佇列(或者後進先出(LIFO: last in fisrt out)),類似於棧
q = queue.LifoQueue(3) # Lifo q.put(1) q.put(2) print(q.full()) # 不滿 # print('當前佇列內容的長度',q.qsize()) q.put(3) print(q.full()) # 滿 # q.put(4) # 不報錯,會阻塞 print(q.qsize()) try: q.put_nowait(4) # 報錯queue.Full except Exception: print('queue full') print(q.get()) print(q.get()) print(q.empty()) # 不空 print(q.get()) print(q.empty()) # 空 print(q.get()) # 不報錯,會阻塞 try: print(q.get_nowait()) # 報錯queue.Empty except Exception: print('queue empty')後進先出佇列
class queue.
PriorityQueue
(maxsize=0) #優先順序的佇列(儲存資料時可設定優先順序)
# 優先順序佇列 PriorityQueue # put的資料是一個元組,元組的第一個引數是優先順序數字(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高,越先被get拿到被取出來,第二個引數是put進去的值(可以是任意的資料型別) # 如果說優先順序(第一個引數)相同,那麼比較值(第二個引數),值必須是相同的資料型別(不包括字典),否則報錯 # 比較第二個引數: # 如果第二個引數(或者其引數的元素)是數字: 數字==直接拿整體的數字==>比較大小, # 如果第二個引數(或者其引數的元素)是字串:字串=依次取到每個字元=>比較每個字元的ASCII碼. q = queue.PriorityQueue(10) q.put((-5, 'alex')) # 放入元組,第一個元素是優先順序(可以為負數,越小,優先順序越高),第二個是真正的資料(資料型別隨意) q.put((2, 'blex')) q.put((3, 'clex')) q.put((3, '111')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print('=======================') q.put(('x', 123)) q.put(('y', 345)) print(q.get()) print(q.get()) print('=======================') """ ('x', 123) ('y', 345) """ q.put((5, 'alex')) # 放入元組,第一個元素是優先順序(可以為負數,越小,優先順序越高),第二個是真正的資料(資料型別隨意) q.put((2, 1)) q.put((3, (1,))) # q.put((7, {1,2})) # 優先順序相同資料型別不同,報錯TypeError: '<' not supported between instances of 'dict' and 'set' q.put((7, {1:2})) q.put((7, {1:'a'})) # 優先順序相同資料型別都是字典,報錯TypeError: '<' not supported between instances of 'dict' and 'dict' print(q.get()) print(q.get()) print(q.get()) print(q.get()) print('=======================')優先順序佇列
執行緒池(重點)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了
concurrent.futures模組提供了高度封裝的非同步呼叫介面 ThreadPoolExecutor:執行緒池,提供非同步呼叫 ProcessPoolExecutor: 程序池,提供非同步呼叫 Both implement the same interface, which is defined by the abstract Executor class. 兩者實現相同的介面,該介面由抽象Executor類定義。 #2 基本方法 #submit(fn, *args, **kwargs) 非同步提交任務(萬能傳參,傳入的實參可以是任意資料型別,注意fn的形引數量要和這裡的實引數量對應) #map(func, *iterables, timeout=None, chunksize=1) 取代for迴圈submit的操作(引數1是函式,引數2是可迭代物件) #shutdown(wait=True) ==>close()+join() 相當於程序池的multiprocessing.Pool().close()+multiprocessing.Pool().join()操作 wait=True,等待池內所有任務執行完畢回收完資源後才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait引數為何值,整個程式都會等到所有任務執行完畢 submit和map必須在shutdown之前 #result(timeout=None) 取得結果(相當於pool.get()) #add_done_callback(fn) 回撥函式(功能類似於pool的callback,但是顯然用法不同) """ multiprocessing.Pool和concurrent.futures.ThreadPoolExecutor,ProcessPoolExecutor中回撥函式的區別: 程序的回撥函式res = pool.apply_async(f1,args=(5,),callback=call_back_func) (這裡的callback是預設的關鍵字,call_back_func是自定義的回撥函式名)==>作為非同步物件的引數呼叫 執行緒的回撥函式res = tp.submit(f1,11,12).add_done_callback(f2) (這裡的add_done_callback是預設的回撥函式名,f2是自定義的回撥函式)==>作為非同步物件的方法呼叫) """concurrent.futures方法
上栗子:
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print('%s列印的:'%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) #程序池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改 #非同步執行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交執行函式,返回一個結果物件,i作為任務函式的引數 def submit(self, fn, *args, **kwargs): 可以傳任意形式的引數 t_lst.append(t) # # print(t.result()) #這個返回的結果物件t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有執行緒的結果都出來之後,我們再去通過結果物件t獲取結果 tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的執行緒執行完畢 print('主執行緒') for ti in t_lst: print('>>>>',ti.result()) # 我們還可以不用shutdown(),用下面這種方式 # while 1: # for n,ti in enumerate(t_lst): # print('>>>>', ti.result(),n) # time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麼你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果物件裡面還沒有執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪一個的結果,可以通過列舉enumerate來搞,記錄你是哪一個位置的結果物件的結果已經被取過了,取過的就不再取了 #結果分析: 列印的結果是沒有順序的,因為到了func函式中的sleep的時候執行緒會切換,誰先列印就沒準兒了,但是最後的我們通過結果物件取結果的時候拿到的是有序的,因為我們主執行緒進行for迴圈的時候,我們是按順序將結果物件新增到列表中的。 # 37220列印的: 0 # 32292列印的: 4 # 33444列印的: 1 # 30068列印的: 2 # 29884列印的: 3 # 主執行緒 # >>>> 0 # >>>> 1 # >>>> 4 # >>>> 9 # >>>> 16栗子
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time def f1(n,s): # 要與 萬能傳參 的引數數量一致 time.sleep(1) # print(n,s) return n * n if __name__ == '__main__': tp = ThreadPoolExecutor(4) # 執行緒 預設的執行緒個數是cpu個數 * 5 # tp = ProcessPoolExecutor(4) # 程序 預設的程序個數是cpu個數 這兩個的方法一致 # tp.map(f1, range(10)) # 非同步提交任務,引數是(任務名,可迭代物件) res_lis = [] for i in range(10): res = tp.submit(f1,i,'baobao') # submit是給執行緒池非同步提交任務,萬能傳參 # print(res) # <Future at 0x10617a208 state=running> res_lis.append(res) for t in res_lis: # 4個4個的列印 print(t.result()) tp.shutdown() # ==等效於==> close + join 主執行緒等待所有提交給執行緒池的任務全部執行完畢 for t in res_lis: # 全部一起列印 print(t.result()) # 結果物件.result,#和get方法一樣,如果沒有結果,會等待,阻塞程式 print('主執行緒') """ 只需要將這一行程式碼改為下面這一行就可以了,其他的程式碼都不用變 tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5)#預設一般起執行緒的資料不超過CPU個數 你就會發現為什麼將執行緒池和程序池都放到這一個模組裡面了,因為用法一樣,所以不管是執行緒池還是程序池,更推薦使用這個from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor """ThreadPoolExecutor的簡單使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) return n*n def call_back(m): print('結果為:%s'%(m.result())) # 注意回撥函式拿到的是執行緒(程序)物件,想要拿到值需要呼叫result方法 tpool = ThreadPoolExecutor(max_workers=5) t_lst = [] for i in range(5): t = tpool.submit(func,i).add_done_callback(call_back) """ 結果為:0 結果為:1 結果為:4 結果為:9 結果為:16 """回撥函式簡單應用
協程
協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。
需要強調的是:
#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行) #2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
作業系統控制執行緒的切換 <==對比==> 使用者在單執行緒內控制協程的切換
#1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級 #2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu協程優點
#1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程 #2. 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒協程缺點
# 1.必須在只有一個單執行緒裡實現併發 # 2.修改共享資料不需加鎖 # 3.使用者程式裡自己儲存多個控制流的上下文棧 # 4.附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制))協程特點總結
協程就是告訴Cpython直譯器,你不是nb嗎,不是搞了個GIL鎖嗎,那好,我就自己搞成一個執行緒讓你去執行,省去你切換執行緒的時間,我自己切換比你切換要快很多,避免了很多的開銷,對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒。
協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:
#1. 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。 #2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
生成器
併發的本質:任務切換+儲存狀態,yield本身就是一種在單執行緒下可以儲存任務執行狀態的方法,
#1 yield可以儲存狀態,yield的狀態儲存與作業系統的儲存執行緒狀態很像,但是yield是程式碼級別控制的,更輕量級 #2 send可以把一個函式的結果傳給另外一個函式,以此實現單執行緒內程式之間的切換
import time #基於yield併發執行,多工之間來回切換,這就是個簡單的協程的體現,但是他能夠節省I/O時間嗎?不能,yield不能檢測IO,不能實現遇到IO自動切換 def f1(): for i in range(3): time.sleep(0.5) # 發現什麼?只是進行了切換,但是並沒有節省I/O時間 print('f1>>',i) # yield def f2(): # g = f1() for i in range(3): time.sleep(0.5) print('f2>>', i) # next(g) #不寫yield,下面兩個任務是執行完func1裡面所有的程式才會執行func2裡面的程式,有了yield,我們實現了兩個任務的切換+儲存狀態 #基於yield儲存狀態,實現兩個任務直接來回切換,即併發的效果 #PS:如果每個任務中都加上列印,那麼明顯地看到兩個任務的列印是你一次我一次,即併發執行的. f1() f2() """ f1>> 0 f1>> 1 f1>> 2 f2>> 0 f2>> 1 f2>> 2 有了yield: f2>> 0 f1>> 0 f2>> 1 f1>> 1 f2>> 2 f1>> 2生成器版協程
greenlet模組
#安裝==>在終端輸入以下程式碼 pip3 install greenlet
import time from greenlet import greenlet # 真正的協程模組就是使用greenlet完成的切換 def f1(s): print('第一次f1==>'+s) g2.switch('taibai') #切換到g2這個物件的任務去執行 time.sleep(1) print('第一次f1==>'+s) g2.switch() def f2(s): print('第一次f2==>'+s) g1.switch() time.sleep(1) print('第二次f2==>'+s) g1 = greenlet(f1) #例項化一個greenlet物件,並將任務名稱作為引數傳進去 g2 = greenlet(f2) g1.switch('alex') #執行g1物件裡面的任務,可以在第一次switch時傳入引數,以後都不需要 """ greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。 """greenlet版協程
一般在工作中我們都是程序+執行緒+協程的方式來實現併發,以達到最好的併發效果,如果是4核的cpu,一般起5個程序,每個程序中20個執行緒(5倍cpu數量),每個執行緒可以起500個協程,大規模爬取頁面的時候,等待網路延遲的時間的時候,我們就可以用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是一般一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個。
gevent模組(重點)
#安裝==>在終端輸入以下程式碼 pip3 install gevent
from gevent import monkey;monkey.patch_all() # 必須寫在最上面,這句話後面的所有阻塞全部能夠識別了 import gevent import time import threading # 遇到IO阻塞時會自動切換任務 def f1(name): print(f'{name}==第一次f1') print(threading.current_thread().getName()) # DummyThread-1 假執行緒,虛擬執行緒 # gevent.sleep(1) # gevent預設可以識別的io阻塞 time.sleep(2) # 加上mokey就能夠識別到time模組的sleep了 print(f'{name}==第二次f1') return name def f2(name): print(threading.current_thread().getName()) # DummyThread-2 print(f'{name}==第一次f2') # gevent.sleep(2) time.sleep(2) # 來回切換,直到一個I/O的時間結束,這裡都是我們的gevent做得,不再是控制不了的作業系統了。 print(f'{name}==第二次f2') s = time.time() g1 = gevent.spawn(f1,'alex') #非同步提交了f1任務 g2 = gevent.spawn(f2,name='egon') #建立一個協程物件g2,spawn括號內第一個引數是函式名,如f2,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式f2的,spawn是非同步提交任務 # g1.join() # 等待g1結束,上面只是建立協程物件,這個join才是去執行 # g2.join() # 等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裡面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了 gevent.joinall([g1,g2]) # 這裡等價於上述join兩步合作一步 print(g1.value)#拿到func1的返回值 e = time.time() print('執行時間:',e-s) # 測試執行時間 print('主程式任務') """ 結果: alex==第一次f1 DummyThread-1 DummyThread-2 egon==第一次f2 alex==第二次f1 egon==第二次f2 alex 執行時間: 2.004991054534912 主程式任務 """ ''' # spawn是類方法,引數是萬能的 @classmethod def spawn(cls, *args, **kwargs): # 萬能形參==>實參可以隨便傳入 g = cls(*args, **kwargs) g.start() return g ''' # 我們可以用threading.current_thread().getName()來檢視每個g1和g2,檢視的結果為DummyThread-n,即假執行緒,虛擬執行緒,其實都在一個執行緒裡面 # 程序執行緒的任務切換是由作業系統自行切換的,你自己不能控制 # 協程是通過自己的程式(程式碼)來進行切換的,自己能夠控制,只有遇到協程模組能夠識別的IO操作的時候,程式才會進行任務切換,實現併發效果,如果所有程式都沒有IO操作,那麼就基本屬於序列執行了。gevent版協程
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #上面程式的重要部分是將task函式封裝到greenlet內部執行緒的gevent.spawn。 初始化的greenlet列表存放在陣列threads中,此陣列被傳給gevent.joinall 函式,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。 """ # 結果: Synchronous:同步,一個一個的列印 Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous:非同步,一起列印 Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done """協程:同步非同步對比
今日內容回顧:
1,執行緒的其他方法
Threading.current_thread() #當前執行緒物件
GetName() 獲取執行緒名
Ident 獲取執行緒id
Threading.Enumerate() #當前正在執行的執行緒物件的一個列表
Threading.active_count() #當前正在執行的執行緒數量
2,執行緒佇列(重點)
Import queue
先進先出佇列:queue.Queue(3)
先進後出\後進先出佇列:queue.LifoQueue(3)
優先順序佇列:queue.priorityQueue(3)
Put的資料是一個元組,元組的第一個引數是優先順序數字,數字越小優先順序越高,越先被get到被取出來,第二個引數是put進去的值,如果說優先順序相同,那麼值別忘了應該是相同的資料型別,字典不行
3,執行緒池
From concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
P = ThreadPoolExecutor(4) #預設的執行緒個數是cpu個數 * 5
P = ProcessPoolExecutor(4) #預設的程序個數是cpu個數
P.map(f1,可迭代的物件) #非同步執行
Def f1(n1,n2):
Print(n1,n2)
P.submit(f1,11,12) #非同步提交任務
Res = P.submit(f1,11,12)
Res.result() #和get方法一樣,如果沒有結果,會等待,阻塞程式
Shutdown() #close+join,鎖定執行緒池,等待執行緒池中所有已經提交的任務全部執行完畢
今日作業
- 多執行緒實現 一個socket併發聊天,就是一個服務端同時與多個客戶端進行溝通
- 寫一個簡易的socketserver
- 通過執行緒池做爬蟲,通過回撥函式來清洗爬取回來的資料,簡單寫,就是將爬取回來的網頁內容,通過正則來匹配一些其中的內容,匹配其中的連結網址
明天默寫:
- 執行緒池的方法
- Gevent模組的寫法