1. 程式人生 > 其它 >redis的Linux系統安裝與配置、redis的api使用、高階用法之慢查詢、pipline事物

redis的Linux系統安裝與配置、redis的api使用、高階用法之慢查詢、pipline事物

一、什麼是執行緒(thread)

  執行緒是作業系統能夠進行運算排程的最小單位。它被包含在程序之中,是程序中的實際運作單位。一個執行緒指的是程序中一個單一順序的控制流,一個程序中可以包含多個執行緒,每條執行緒並行執行不同的任務。下面,我們來舉一個例子來說明執行緒的工作模式:

  • 假設你正在讀一本書,你現在想休息一下,但是你想在回來繼續閱讀的時候從剛剛停止閱讀的地方繼續讀。實現這一點的一種方法是記下頁碼、行號和字號。閱讀一本書的執行環境是這三個數字。
  • 如果你有一個室友,她也在用同樣的方法閱讀這本書,她可以在你不用的時候拿起書,從她停下來的地方繼續讀。然後你可以把它拿回去,從你標記停下的地方繼續閱讀。

  執行緒以相同的方式工作。CPU給你的錯覺是它在同一時間做多個計算。它通過在每次計算上花費一點時間來實現這一點。它可以這樣做,因為它對每個計算都有一個執行上下文。就像您可以與朋友共享一本書一樣,許多工也可以共享一個CPU。當然,真正地同時執行多執行緒需要多核CPU才可能實現。

  多執行緒多用於處理IO密集型任務頻繁寫入讀出,cpu負責排程,消耗的是磁碟空間。

二、什麼是程序(process)

  程式的執行例項稱為程序。對於作業系統來說,一個任務就是一個程序(Process),比如開啟一個瀏覽器就是啟動一個瀏覽器程序,開啟一個記事本就啟動了一個記事本程序,開啟兩個記事本就啟動了兩個記事本程序,開啟一個Word就啟動了一個Word程序。程序是很多資源的集合。

  每個程序提供執行程式所需的資源。程序具有虛擬地址空間、可執行程式碼、對系統物件的開啟控制代碼、安全上下文、程序惟一識別符號、環境變數、優先順序類、最小和最大工作集大小,以及至少一個執行執行緒。每個程序都是從一個執行緒(通常稱為主執行緒)開始的,但是可以從它的任何執行緒中建立其他執行緒。大部分程序都不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)。

  多程序多用於CPU密集型任務,例如:排序、計算,都是消耗CPU的。

三、程序與執行緒的區別

  • 執行緒是作業系統能夠進行運算排程的最小單位,而程序是一組與計算相關的資源。一個程序可以包含一個或多個執行緒。
  • 地址空間和其它資源(如開啟檔案):程序間相互獨立,同一程序下的各執行緒間共享。某程序內的執行緒在其它程序不可見。 
  • 通訊:程序間通訊IPC,執行緒間可以直接讀寫程序資料段(如全域性變數)來進行通訊——需要程序同步和互斥手段的輔助,以保證資料的一致性。
  • 排程和切換:執行緒上下文切換比程序上下文切換要快得多。
  • 建立執行緒比程序開銷小(開一個程序,裡面就有空間了,而執行緒在程序裡面,就沒必要在開一個空間了)

四、全域性直譯器鎖GIL(Global Interpreter Lock)

  我們想執行的速度快一點的話,就得使用多執行緒或者多程序。在Python裡面,多執行緒被很多人詬病,為什麼呢,因為Python的直譯器(CPython)使用了一個叫GIL的全域性直譯器鎖,它不能利用多核CPU,只能執行在一個CPU上面,但是你在執行程式的時候,看起來好像還是在一起執行的,是因為作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣,這個叫做上下文切換。

  在CPython中,由於全域性直譯器鎖,在同一時刻只能有一個執行緒進入直譯器,只能有一個執行緒執行Python程式碼(即使某些面向效能的庫可能會克服這個限制)。如果希望應用程式更好地利用多核計算機的計算資源,建議使用多程序。但是,如果您希望同時執行多個I/O密集型的任務,執行緒仍然是一個合適的選擇。

五、threading模組

1、直接呼叫

Python中的多執行緒使用threading模組,執行多執行緒使用threading.Thread(target=方法,args=(引數,)),如下:

 1 import threading,time
 2 def run():          # 定義每個執行緒需要執行的函式
 3     time.sleep(3)
 4     print('呵呵呵')
 5 
 6 # 序列
 7 for i in range(5):  # 序列,需要執行15秒
 8     run()
 9 
