redis的Linux系統安裝與配置、redis的api使用、高階用法之慢查詢、pipline事物
一、什麼是執行緒(thread)
執行緒是作業系統能夠進行運算排程的最小單位。它被包含在程序之中,是程序中的實際運作單位。一個執行緒指的是程序中一個單一順序的控制流,一個程序中可以包含多個執行緒,每條執行緒並行執行不同的任務。下面,我們來舉一個例子來說明執行緒的工作模式:
- 假設你正在讀一本書,你現在想休息一下,但是你想在回來繼續閱讀的時候從剛剛停止閱讀的地方繼續讀。實現這一點的一種方法是記下頁碼、行號和字號。閱讀一本書的執行環境是這三個數字。
- 如果你有一個室友,她也在用同樣的方法閱讀這本書,她可以在你不用的時候拿起書,從她停下來的地方繼續讀。然後你可以把它拿回去,從你標記停下的地方繼續閱讀。
執行緒以相同的方式工作。CPU給你的錯覺是它在同一時間做多個計算。它通過在每次計算上花費一點時間來實現這一點。它可以這樣做,因為它對每個計算都有一個執行上下文。就像您可以與朋友共享一本書一樣,許多工也可以共享一個CPU。當然,真正地同時執行多執行緒需要多核CPU才可能實現。
多執行緒多用於處理IO密集型任務頻繁寫入讀出,cpu負責排程,消耗的是磁碟空間。
二、什麼是程序(process)
程式的執行例項稱為程序。對於作業系統來說,一個任務就是一個程序(Process),比如開啟一個瀏覽器就是啟動一個瀏覽器程序,開啟一個記事本就啟動了一個記事本程序,開啟兩個記事本就啟動了兩個記事本程序,開啟一個Word就啟動了一個Word程序。程序是很多資源的集合。
每個程序提供執行程式所需的資源。程序具有虛擬地址空間、可執行程式碼、對系統物件的開啟控制代碼、安全上下文、程序惟一識別符號、環境變數、優先順序類、最小和最大工作集大小,以及至少一個執行執行緒。每個程序都是從一個執行緒(通常稱為主執行緒)開始的,但是可以從它的任何執行緒中建立其他執行緒。大部分程序都不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)。
多程序多用於CPU密集型任務,例如:排序、計算,都是消耗CPU的。
三、程序與執行緒的區別
- 執行緒是作業系統能夠進行運算排程的最小單位,而程序是一組與計算相關的資源。一個程序可以包含一個或多個執行緒。
- 地址空間和其它資源(如開啟檔案):程序間相互獨立,同一程序下的各執行緒間共享。某程序內的執行緒在其它程序不可見。
- 通訊:程序間通訊IPC,執行緒間可以直接讀寫程序資料段(如全域性變數)來進行通訊——需要程序同步和互斥手段的輔助,以保證資料的一致性。
- 排程和切換:執行緒上下文切換比程序上下文切換要快得多。
- 建立執行緒比程序開銷小(開一個程序,裡面就有空間了,而執行緒在程序裡面,就沒必要在開一個空間了)
四、全域性直譯器鎖GIL(Global Interpreter Lock)
我們想執行的速度快一點的話,就得使用多執行緒或者多程序。在Python裡面,多執行緒被很多人詬病,為什麼呢,因為Python的直譯器(CPython)使用了一個叫GIL的全域性直譯器鎖,它不能利用多核CPU,只能執行在一個CPU上面,但是你在執行程式的時候,看起來好像還是在一起執行的,是因為作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣,這個叫做上下文切換。
在CPython中,由於全域性直譯器鎖,在同一時刻只能有一個執行緒進入直譯器,只能有一個執行緒執行Python程式碼(即使某些面向效能的庫可能會克服這個限制)。如果希望應用程式更好地利用多核計算機的計算資源,建議使用多程序。但是,如果您希望同時執行多個I/O密集型的任務,執行緒仍然是一個合適的選擇。
五、threading模組
1、直接呼叫
Python中的多執行緒使用threading模組,執行多執行緒使用threading.Thread(target=方法,args=(引數,)),如下:
1 import threading,time 2 def run(): # 定義每個執行緒需要執行的函式 3 time.sleep(3) 4 print('呵呵呵') 5 6 # 序列 7 for i in range(5): # 序列,需要執行15秒 8 run() 9 10 # 多執行緒: 11 for j in range(5): # 並行:執行3秒 12 t = threading.Thread(target=run) # 例項化了一個執行緒 13 t.start()
2、使用類繼承式呼叫
自己寫一個類繼承 threading.Thread類,在子類中重寫 run()方法,如下:
1 import threading,time 2 3 class MyThread(threading.Thread): 4 def __init__(self, num): 5 threading.Thread.__init__(self) 6 self.num = num 7 8 def run(self): # 定義每個執行緒要執行的函式 9 print("running on number:%s" % self.num) 10 time.sleep(3) 11 12 if __name__ == '__main__': 13 t1 = MyThread(1) 14 t2 = MyThread(2) 15 t1.start() 16 t2.start()
threading模組的常用方法
1 # threading 模組提供的常用方法: 2 # threading.currentThread(): 返回當前的執行緒變數。 3 # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。 4 # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。 5 # 除了使用方法外,執行緒模組同樣提供了Thread類來處理執行緒,Thread類提供了以下方法: 6 # run(): 用以表示執行緒活動的方法。 7 # start():啟動執行緒活動。 8 # join([time]): 等待至執行緒中止。這阻塞呼叫執行緒直至執行緒的join() 方法被呼叫中止-正常退出或者丟擲未處理的異常-或者是可選的超時發生。 9 # isAlive(): 返回執行緒是否活動的。 10 # getName(): 返回執行緒名。 11 # setName(): 設定執行緒名。
3、多執行緒執行速度測試
下面再舉一個例子來對比單執行緒和多執行緒的執行速度:
1 import requests,time,threading 2 # 定義需要下載的網頁字典 3 urls = { 4 'besttest':'http://www.besttest.cn', 5 'niuniu':'http://www.nnzhp.cn', 6 'dsx':'http://www.imdsx.cn', 7 'cc':'http://www.cc-na.cn', 8 'alin':'http://www.limlhome.cn' 9 } 10 # 下載網頁並儲存成html檔案 11 # 子執行緒執行的函式,如果裡面有返回值的話,是不能獲取到的 12 # 只能在函式外面定義一個list或者字典來存每次處理的結果 13 data = {} 14 def down_html(file_name,url): 15 start_time = time.time() 16 res = requests.get(url).content # content就是返回的二進位制檔案內容 17 open(file_name+'.html','wb').write(res) 18 end_time = time.time() 19 run_time = end_time - start_time 20 data[url] = run_time 21 22 # 序列 23 start_time = time.time() # 記錄開始執行時間 24 for k,v in urls.items(): 25 down_html(k,v) 26 end_time = time.time() # 記錄執行結束時間 27 run_time = end_time - start_time 28 print(data) 29 print('序列下載總共花了%s秒'%run_time) 30 31 # 並行 32 start_time = time.time() 33 for k,v in urls.items(): 34 t = threading.Thread(target=down_html,args=(k,v)) # 多執行緒的函式如果傳參的話,必須得用args 35 t.start() 36 end_time = time.time() 37 run_time = end_time-start_time 38 print(data) 39 print('並行下載總共花了%s秒'%run_time)
序列執行結果:
並行執行結果:
從以上執行結果可以看出,並行下載的時間遠短於序列。但是仔細觀察會發現:並行執行時,打印出執行時間後,程式並沒有結束執行,而是等待了一段時間後才結束執行。實際上並行執行時,列印的是主執行緒執行的時間,主執行緒只是負責調起5個子執行緒去執行下載網頁內容,調起子執行緒以後主執行緒就執行完成了,所以執行時間才特別短,主執行緒結束後子執行緒並沒有結束。所以0.015s這個時間是主執行緒執行的時間,而不是並行下載的時間。如果想看到並行下載的時間,就需要引入執行緒等待。
4、執行緒等待(t.join())
1 import requests,time,threading 2 # 定義需要下載的網頁字典 3 urls = { 4 'besttest':'http://www.besttest.cn', 5 'niuniu':'http://www.nnzhp.cn', 6 'dsx':'http://www.imdsx.cn', 7 'cc':'http://www.cc-na.cn', 8 'alin':'http://www.limlhome.cn' 9 } 10 # 下載網頁並儲存成html檔案 11 # 子執行緒執行的函式,如果裡面有返回值的話,是不能獲取到的 12 # 只能在函式外面定義一個list或者字典來存每次處理的結果 13 data = {} 14 def down_html(file_name,url): 15 start_time = time.time() 16 res = requests.get(url).content # content就是返回的二進位制檔案內容 17 open(file_name+'.html','wb').write(res) 18 end_time = time.time() 19 run_time = end_time - start_time 20 data[url] = run_time 21 22 # 序列 23 start_time = time.time() # 記錄開始執行時間 24 for k,v in urls.items(): 25 down_html(k,v) 26 end_time = time.time() # 記錄執行結束時間 27 run_time = end_time - start_time 28 print(data) 29 print('序列下載總共花了%s秒'%run_time) 30 31 # 多執行緒 32 start_time = time.time() 33 threads = [] # 存放啟動的5個子執行緒 34 for k,v in urls.items(): 35 # 多執行緒的函式如果傳參的話,必須得用args 36 t = threading.Thread(target=down_html,args=(k,v)) 37 t.start() 38 threads.append(t) 39 for t in threads: # 主執行緒迴圈等待5個子執行緒執行結束 40 t.join() # 迴圈等待 41 print(data) # 通過函式前面定義的data字典獲取每個執行緒執行的時間 42 end_time = time.time() 43 run_time = end_time - start_time 44 print('並行下載總共花了%s秒'%run_time)
多執行緒執行結果:
從執行結果來看,總執行時間只是稍稍大於最大的下載網頁的時間(主執行緒調起子執行緒也需要一點時間),符合多執行緒的目的。有了執行緒等待,主執行緒就會等到子執行緒全部執行結束後再結束,這樣統計出的才是真正的並行下載時間。
看到這裡,我們還需要回答一個問題:為什麼Python的多執行緒不能利用多核CPU,但是在寫程式碼的時候,多執行緒的確在併發,而且還比單執行緒快
電腦cpu有幾核,那麼只能同時執行幾個執行緒。但是python的多執行緒,只能利用一個cpu的核心。因為Python的直譯器使用了GIL的一個叫全域性直譯器鎖,它不能利用多核CPU,只能執行在一個cpu上面,但是執行程式的時候,看起來好像還是在一起執行的,是因為作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。這個叫做上下文切換。
Python只有一個GIL,執行python時,就要拿到這個鎖才能執行,在遇到I/O 操作時會釋放這把鎖。如果是純計算的程式,沒有 I/O 操作,直譯器會每隔100次操作就釋放這把鎖,讓別的執行緒有機會 執行(這個次數可以通sys.setcheckinterval來調整)同一時間只會有一個獲得GIL執行緒在跑,其他執行緒都處於等待狀態。
1、如果是CPU密集型程式碼(迴圈、計算等),由於計算工作量多和大,計算很快就會達到100,然後觸發GIL的釋放與在競爭,多個執行緒來回切換損耗資源,所以在多執行緒遇到CPU密集型程式碼時,單執行緒會比較快;
2、如果是I\O密集型程式碼(檔案處理、網路爬蟲),開啟多執行緒實際上是併發(不是並行),IO操作會進行IO等待,執行緒A等待時,自動切換到執行緒B,這樣就提升了效率。
5、守護執行緒(setDaemon(True))
所謂守護執行緒的意思就是:只要主執行緒結束,那麼子執行緒立即結束,不管子執行緒有沒有執行完成。
1 import threading 2 def run(): 3 time.sleep(3) 4 print('哈哈哈') 5 6 for i in range(50): 7 t = threading.Thread(target=run) 8 t.setDaemon(True) # 把子執行緒設定成為守護執行緒 9 t.start() 10 print('Done,執行完成') 11 time.sleep(3)
6、同步鎖
多個執行緒同時修改一個數據的時候,可能會把資料覆蓋,所以需要加執行緒鎖(threading.lock())。我們先來看看下面兩段程式碼
程式碼一:
1 import threading 2 3 def addNum(): 4 global num #在每個執行緒中都獲取這個全域性變數 5 num-=1 6 7 num = 100 #設定一個全域性變數 8 thread_list = [] 9 for i in range(100): 10 t = threading.Thread(target=addNum) 11 t.start() 12 thread_list.append(t) 13 14 for t in thread_list: #等待所有執行緒執行完畢 15 t.join() 16 17 print('final num:', num ) # 執行結果為0
程式碼二:
1 import threading,time 2 3 def addNum(): 4 global num #在每個執行緒中都獲取這個全域性變數 5 6 temp=num 7 # print('--get num:',num ) 8 time.sleep(0.1) 9 num =temp-1 #對此公共變數進行減1操作 10 11 num = 100 #設定一個共享變數 12 thread_list = [] 13 for i in range(100): 14 t = threading.Thread(target=addNum) 15 t.start() 16 thread_list.append(t) 17 18 for t in thread_list: #等待所有執行緒執行完畢 19 t.join() 20 21 print('final num:', num ) # 執行結果是99
從邏輯上看,以上兩端程式碼是一樣的,只不過第二段程式碼的實現過程是將num賦值給一箇中間變數temp,由這個中間變數完成計算然後再把結果賦值給回num。同時,在這個過程中加了一個等待時間0.1s。為什麼執行結果卻不一樣呢?就是因為這個0.1s的等待時間,第一個執行緒拿到的num值是100,在它準備計算前有一個等待時間0.1s,所以CPU切換到了第二個執行緒,它拿到的num的值還是100(因為第一個執行緒並未完成計算,num值未變).......,直到CPU切換第100個執行緒,它拿到的num的值還是100,100個執行緒每一個都沒有執行完就進行了切換。等待這0.1s的時間過去以後,所有的執行緒一個個開始計算,最後的結果都是99。
那為什麼第一段程式碼沒有問題呢?因為CPU的計算太快了,CPU還沒來得及切換計算已經完成了。
那我們如何來解決這個問題呢?可能大家想到了可以用join讓所有的執行緒程式設計序列的,這樣就不存在同時修改資料的可能了。但是,這樣的話任務內的所有程式碼都是序列執行的,而我們現在只想讓修改共享資料這部分序列執行,而其他部分還是並行執行。
這時,我們就可以通過同步鎖來解決這種問題,程式碼如下:
1 import threading,time 2 3 def addNum(): 4 global num #在每個執行緒中都獲取這個全域性變數 5 lock.acquire() # 加鎖 6 temp=num 7 print('--get num:',num ) 8 time.sleep(0.1) 9 num =temp-1 #對此公共變數進行減1操作 10 lock.release() # 解鎖 11 12 lock = threading.Lock() # 例項化一把鎖 13 num = 100 #設定一個共享變數 14 thread_list = [] 15 for i in range(100): 16 t = threading.Thread(target=addNum) 17 t.start() 18 thread_list.append(t) 19 20 for t in thread_list: #等待所有執行緒執行完畢 21 t.join() 22 23 print('final num:', num ) # 執行結果是0
問題解決了,但是我們還有個疑問,這個同步鎖和全域性直譯器鎖(GIL)有什麼關係呢?
- Python的執行緒在GIL的控制之下,執行緒之間,對整個Python直譯器,對Python提供的C API的訪問都是互斥的,這可以看作是Python核心級的互斥機制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機制——使用者級互斥。核心級通過互斥保護了核心的共享資源,同樣,使用者級互斥保護了使用者程式中的共享資源。
- GIL 的作用是:對於一個直譯器,只能有一個執行緒在執行bytecode。所以每時每刻只有一條bytecode在被一個執行緒執行。GIL保證了bytecode 這層面上是執行緒是安全的。但是如果有個操作比如 x += 1,這個操作需要多個bytecodes操作,在執行這個操作的多條bytecodes期間的時候可能中途就切換執行緒了,這樣就出現了資料競爭的情況了。
- 那我的同步鎖也是保證同一時刻只有一個執行緒被執行,是不是沒有GIL也可以?是的,那要GIL有什麼用?好像真的是沒用!!
7、死鎖和遞迴鎖
線上程間共享多個資源的時候,如果兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所以這兩個執行緒在無外力作用下將一直等待下去。我們來看下面這段程式碼:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lockA.acquire() 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lockB.acquire() 9 print(self.name,"gotlockB",time.ctime()) 10 lockB.release() 11 lockA.release() 12 13 def doB(self): 14 lockB.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lockA.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lockA.release() 20 lockB.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lockA=threading.Lock() 27 lockB=threading.Lock() 28 threads=[] 29 for i in range(5): 30 threads.append(myThread()) 31 for t in threads: 32 t.start() 33 for t in threads: 34 t.join()
執行結果如下:
第一個執行緒執行完doA,再執行doB時,拿到了lockB,再想拿lockA時,發現lockA已經被第二個執行緒拿到了,第一個執行緒拿不到lockA了。同樣,第二個執行緒拿到了lockA,再想拿lockB時,發現此時lockB還在第一個執行緒手裡沒有釋放,所以第二個執行緒同樣也拿不到lockB。這樣就造成了一個現象:第一個執行緒在等待第二個執行緒釋放lockA,第二個執行緒在等第一個執行緒釋放lockB,這樣一直等下去造成了死鎖。解決的辦法就是使用遞迴鎖,如下:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lock.acquire() 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lock.acquire() 9 print(self.name,"gotlockB",time.ctime()) 10 lock.release() 11 lock.release() 12 13 def doB(self): 14 lock.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lock.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lock.release() 20 lock.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lock = threading.RLock() # 遞迴鎖 27 threads=[] 28 for i in range(5): 29 threads.append(myThread()) 30 for t in threads: 31 t.start() 32 for t in threads: 33 t.join()
遞迴鎖就是將lockA=threading.Lock()和lockB=threading.Lock()改為了lock = threading.RLock(),執行結果如下:
8、訊號量
- 訊號量是用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數器,每當呼叫acquire()時減1,呼叫release()時加1;
- 計數器不能小於0,當計數器為0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。(類似於停車位的概念);
- BoundedSemaphore與Semaphore的唯一區別在於前者將在呼叫release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常;
- 訊號量實質上也是一把鎖;
例項:
1 import threading,time 2 class myThread(threading.Thread): 3 def run(self): 4 if semaphore.acquire(): 5 print(self.name) 6 time.sleep(1) 7 semaphore.release() 8 9 if __name__=="__main__": 10 semaphore=threading.BoundedSemaphore(5) 11 thrs=[] 12 for i in range(100): 13 thrs.append(myThread()) 14 for t in thrs: 15 t.start()
9、條件變數同步(Condition)
- 有一類執行緒需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 物件用於條件變數執行緒的支援,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法;
- lock_con=threading.Condition([Lock/Rlock]):引數填寫建立鎖的型別,不是必填項,不傳引數預設建立的是Rlock鎖;
- wait():條件不滿足時呼叫,執行緒會釋放鎖並進入等待阻塞;
- notify():條件創造後呼叫,通知等待池啟用一個執行緒;
- notifyAll():條件創造後呼叫,通知等待池啟用所有執行緒
例項:
import threading,time from random import randint class Producer(threading.Thread): def run(self): global L while True: val=randint(0,100) print('生產者',self.name,":Append"+str(val),L) if lock_con.acquire(): L.append(val) lock_con.notify() lock_con.release() time.sleep(3) class Consumer(threading.Thread): def run(self): global L while True: lock_con.acquire() if len(L)==0: lock_con.wait() print('消費者',self.name,":Delete"+str(L[0]),L) del L[0] lock_con.release() time.sleep(1) if __name__=="__main__": L=[] lock_con=threading.Condition() threads=[] for i in range(5): threads.append(Producer()) threads.append(Consumer()) for t in threads: t.start() for t in threads: t.join()
10、同步條件(Event)
- 同步條件和條件變數同步差不多意思,只是少了鎖功能,因為同步條件設計於不訪問共享資源的條件環境;
- event=threading.Event():條件環境物件,初始值 為False;
例項:
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚大家都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(3) 7 print("BOSS:<22:00>可以下班了。") 8 event.isSet() or event.set() 9 10 class Worker(threading.Thread): 11 def run(self): 12 event.wait() 13 print("Worker:哎……命苦啊!") 14 time.sleep(1) 15 event.clear() 16 event.wait() 17 print("Worker:Oh,Yeah!!") 18 19 if __name__=="__main__": 20 event=threading.Event() 21 threads=[] 22 for i in range(5): 23 threads.append(Worker()) 24 threads.append(Boss()) 25 for t in threads: 26 t.start() 27 for t in threads: 28 t.join()
11、執行緒佇列(queue)
- queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- 當資訊必須在多個執行緒之間安全地交換時,佇列線上程程式設計中特別有用;
queue佇列類的方法:
1 # 建立一個“佇列”物件 2 import queue 3 q = queue.Queue(maxsize = 10) 4 # queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。 5 6 # 將一個值放入佇列中 7 q.put(10) 8 # 呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為:1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。 9 10 # 將一個值從佇列中取出 11 q.get() 12 # 呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。 13 14 # Python Queue模組有三種佇列及建構函式: 15 queue.Queue(maxsize=10) # Python Queue模組的FIFO佇列先進先出。 16 queue.LifoQueue(maxsize=10) # LIFO類似於堆,即先進後出。 17 queue.PriorityQueue(maxsize=10) # 還有一種是優先順序佇列級別越低越先出來。 18 19 # 此包中的常用方法(q = queue.Queue()): 20 # q.qsize() 返回佇列的大小 21 # q.empty() 如果佇列為空,返回True,反之False 22 # q.full() 如果佇列滿了,返回True,反之False 23 # q.full 與 maxsize 大小對應 24 # q.get([block[, timeout]]) 獲取佇列,timeout等待時間 25 # q.get_nowait() 相當q.get(False) 26 # 非阻塞 q.put(item) 寫入佇列,timeout等待時間 27 # q.put_nowait(item) 相當q.put(item, False) 28 # q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 29 # q.join() 實際上意味著等到佇列為空,再執行別的操作
例項:
1 import threading,queue 2 from time import sleep 3 from random import randint 4 class Production(threading.Thread): 5 def run(self): 6 while True: 7 r=randint(0,100) 8 q.put(r) 9 print("生產出來%s號包子"%r) 10 sleep(1) 11 class Proces(threading.Thread): 12 def run(self): 13 while True: 14 re=q.get() 15 print("吃掉%s號包子"%re) 16 if __name__=="__main__": 17 q=queue.Queue(10) 18 threads=[Production(),Production(),Production(),Proces()] 19 for t in threads: 20 t.start() 21 for t in threads: 22 t.join()
六、multiprocessing模組
1、直接呼叫
Python中的多程序使用multiprocessing模組,執行多程序使用multiprocessing.Process(target=方法,args=(引數,)),如下:
例項一
1 import multiprocessing,time 2 3 def f(name): 4 time.sleep(1) 5 print('Hello!!',name,time.ctime()) 6 7 if __name__ == '__main__': 8 p_list = [] 9 for i in range(3): 10 p = multiprocessing.Process(target=f,args=('Porcess',)) 11 p_list.append(p) 12 p.start() 13 for i in p_list: 14 i.join() 15 16 print('end')
例項二:一個簡單的多程序,multiprocessing.Process(target=run,args=(6,))
1 import multiprocessing,threading 2 def my(): 3 print('哈哈哈') 4 5 def run(num): 6 for i in range(num): 7 t = threading.Thread(target=my) 8 t.start() 9 # 總共啟動5個程序,每個程序下面啟動6個執行緒,函式my()執行30次 10 if __name__ == '__main__': 11 process = [] 12 for i in range(5): 13 # args只有一個引數一定後面要加逗號 14 p = multiprocessing.Process(target=run,args=(6,)) # 啟動一個程序 15 p.start() 16 process.append(p) 17 [p.join() for p in process] # 與執行緒用法一致
2、類式呼叫(與多程序類似)
例項
自己寫一個類繼承 multiprocessing.Process類,在子類中重寫 run()方法,如下:
1 class MyProcess(multiprocessing.Process): 2 def __init__(self): 3 super(MyProcess, self).__init__() 4 # self.name = name 5 6 def run(self): 7 time.sleep(1) 8 print('Hello!!',self.name,time.ctime()) 9 10 if __name__ == '__main__': 11 p_list = [] 12 for i in range(3): 13 p = MyProcess() 14 p.start() 15 p_list.append(p) 16 for p in p_list: 17 p.join() 18 print('end')
3、Process類
(1)構造方法:def __init__(self, group=None, target=None, name=None, args=(), kwargs={})
- group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
- target: 要執行的方法;
- name: 程序名;
- args/kwargs: 要傳入方法的引數;
(2)例項方法
- is_alive():返回程序是否在執行;
- terminate():不管任務是否完成,立即停止工作程序;
- join([timeout]):阻塞當前上下文環境的程序程,直到呼叫此方法的程序終止或到達指定的timeout(可選引數);
- start():程序準備就緒,等待CPU排程;
- run():strat()呼叫run方法,如果例項程序時未制定傳入target,這star執行t預設run()方法;
(3)屬性
- daemon:和執行緒的setDeamon功能一樣;
- exitcode(程序在執行時為None、如果為–N,表示被訊號N結束);
- name:程序名;
- pid:程序號;
4、程序間通訊
不同程序間的資料是不共享的,要想實現多個程序間的資料交換可以用以下方法:
(1)Queues
使用方法跟threading裡的queue類似,如下:
1 from multiprocessing import Process,Queue 2 3 def f(q,i): 4 q.put([42,i,'hello']) 5 print('subprocess q_id:',id(q)) 6 7 if __name__ == '__main__': 8 q = Queue() 9 p_list=[] 10 print('main q_id:',id(q)) 11 for i in range(3): 12 p = Process(target=f, args=(q,i)) 13 p_list.append(p) 14 p.start() 15 print(q.get()) 16 print(q.get()) 17 print(q.get()) 18 for p in p_list: 19 p.join()
(2)Pipes
Pipes函式的作用是:返回由管道連線的一對連線物件,管道預設情況下是雙工的(雙向的)
1 from multiprocessing import Process,Pipe 2 3 def f(child_conn): 4 child_conn.send('子程序') 5 child_conn.send([42, None, 'hello']) 6 print(child_conn.recv()) # 子程序接收父程序傳送的資訊,列印'陣列' 7 child_conn.close() 8 9 if __name__ == '__main__': 10 parent_conn,child_conn = Pipe() 11 p = Process(target=f, args=(child_conn,)) 12 p.start() 13 # 父程序接收子程序傳送的資訊 14 print(parent_conn.recv()) # 列印 '子程序' 15 print(parent_conn.recv()) # 列印 '[42, None, 'hello']' 16 parent_conn.send('陣列') # 父程序給子程序傳送資訊 17 p.join()
Pipe方法返回的兩個連線物件表示管道的兩端。每個連線物件都有send()和recv()方法。請注意,如果兩個程序(或執行緒)試圖同時從管道的同一端讀取或寫入資料,則管道中的資料可能會損壞。當然,在同一時間使用管道的不同端不會有流程損壞的風險。
(3)Managers
Managers()返回的manager物件控制一個伺服器程序,該程序儲存Python物件,並允許其他程序使用代理操作它們。
1 from multiprocessing import Process,Manager 2 3 def f(dic,lis,n): 4 dic[n] = '1' 5 dic['2'] = 2 6 dic[0.25] = None 7 lis.append(n) 8 # print(lis) 9 10 if __name__ == '__main__': 11 # with Manager() as manager: 12 manager = Manager() 13 dic = manager.dict() 14 lis = manager.list(range(5)) 15 p_list = [] 16 for i in range(10): 17 p = Process(target=f, args=(dic,lis,i)) 18 p.start() 19 p_list.append(p) 20 for res in p_list: 21 res.join() 22 23 print(dic) 24 print(lis)
Manager()返回的管理器將支援型別list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value和Array。
5、程序鎖
1 from multiprocessing import Process,Lock 2 3 def f(l,i): 4 l.acquire() 5 print('hello world',i) 6 l.release() 7 8 if __name__ == '__main__': 9 lock = Lock() 10 p_list = [] 11 for num in range(10): 12 p = Process(target=f, args=(lock,num)) 13 p.start() 14 p_list.append(p) 15 for p in p_list: 16 p.join() 17 print('end')
6、程序池
在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?第一,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二,即便開啟了成千上萬的程序,因為CPU核心數有限,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。那麼我們要怎麼做呢?
在這裡,要給大家介紹一個程序池的概念。定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開關程序的時間,也一定程度上能夠實現併發效果。
Python中的程序池使用multiprocessing模組的Pool類,主要方法如下:
1 apply(func [,args [,kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。 2 '''需要注意的是:此操作並不會在所有的池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用apply_async()''' 3 4 apply_async(func [,args [,kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。 5 '''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將回調函式傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。''' 6 7 close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成 8 9 join():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
(1)程序池和多程序效率對比
1 import time,multiprocessing 2 def func(i): 3 i += 1 4 5 if __name__ == '__main__': 6 # 程序池 7 p = multiprocessing.Pool(5) 8 start = time.time() 9 p.map(func,range(100)) # 使用map函式迴圈呼叫func函式 10 p.close() # 不允許再向程序池中新增任務 11 p.join() 12 print(time.time()-start) # 0.21729111671447754 13 # 多程序 14 start = time.time() 15 p_list = [] 16 for i in range(100): 17 # 多程序呼叫func函式 18 p = multiprocessing.Process(target=func,args=(i,)) 19 p.start() 20 p_list.append(p) 21 for p in p_list: 22 p.join() 23 print(time.time()-start) # 3.7114222049713135
可以看到使用程序池只用了0.21秒,而使用多程序則使用了3.7秒之多,這是因為程序池只用了5個程序,而多程序則使用了100個程序。多程序排程100個程序比程序池中排程5個程序更耗時且更耗資源。
(2)程序池的同步/非同步呼叫
同步:序列
1 import multiprocessing,os,time 2 def func(i): 3 print('%s run'%os.getpid()) 4 time.sleep(1) 5 i += 1 6 7 if __name__ == '__main__': 8 # 程序池 9 p = multiprocessing.Pool(5) 10 for i in range(20): 11 p.apply(func,args=(i,)) # 同步提交,序列執行
非同步:並行
1 import time,multiprocessing 2 def func(i): 3 time.sleep(1) 4 i += 1 5 print(i) 6 7 if __name__ == '__main__': 8 # 程序池 9 p = multiprocessing.Pool(5) 10 for i in range(20): 11 p.apply_async(func, args=(i,)) # 非同步提交,並行執行 12 p.close() # 不允許再向程序池中新增任務,close()必須在join()前面 13 # 使用非同步提交的任務,必須新增join() 14 p.join() # 等待子程序結束再往下執行,否則主程序結束了子程序還沒執行完 15 print('end')
(3)接收程序池呼叫函式的返回結果
方法一:
1 import time,multiprocessing 2 3 def func(i): 4 time.sleep(1) 5 i += 1 6 return i 7 8 if __name__ == '__main__': 9 # 程序池 10 p = multiprocessing.Pool(5) 11 res_l = [] 12 for i in range(20): 13 res = p.apply_async(func,args=(i,)) # 非同步提交 14 # print(res.get()) # 阻塞,等待任務結果 15 res_l.append(res) # 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務 16 # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束,而是執行完一個就釋放一個程序,這個程序就去接收新的任務 17 p.close() # 不允許再向程序池中新增任務,close()必須在join()前面 18 p.join() # 等待子程序結束再往下執行,必須新增join(),否則主程序結束了子程序還沒執行完 19 for i in res_l: 20 print(i.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get 21 print('end')
方法二:
1 from multiprocessing import Process,Pool 2 import time 3 4 def Foo(i): 5 time.sleep(2) 6 return i + 100 7 8 def Bar(arg): # 回撥函式是在主程序中完成的,直接接收子程序中函式的返回值,不能傳另外的引數 9 print('----->exec done:',arg) 10 11 if __name__ == '__main__': 12 pool = Pool(5) # 允許程序池裡同時放入5個程序 13 for i in range(10): 14 pool.apply_async(func=Foo, args=(i,),callback=Bar) # 並行執行,Bar函式接收Foo函式的返回結果,callback回撥執行者為主程序 15 pool.close() 16 pool.join() # 程序池中程序執行完畢後再關閉,如果註釋,那麼程式直接關閉 17 print('end')
(4)爬蟲例項
1 import multiprocessing,requests 2 3 def get_url(url): 4 res = requests.get(url) 5 return {'url':url, 6 'status_code':res.status_code, 7 'content':res.text} 8 9 def parser(dic): 10 print(dic['url'],dic['status_code'],len(dic['content'])) 11 12 13 if __name__ == '__main__': 14 url_list = ['http://www.baidu.com', 15 'http://www.hao123.com', 16 'http://www.163.com', 17 'http://www.csdn.com'] 18 p = multiprocessing.Pool(4) 19 res_l = [] 20 for url in url_list: 21 p.apply_async(get_url,args=(url,),callback=parser) 22 p.close() 23 p.join() 24 print('END!!')
七、多執行緒、多程序總結
1、多執行緒:
多用於IO密集型行為(上傳/下載)
2、多程序
多用於CPU密集型任務(計算/排序)