10 # 多執行緒:
11 for j in range(5):    # 並行:執行3秒
12     t = threading.Thread(target=run)  # 例項化了一個執行緒
13     t.start()

2、使用類繼承式呼叫

自己寫一個類繼承 threading.Thread類,在子類中重寫 run()方法,如下:

 1 import threading,time
 2 
 3 class MyThread(threading.Thread):
 4     def __init__(self, num):
 5         threading.Thread.__init__(self)
 6         self.num = num
 7 
 8     def run(self):  # 定義每個執行緒要執行的函式
 9         print("running on number:%s" % self.num)
10         time.sleep(3)
11 
12 if __name__ == '__main__':
13     t1 = MyThread(1)
14     t2 = MyThread(2)
15     t1.start()
16     t2.start()

threading模組的常用方法

 1 # threading 模組提供的常用方法:
 2 # threading.currentThread(): 返回當前的執行緒變數。
 3 # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
 4 # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
 5 # 除了使用方法外,執行緒模組同樣提供了Thread類來處理執行緒,Thread類提供了以下方法:
 6 # run(): 用以表示執行緒活動的方法。
 7 # start():啟動執行緒活動。
 8 # join([time]): 等待至執行緒中止。這阻塞呼叫執行緒直至執行緒的join() 方法被呼叫中止-正常退出或者丟擲未處理的異常-或者是可選的超時發生。
 9 # isAlive(): 返回執行緒是否活動的。
10 # getName(): 返回執行緒名。
11 # setName(): 設定執行緒名。

 3、多執行緒執行速度測試

下面再舉一個例子來對比單執行緒和多執行緒的執行速度:

 1 import requests,time,threading
 2 # 定義需要下載的網頁字典
 3 urls = {
 4     'besttest':'http://www.besttest.cn',
 5     'niuniu':'http://www.nnzhp.cn',
 6     'dsx':'http://www.imdsx.cn',
 7     'cc':'http://www.cc-na.cn',
 8     'alin':'http://www.limlhome.cn'
 9 }
10 # 下載網頁並儲存成html檔案
11 # 子執行緒執行的函式,如果裡面有返回值的話,是不能獲取到的
12 # 只能在函式外面定義一個list或者字典來存每次處理的結果
13 data = {}
14 def down_html(file_name,url):
15     start_time = time.time()
16     res = requests.get(url).content  # content就是返回的二進位制檔案內容
17     open(file_name+'.html','wb').write(res)
18     end_time = time.time()
19     run_time = end_time - start_time
20     data[url] = run_time
21 
22 # 序列
23 start_time = time.time()  # 記錄開始執行時間
24 for k,v in urls.items():
25     down_html(k,v)
26 end_time = time.time()    # 記錄執行結束時間
27 run_time = end_time - start_time
28 print(data)
29 print('序列下載總共花了%s秒'%run_time)
30 
31 # 並行
32 start_time = time.time()
33 for k,v in urls.items():
34     t = threading.Thread(target=down_html,args=(k,v))  # 多執行緒的函式如果傳參的話,必須得用args
35     t.start()
36 end_time = time.time()
37 run_time = end_time-start_time
38 print(data)
39 print('並行下載總共花了%s秒'%run_time)

序列執行結果:

並行執行結果:

  從以上執行結果可以看出,並行下載的時間遠短於序列。但是仔細觀察會發現:並行執行時,打印出執行時間後,程式並沒有結束執行,而是等待了一段時間後才結束執行。實際上並行執行時,列印的是主執行緒執行的時間,主執行緒只是負責調起5個子執行緒去執行下載網頁內容,調起子執行緒以後主執行緒就執行完成了,所以執行時間才特別短,主執行緒結束後子執行緒並沒有結束。所以0.015s這個時間是主執行緒執行的時間,而不是並行下載的時間。如果想看到並行下載的時間,就需要引入執行緒等待。

4、執行緒等待(t.join())

 1 import requests,time,threading
 2 # 定義需要下載的網頁字典
 3 urls = {
 4     'besttest':'http://www.besttest.cn',
 5     'niuniu':'http://www.nnzhp.cn',
 6     'dsx':'http://www.imdsx.cn',
 7     'cc':'http://www.cc-na.cn',
 8     'alin':'http://www.limlhome.cn'
 9 }
10 # 下載網頁並儲存成html檔案
11 # 子執行緒執行的函式,如果裡面有返回值的話,是不能獲取到的
12 # 只能在函式外面定義一個list或者字典來存每次處理的結果
13 data = {}
14 def down_html(file_name,url):
15     start_time = time.time()
16     res = requests.get(url).content  # content就是返回的二進位制檔案內容
17     open(file_name+'.html','wb').write(res)
18     end_time = time.time()
19     run_time = end_time - start_time
20     data[url] = run_time
21 
22 # 序列
23 start_time = time.time()  # 記錄開始執行時間
24 for k,v in urls.items():
25     down_html(k,v)
26 end_time = time.time()    # 記錄執行結束時間
27 run_time = end_time - start_time
28 print(data)
29 print('序列下載總共花了%s秒'%run_time)
30 
31 # 多執行緒
32 start_time = time.time()
33 threads = []          # 存放啟動的5個子執行緒
34 for k,v in urls.items():
35     # 多執行緒的函式如果傳參的話,必須得用args
36     t = threading.Thread(target=down_html,args=(k,v))
37     t.start()
38     threads.append(t)
39 for t in threads:     # 主執行緒迴圈等待5個子執行緒執行結束
40     t.join()          # 迴圈等待
41 print(data)           # 通過函式前面定義的data字典獲取每個執行緒執行的時間
42 end_time = time.time()
43 run_time = end_time - start_time
44 print('並行下載總共花了%s秒'%run_time)

 多執行緒執行結果:

  從執行結果來看,總執行時間只是稍稍大於最大的下載網頁的時間(主執行緒調起子執行緒也需要一點時間),符合多執行緒的目的。有了執行緒等待,主執行緒就會等到子執行緒全部執行結束後再結束,這樣統計出的才是真正的並行下載時間。

  看到這裡,我們還需要回答一個問題:為什麼Python的多執行緒不能利用多核CPU,但是在寫程式碼的時候,多執行緒的確在併發,而且還比單執行緒快

  電腦cpu有幾核,那麼只能同時執行幾個執行緒。但是python的多執行緒,只能利用一個cpu的核心。因為Python的直譯器使用了GIL的一個叫全域性直譯器鎖,它不能利用多核CPU,只能執行在一個cpu上面,但是執行程式的時候,看起來好像還是在一起執行的,是因為作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。這個叫做上下文切換。

  Python只有一個GIL,執行python時,就要拿到這個鎖才能執行,在遇到I/O 操作時會釋放這把鎖。如果是純計算的程式,沒有 I/O 操作,直譯器會每隔100次操作就釋放這把鎖,讓別的執行緒有機會 執行(這個次數可以通sys.setcheckinterval來調整)同一時間只會有一個獲得GIL執行緒在跑,其他執行緒都處於等待狀態。

1、如果是CPU密集型程式碼(迴圈、計算等),由於計算工作量多和大,計算很快就會達到100,然後觸發GIL的釋放與在競爭,多個執行緒來回切換損耗資源,所以在多執行緒遇到CPU密集型程式碼時,單執行緒會比較快;

2、如果是I\O密集型程式碼(檔案處理、網路爬蟲),開啟多執行緒實際上是併發(不是並行),IO操作會進行IO等待,執行緒A等待時,自動切換到執行緒B,這樣就提升了效率。

5、守護執行緒(setDaemon(True))

所謂守護執行緒的意思就是:只要主執行緒結束,那麼子執行緒立即結束,不管子執行緒有沒有執行完成。

 1 import threading
 2 def run():
 3     time.sleep(3)
 4     print('哈哈哈')
 5 
 6 for i in range(50):
 7     t = threading.Thread(target=run)
 8     t.setDaemon(True)  # 把子執行緒設定成為守護執行緒
 9     t.start()
10 print('Done,執行完成')
11 time.sleep(3)

6、同步鎖

多個執行緒同時修改一個數據的時候,可能會把資料覆蓋,所以需要加執行緒鎖(threading.lock())。我們先來看看下面兩段程式碼

程式碼一:

 1 import threading
 2 
 3 def addNum():
 4     global num #在每個執行緒中都獲取這個全域性變數
 5     num-=1
 6 
 7 num = 100      #設定一個全域性變數
 8 thread_list = []
 9 for i in range(100):
10     t = threading.Thread(target=addNum)
11     t.start()
12     thread_list.append(t)
13 
14 for t in thread_list: #等待所有執行緒執行完畢
15     t.join()
16 
17 print('final num:', num )  # 執行結果為0

程式碼二:

 1 import threading,time
 2 
 3 def addNum():
 4     global num #在每個執行緒中都獲取這個全域性變數
 5 
 6     temp=num
 7     # print('--get num:',num )
 8     time.sleep(0.1)
 9     num =temp-1 #對此公共變數進行減1操作
10 
11 num = 100  #設定一個共享變數
12 thread_list = []
13 for i in range(100):
14     t = threading.Thread(target=addNum)
15     t.start()
16     thread_list.append(t)
17 
18 for t in thread_list: #等待所有執行緒執行完畢
19     t.join()
20 
21 print('final num:', num )  # 執行結果是99

  從邏輯上看,以上兩端程式碼是一樣的,只不過第二段程式碼的實現過程是將num賦值給一箇中間變數temp,由這個中間變數完成計算然後再把結果賦值給回num。同時,在這個過程中加了一個等待時間0.1s。為什麼執行結果卻不一樣呢?就是因為這個0.1s的等待時間,第一個執行緒拿到的num值是100,在它準備計算前有一個等待時間0.1s,所以CPU切換到了第二個執行緒,它拿到的num的值還是100(因為第一個執行緒並未完成計算,num值未變).......,直到CPU切換第100個執行緒,它拿到的num的值還是100,100個執行緒每一個都沒有執行完就進行了切換。等待這0.1s的時間過去以後,所有的執行緒一個個開始計算,最後的結果都是99。

  那為什麼第一段程式碼沒有問題呢?因為CPU的計算太快了,CPU還沒來得及切換計算已經完成了。

  那我們如何來解決這個問題呢?可能大家想到了可以用join讓所有的執行緒程式設計序列的,這樣就不存在同時修改資料的可能了。但是,這樣的話任務內的所有程式碼都是序列執行的,而我們現在只想讓修改共享資料這部分序列執行,而其他部分還是並行執行。

  這時,我們就可以通過同步鎖來解決這種問題,程式碼如下:

 1 import threading,time
 2 
 3 def addNum():
 4     global num #在每個執行緒中都獲取這個全域性變數
 5     lock.acquire()  # 加鎖
 6     temp=num
 7     print('--get num:',num )
 8     time.sleep(0.1)
 9     num =temp-1 #對此公共變數進行減1操作
10     lock.release()  # 解鎖
11 
12 lock = threading.Lock()  # 例項化一把鎖
13 num = 100  #設定一個共享變數
14 thread_list = []
15 for i in range(100):
16     t = threading.Thread(target=addNum)
17     t.start()
18     thread_list.append(t)
19 
20 for t in thread_list: #等待所有執行緒執行完畢
21     t.join()
22 
23 print('final num:', num )  # 執行結果是0

問題解決了,但是我們還有個疑問,這個同步鎖和全域性直譯器鎖(GIL)有什麼關係呢?

  • Python的執行緒在GIL的控制之下,執行緒之間,對整個Python直譯器,對Python提供的C API的訪問都是互斥的,這可以看作是Python核心級的互斥機制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機制——使用者級互斥。核心級通過互斥保護了核心的共享資源,同樣,使用者級互斥保護了使用者程式中的共享資源。
  • GIL 的作用是:對於一個直譯器,只能有一個執行緒在執行bytecode。所以每時每刻只有一條bytecode在被一個執行緒執行。GIL保證了bytecode 這層面上是執行緒是安全的。但是如果有個操作比如 x += 1,這個操作需要多個bytecodes操作,在執行這個操作的多條bytecodes期間的時候可能中途就切換執行緒了,這樣就出現了資料競爭的情況了。
  • 那我的同步鎖也是保證同一時刻只有一個執行緒被執行,是不是沒有GIL也可以?是的,那要GIL有什麼用?好像真的是沒用!!

7、死鎖和遞迴鎖

  線上程間共享多個資源的時候,如果兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所以這兩個執行緒在無外力作用下將一直等待下去。我們來看下面這段程式碼:

 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lockA.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)
 8         lockB.acquire()
 9         print(self.name,"gotlockB",time.ctime())
10         lockB.release()
11         lockA.release()
12 
13     def doB(self):
14         lockB.acquire()
15         print(self.name,"gotlockB",time.ctime())
16         time.sleep(2)
17         lockA.acquire()
18         print(self.name,"gotlockA",time.ctime())
19         lockA.release()
20         lockB.release()
21     def run(self):
22         self.doA()
23         self.doB()
24 if __name__=="__main__":
25 
26     lockA=threading.Lock()
27     lockB=threading.Lock()
28     threads=[]
29     for i in range(5):
30         threads.append(myThread())
31     for t in threads:
32         t.start()
33     for t in threads:
34         t.join()

執行結果如下:

第一個執行緒執行完doA,再執行doB時,拿到了lockB,再想拿lockA時,發現lockA已經被第二個執行緒拿到了,第一個執行緒拿不到lockA了。同樣,第二個執行緒拿到了lockA,再想拿lockB時,發現此時lockB還在第一個執行緒手裡沒有釋放,所以第二個執行緒同樣也拿不到lockB。這樣就造成了一個現象:第一個執行緒在等待第二個執行緒釋放lockA,第二個執行緒在等第一個執行緒釋放lockB,這樣一直等下去造成了死鎖。解決的辦法就是使用遞迴鎖,如下:

 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lock.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)
 8         lock.acquire()
 9         print(self.name,"gotlockB",time.ctime())
10         lock.release()
11         lock.release()
12 
13     def doB(self):
14         lock.acquire()
15         print(self.name,"gotlockB",time.ctime())
16         time.sleep(2)
17         lock.acquire()
18         print(self.name,"gotlockA",time.ctime())
19         lock.release()
20         lock.release()
21     def run(self):
22         self.doA()
23         self.doB()
24 if __name__=="__main__":
25 
26     lock = threading.RLock()  # 遞迴鎖
27     threads=[]
28     for i in range(5):
29         threads.append(myThread())
30     for t in threads:
31         t.start()
32     for t in threads:
33         t.join()

遞迴鎖就是將lockA=threading.Lock()和lockB=threading.Lock()改為了lock = threading.RLock(),執行結果如下:

8、訊號量

  • 訊號量是用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數器,每當呼叫acquire()時減1,呼叫release()時加1;
  • 計數器不能小於0,當計數器為0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。(類似於停車位的概念);
  • BoundedSemaphore與Semaphore的唯一區別在於前者將在呼叫release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常;
  • 訊號量實質上也是一把鎖;

例項:

 1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(1)
 7             semaphore.release()
 8 
 9 if __name__=="__main__":
10     semaphore=threading.BoundedSemaphore(5)
11     thrs=[]
12     for i in range(100):
13         thrs.append(myThread())
14     for t in thrs:
15         t.start()

9、條件變數同步(Condition)

  • 有一類執行緒需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 物件用於條件變數執行緒的支援,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法;
  • lock_con=threading.Condition([Lock/Rlock]):引數填寫建立鎖的型別,不是必填項,不傳引數預設建立的是Rlock鎖;
  • wait():條件不滿足時呼叫,執行緒會釋放鎖並進入等待阻塞;
  • notify():條件創造後呼叫,通知等待池啟用一個執行緒;
  • notifyAll():條件創造後呼叫,通知等待池啟用所有執行緒

例項:

import threading,time
from random import randint
class Producer(threading.Thread):
    def run(self):
        global L
        while True:
            val=randint(0,100)
            print('生產者',self.name,":Append"+str(val),L)
            if lock_con.acquire():
                L.append(val)
                lock_con.notify()
                lock_con.release()
            time.sleep(3)

class Consumer(threading.Thread):
    def run(self):
        global L
        while True:
                lock_con.acquire()
                if len(L)==0:
                    lock_con.wait()
                print('消費者',self.name,":Delete"+str(L[0]),L)
                del L[0]
                lock_con.release()
                time.sleep(1)

if __name__=="__main__":
    L=[]
    lock_con=threading.Condition()
    threads=[]
    for i in range(5):
        threads.append(Producer())
    threads.append(Consumer())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

10、同步條件(Event)

  • 同步條件和條件變數同步差不多意思,只是少了鎖功能,因為同步條件設計於不訪問共享資源的條件環境;
  • event=threading.Event():條件環境物件,初始值 為False;

例項:

 1 import threading,time
 2 class Boss(threading.Thread):
 3     def run(self):
 4         print("BOSS:今晚大家都要加班到22:00。")
 5         event.isSet() or event.set()
 6         time.sleep(3)
 7         print("BOSS:<22:00>可以下班了。")
 8         event.isSet() or event.set()
 9 
10 class Worker(threading.Thread):
11     def run(self):
12         event.wait()
13         print("Worker:哎……命苦啊!")
14         time.sleep(1)
15         event.clear()
16         event.wait()
17         print("Worker:Oh,Yeah!!")
18 
19 if __name__=="__main__":
20     event=threading.Event()
21     threads=[]
22     for i in range(5):
23         threads.append(Worker())
24     threads.append(Boss())
25     for t in threads:
26         t.start()
27     for t in threads:
28         t.join()

11、執行緒佇列(queue)

  • queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
  • 當資訊必須在多個執行緒之間安全地交換時,佇列線上程程式設計中特別有用;

queue佇列類的方法:

 1 # 建立一個“佇列”物件
 2 import queue
 3 q = queue.Queue(maxsize = 10)
 4 # queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。
 5 
 6 # 將一個值放入佇列中
 7 q.put(10)
 8 # 呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為:1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。
 9 
10 # 將一個值從佇列中取出
11 q.get()
12 # 呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。
13 
14 # Python Queue模組有三種佇列及建構函式:
15 queue.Queue(maxsize=10)          # Python Queue模組的FIFO佇列先進先出。
16 queue.LifoQueue(maxsize=10)      # LIFO類似於堆,即先進後出。
17 queue.PriorityQueue(maxsize=10)  # 還有一種是優先順序佇列級別越低越先出來。
18 
19 # 此包中的常用方法(q = queue.Queue()):
20 # q.qsize() 返回佇列的大小
21 # q.empty() 如果佇列為空,返回True,反之False
22 # q.full() 如果佇列滿了,返回True,反之False
23 # q.full 與 maxsize 大小對應
24 # q.get([block[, timeout]]) 獲取佇列,timeout等待時間
25 # q.get_nowait() 相當q.get(False)
26 # 非阻塞 q.put(item) 寫入佇列,timeout等待時間
27 # q.put_nowait(item) 相當q.put(item, False)
28 # q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號
29 # q.join() 實際上意味著等到佇列為空,再執行別的操作

例項:

 1 import threading,queue
 2 from time import sleep
 3 from random import randint
 4 class Production(threading.Thread):
 5     def run(self):
 6         while True:
 7             r=randint(0,100)
 8             q.put(r)
 9             print("生產出來%s號包子"%r)
10             sleep(1)
11 class Proces(threading.Thread):
12     def run(self):
13         while True:
14             re=q.get()
15             print("吃掉%s號包子"%re)
16 if __name__=="__main__":
17     q=queue.Queue(10)
18     threads=[Production(),Production(),Production(),Proces()]
19     for t in threads:
20         t.start()
21     for t in threads:
22         t.join()

六、multiprocessing模組

1、直接呼叫

Python中的多程序使用multiprocessing模組,執行多程序使用multiprocessing.Process(target=方法,args=(引數,)),如下:

例項一

 1 import multiprocessing,time
 2 
 3 def f(name):
 4     time.sleep(1)
 5     print('Hello!!',name,time.ctime())
 6 
 7 if __name__ == '__main__':
 8     p_list = []
 9     for i in range(3):
10         p = multiprocessing.Process(target=f,args=('Porcess',))
11         p_list.append(p)
12         p.start()
13     for i in p_list:
14         i.join()
15 
16     print('end')

例項二:一個簡單的多程序,multiprocessing.Process(target=run,args=(6,))

 1 import multiprocessing,threading
 2 def my():
 3     print('哈哈哈')
 4 
 5 def run(num):
 6     for i in range(num):
 7         t = threading.Thread(target=my)
 8         t.start()
 9 # 總共啟動5個程序,每個程序下面啟動6個執行緒,函式my()執行30次
10 if __name__ == '__main__':
11     process = []
12     for i in range(5):
13         # args只有一個引數一定後面要加逗號
14         p = multiprocessing.Process(target=run,args=(6,))  # 啟動一個程序
15         p.start()
16         process.append(p)
17         [p.join() for p in process]  # 與執行緒用法一致

2、類式呼叫(與多程序類似)

例項

自己寫一個類繼承 multiprocessing.Process類,在子類中重寫 run()方法,如下:

 1 class MyProcess(multiprocessing.Process):
 2     def __init__(self):
 3         super(MyProcess, self).__init__()
 4         # self.name = name
 5 
 6     def run(self):
 7         time.sleep(1)
 8         print('Hello!!',self.name,time.ctime())
 9 
10 if __name__ == '__main__':
11     p_list = []
12     for i in range(3):
13         p = MyProcess()
14         p.start()
15         p_list.append(p)
16     for p in p_list:
17         p.join()
18     print('end')

3、Process類

(1)構造方法:def __init__(self, group=None, target=None, name=None, args=(), kwargs={})

  • group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
  • target: 要執行的方法;
  • name: 程序名;
  • args/kwargs: 要傳入方法的引數;

(2)例項方法

  • is_alive():返回程序是否在執行;
  • terminate():不管任務是否完成,立即停止工作程序;
  • join([timeout]):阻塞當前上下文環境的程序程,直到呼叫此方法的程序終止或到達指定的timeout(可選引數);
  • start():程序準備就緒,等待CPU排程;
  • run():strat()呼叫run方法,如果例項程序時未制定傳入target,這star執行t預設run()方法;

(3)屬性

  •  daemon:和執行緒的setDeamon功能一樣;
  • exitcode(程序在執行時為None、如果為–N,表示被訊號N結束);
  • name:程序名;
  • pid:程序號;

4、程序間通訊

  不同程序間的資料是不共享的,要想實現多個程序間的資料交換可以用以下方法:

(1)Queues

  使用方法跟threading裡的queue類似,如下:

 1 from multiprocessing import Process,Queue
 2 
 3 def f(q,i):
 4     q.put([42,i,'hello'])
 5     print('subprocess q_id:',id(q))
 6 
 7 if __name__ == '__main__':
 8     q = Queue()
 9     p_list=[]
10     print('main q_id:',id(q))
11     for i in range(3):
12         p = Process(target=f, args=(q,i))
13         p_list.append(p)
14         p.start()
15     print(q.get())
16     print(q.get())
17     print(q.get())
18     for p in p_list:
19             p.join()

(2)Pipes

  Pipes函式的作用是:返回由管道連線的一對連線物件,管道預設情況下是雙工的(雙向的)

 1 from multiprocessing import Process,Pipe
 2 
 3 def f(child_conn):
 4     child_conn.send('子程序')
 5     child_conn.send([42, None, 'hello'])
 6     print(child_conn.recv())  # 子程序接收父程序傳送的資訊,列印'陣列'
 7     child_conn.close()
 8 
 9 if __name__ == '__main__':
10     parent_conn,child_conn = Pipe()
11     p = Process(target=f, args=(child_conn,))
12     p.start()
13     # 父程序接收子程序傳送的資訊
14     print(parent_conn.recv())  # 列印 '子程序'
15     print(parent_conn.recv())  # 列印 '[42, None, 'hello']'
16     parent_conn.send('陣列')    # 父程序給子程序傳送資訊
17     p.join()

  Pipe方法返回的兩個連線物件表示管道的兩端。每個連線物件都有send()和recv()方法。請注意,如果兩個程序(或執行緒)試圖同時從管道的同一端讀取或寫入資料,則管道中的資料可能會損壞。當然,在同一時間使用管道的不同端不會有流程損壞的風險。

(3)Managers

  Managers()返回的manager物件控制一個伺服器程序,該程序儲存Python物件,並允許其他程序使用代理操作它們。

 1 from multiprocessing import Process,Manager
 2 
 3 def f(dic,lis,n):
 4     dic[n] = '1'
 5     dic['2'] = 2
 6     dic[0.25] = None
 7     lis.append(n)
 8     # print(lis)
 9 
10 if __name__ == '__main__':
11     # with Manager() as manager:
12     manager = Manager()
13     dic = manager.dict()
14     lis = manager.list(range(5))
15     p_list = []
16     for i in range(10):
17         p = Process(target=f, args=(dic,lis,i))
18         p.start()
19         p_list.append(p)
20     for res in p_list:
21         res.join()
22 
23     print(dic)
24     print(lis)

  Manager()返回的管理器將支援型別list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value和Array。

5、程序鎖

 1 from multiprocessing import Process,Lock
 2 
 3 def f(l,i):
 4     l.acquire()
 5     print('hello world',i)
 6     l.release()
 7 
 8 if __name__ == '__main__':
 9     lock = Lock()
10     p_list = []
11     for num in range(10):
12         p = Process(target=f, args=(lock,num))
13         p.start()
14         p_list.append(p)
15     for p in p_list:
16         p.join()
17     print('end')

6、程序池

  在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?第一,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二,即便開啟了成千上萬的程序,因為CPU核心數有限,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。那麼我們要怎麼做呢?

  在這裡,要給大家介紹一個程序池的概念。定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開關程序的時間,也一定程度上能夠實現併發效果。

  Python中的程序池使用multiprocessing模組的Pool類,主要方法如下:

1 apply(func [,args [,kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
2 '''需要注意的是:此操作並不會在所有的池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用apply_async()'''
3 
4 apply_async(func [,args [,kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
5 '''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將回調函式傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
6    
7 close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
8 
9 join():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫

(1)程序池和多程序效率對比

 1 import time,multiprocessing
 2 def func(i):
 3     i += 1
 4 
 5 if __name__ == '__main__':
 6     # 程序池
 7     p = multiprocessing.Pool(5)
 8     start = time.time()
 9     p.map(func,range(100))  # 使用map函式迴圈呼叫func函式
10     p.close()  # 不允許再向程序池中新增任務
11     p.join()
12     print(time.time()-start)  # 0.21729111671447754
13     # 多程序
14     start = time.time()
15     p_list = []
16     for i in range(100):
17         # 多程序呼叫func函式
18         p = multiprocessing.Process(target=func,args=(i,))
19         p.start()
20         p_list.append(p)
21     for p in p_list:
22         p.join()
23     print(time.time()-start)  # 3.7114222049713135

可以看到使用程序池只用了0.21秒,而使用多程序則使用了3.7秒之多,這是因為程序池只用了5個程序,而多程序則使用了100個程序。多程序排程100個程序比程序池中排程5個程序更耗時且更耗資源。

(2)程序池的同步/非同步呼叫

同步:序列

 1 import multiprocessing,os,time
 2 def func(i):
 3     print('%s run'%os.getpid())
 4     time.sleep(1)
 5     i += 1
 6 
 7 if __name__ == '__main__':
 8     # 程序池
 9     p = multiprocessing.Pool(5)
10     for i in range(20):
11         p.apply(func,args=(i,)) # 同步提交,序列執行

非同步:並行

 1 import time,multiprocessing
 2 def func(i):
 3     time.sleep(1)
 4     i += 1
 5     print(i)
 6 
 7 if __name__ == '__main__':
 8     # 程序池
 9     p = multiprocessing.Pool(5)
10     for i in range(20):
11         p.apply_async(func, args=(i,))  # 非同步提交,並行執行
12     p.close()  # 不允許再向程序池中新增任務,close()必須在join()前面
13     # 使用非同步提交的任務,必須新增join()
14     p.join()   # 等待子程序結束再往下執行,否則主程序結束了子程序還沒執行完
15     print('end')

(3)接收程序池呼叫函式的返回結果

方法一:

 1 import time,multiprocessing
 2 
 3 def func(i):
 4     time.sleep(1)
 5     i += 1
 6     return i
 7 
 8 if __name__ == '__main__':
 9     # 程序池
10     p = multiprocessing.Pool(5)
11     res_l = []
12     for i in range(20):
13         res = p.apply_async(func,args=(i,))  # 非同步提交
14         # print(res.get()) # 阻塞,等待任務結果
15         res_l.append(res)  # 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務
16                            # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束,而是執行完一個就釋放一個程序,這個程序就去接收新的任務
17     p.close() # 不允許再向程序池中新增任務,close()必須在join()前面
18     p.join()  # 等待子程序結束再往下執行,必須新增join(),否則主程序結束了子程序還沒執行完
19     for i in res_l:
20         print(i.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
21     print('end')

方法二:

 1 from multiprocessing import Process,Pool
 2 import time
 3 
 4 def Foo(i):
 5     time.sleep(2)
 6     return i + 100
 7 
 8 def Bar(arg):  # 回撥函式是在主程序中完成的,直接接收子程序中函式的返回值,不能傳另外的引數
 9     print('----->exec done:',arg)
10 
11 if __name__ == '__main__':
12     pool = Pool(5)  # 允許程序池裡同時放入5個程序
13     for i in range(10):
14         pool.apply_async(func=Foo, args=(i,),callback=Bar)  # 並行執行,Bar函式接收Foo函式的返回結果,callback回撥執行者為主程序
15     pool.close()
16     pool.join()  # 程序池中程序執行完畢後再關閉,如果註釋,那麼程式直接關閉
17     print('end')

(4)爬蟲例項

 1 import multiprocessing,requests
 2 
 3 def get_url(url):
 4     res = requests.get(url)
 5     return {'url':url,
 6             'status_code':res.status_code,
 7             'content':res.text}
 8 
 9 def parser(dic):
10     print(dic['url'],dic['status_code'],len(dic['content']))
11 
12 
13 if __name__ == '__main__':
14     url_list = ['http://www.baidu.com',
15                 'http://www.hao123.com',
16                 'http://www.163.com',
17                 'http://www.csdn.com']
18     p = multiprocessing.Pool(4)
19     res_l = []
20     for url in url_list:
21         p.apply_async(get_url,args=(url,),callback=parser)
22     p.close()
23     p.join()
24     print('END!!')

七、多執行緒、多程序總結

1、多執行緒:

  多用於IO密集型行為(上傳/下載)

2、多程序

  多用於CPU密集型任務(計算/排序)