python 遞迴鎖、訊號量、事件、執行緒佇列、程序池和執行緒池、回撥函式
阿新 • • 發佈:2020-08-26
死鎖現象
所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。
此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序
程式碼示例:
from threading import Thread,Lock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.task1() self.task2() def task1(self): mutexA.acquire() print('%s task1 get A' %self.name) mutexB.acquire() print('%s task1 get B' % self.name) mutexB.release() mutexA.release() def task2(self): mutexB.acquire() print('%s task2 get B' % self.name) time.sleep(1) # Thread-2 拿到執行權,執行get A出現死鎖,此時thread2需要B鎖,而thread1佔用,與此同時,thread1需要A鎖,thread2佔用 mutexA.acquire() print('%s task2 get A' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t = MyThread() t.start()
輸出結果:
Thread-1 task1 get A
Thread-1 task1 get B
Thread-1 task2 get B
Thread-2 task1 get A # 出現死鎖,整個程式被阻塞
解決方法,遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。 這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。 直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞迴鎖可以連續acquire多次,而互斥鎖只能acquire一次
程式碼示例:
from threading import Thread,RLock import time mutexA = mutexB = RLock() class MyThread(Thread): def run(self): self.task1() self.task2() def task1(self): mutexA.acquire() print('%s task1 get A' %self.name) mutexB.acquire() print('%s task1 get B' % self.name) mutexB.release() mutexA.release() time.sleep(1) # Thread-2 拿到執行權,,此時counter=0,thread2執行task1 def task2(self): mutexB.acquire() print('%s task2 get B' % self.name) mutexA.acquire() print('%s task2 get A' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t = MyThread() t.start()
輸出結果:
Thread-1 task1 get A
Thread-1 task1 get B
Thread-2 task1 get A
Thread-2 task1 get B
Thread-3 task1 get A
Thread-3 task1 get B
Thread-4 task1 get A
Thread-4 task1 get B
Thread-5 task1 get A
Thread-5 task1 get B
Thread-6 task1 get A
......
訊號量也是一種鎖。
訊號量的主要用途是用來控制執行緒的併發量的,Semaphore管理一個內建的計數器,每呼叫一次acquire()方法時,計數器-1,每呼叫一次release()方法時,內部計數器+1。
不過需要注意的是,Semaphore內部的計數器不能小於0!當它內部的計數器等於0的時候,這個執行緒會被鎖定,進入阻塞狀態,直到其他執行緒去呼叫release方法。
訊號量`semaphore` 是用於控制進入數量的鎖。有哪些應用場景呢,比如說在讀寫檔案的時候,一般只能只有一個執行緒在寫,而讀可以有多個執行緒同時進行,
如果需要限制同時讀檔案的執行緒個數,這時候就可以用到訊號量了(如果用互斥鎖,就是限制同一時刻只能有一個執行緒讀取檔案)。
又比如在做爬蟲的時候,有時候爬取速度太快了,會導致被網站禁止,所以這個時候就需要控制爬蟲爬取網站的頻率。
Semaphore(value=1) # value設定是內部維護的計數器的大小,預設為1.
主要有兩個方法:
每當呼叫acquire()時,內建計數器-1,直到為0的時候阻塞
每當呼叫release()時,內建計數器+1,並讓某個執行緒的acquire()從阻塞變為不阻塞
import threading
import time
class htmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("got html text success")
self.sem.release() # 內部維護的計數器加1,並通知內部維護的conditon通知acquire
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire() # 內部維護的計數器減1,到0就會阻塞
html_thread = htmlSpider("http://baidu.com/{}".format(i), self.sem)
html_thread.start()
if __name__ == "__main__":
sem = threading.Semaphore(3) #設定同時最多3個
url_producer = UrlProducer(sem)
url_producer.start()
從結果可以看出,每次都幾乎是三個同時的完成任務。
1. 什麼是事件
同進程的一樣,執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,
這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,
它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件,
而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,
它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行
2. Event幾種方法
event.isSet() | 返回event的狀態值; |
---|---|
event.wait() | 如果 event.isSet()==False將阻塞執行緒; |
event.set() | 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統度; |
event.clear() | 恢復event的狀態值為False。 |
3.程式碼示例
from threading import Thread,Event
import time
event=Event()
def light():
print('紅燈正亮著')
time.sleep(3)
event.set() #綠燈亮
def car(name):
print('車%s正在等綠燈' %name)
event.wait() #等燈綠 此時event為False,直到event.set()將其值設定為True,才會繼續執行.
print('車%s通行' %name)
if __name__ == '__main__':
# 紅綠燈
t1=Thread(target=light)
t1.start()
# 車
for i in range(10):
t=Thread(target=car,args=(i,))
t.start()
四、執行緒佇列Queue
1. 佇列分類
Queue | 哪個資料先存入,取資料的時候先取哪個資料,同生活中的排隊買東西 |
---|---|
LifoQueue | 哪個資料最後存入的,取資料的時候先取,同生活中手槍的彈夾,子彈最後放入的先打出 |
PriorityQueue | 存入資料時候加入一個優先順序,取資料的時候優先順序最高的取出 |
2. Queue簡介
執行緒佇列Queue,也稱FIFO,存在佇列中的資料先進先出,就好比拉肚子,吃什麼拉什麼~~呃呃,有點重口味
Queue常用函式:
Queue.qsize() 返回佇列大小
Queue.empty() 判斷佇列是否為空
Queue.full() 判斷佇列是否滿了
Queue.get([block[,timeout]]) 從佇列頭刪除並返回一個item,block預設為True,表示當佇列為空卻去get的時候會阻塞執行緒,等待直到有有item出現為止來get出這個item。如果是False的話表明當佇列為空你卻去get的時候,會引發異常。在block為True的情況下可以再設定timeout引數。表示當佇列為空,get阻塞timeout指定的秒數之後還沒有get到的話就引發Full異常。
Queue.task_done() 從場景上來說,處理完一個get出來的item之後,呼叫task_done將向佇列發出一個訊號,表示本任務已經完成(與Queue.get配對使用)。
Queue.put(…[,block[,timeout]]) 向隊尾插入一個item,同樣若block=True的話佇列滿時就阻塞等待有空位出來再put,block=False時引發異常。同get的timeout,put的timeout是在block為True的時候進行超時設定的引數。
Queue.join() 監視所有item並阻塞主執行緒,直到所有item都呼叫了task_done之後主執行緒才繼續向下執行。這麼做的好處在於,假如一個執行緒開始處理最後一個任務,它從任務佇列中拿走最後一個任務,此時任務佇列就空了但最後那個執行緒還沒處理完。當呼叫了join之後,主執行緒就不會因為佇列空了而擅自結束,而是等待最後那個執行緒處理完成了。
Queue使用:
# !usr/bin/env python
# -*- coding:utf-8 _*-
import threading
import queue
q = queue.Queue(5) # 長度,佇列中最多存放5個數據
def put():
for i in range(20):
q.put(i)
print("數字%d存入佇列成功" % i)
q.join() # 阻塞程序,直到所有任務完成,取多少次資料task_done多少次才行,否則最後的ok無法列印
print('ok')
def get():
for i in range(20):
value = q.get()
print("數字%d重佇列中取出" % value)
q.task_done() # 必須每取走一個數據,發一個訊號給join
# q.task_done() #放在這沒用,因為join實際上是一個計數器,put了多少個數據,
# 計數器就是多少,每task_done一次,計數器減1,直到為0才繼續執行
t1 = threading.Thread(target=put, args=())
t1.start()
t2 = threading.Thread(target=get, args=())
t2.start()
輸出結果:
數字0存入佇列成功
數字1存入佇列成功
數字2存入佇列成功
數字3存入佇列成功
數字4存入佇列成功
數字0重佇列中取出
數字1重佇列中取出
數字2重佇列中取出
數字3重佇列中取出
數字4重佇列中取出
......
與Queue相反,最後存入的資料最先取出,最先存入的資料最後取出,如果說FIFO是吃什麼拉什麼,那麼LIFO就是吃什麼吐什麼,先吃的後吐,後吃的先吐~~真是重口味呀!
LifoQueue函式介紹:
函式不做過多介紹了,已經在 python執行緒佇列Queue-FIFO 有了詳細講解,兩者都屬於Queue,函式都一樣!
LifoQueue使用:
# !usr/bin/env python
# -*- coding:utf-8 _*-
import queue
import threading
import time
# 可以設定佇列的長度 q=queue.LifoQueue(5),意味著佇列中最多存放5個元素,當佇列滿的時候自動進入阻塞狀態
q=queue.LifoQueue()
def put():
for i in range(10):
q.put(i)
print("資料%d被存入到佇列中" % i)
q.join()
print('ok')
def get():
for i in range(10):
value = q.get()
print("資料%d從佇列中取出" % value)
q.task_done()
t1=threading.Thread(target=put,args=())
t1.start()
t2=threading.Thread(target=get,args=())
t2.start()
輸出結果:
資料0被存入到佇列中
資料1被存入到佇列中
資料2被存入到佇列中
資料3被存入到佇列中
資料4被存入到佇列中
資料5被存入到佇列中
資料6被存入到佇列中
資料7被存入到佇列中
資料8被存入到佇列中
資料9被存入到佇列中
資料9從佇列中取出
資料8從佇列中取出
資料7從佇列中取出
資料6從佇列中取出
資料5從佇列中取出
資料4從佇列中取出
資料3從佇列中取出
資料2從佇列中取出
資料1從佇列中取出
資料0從佇列中取出
ok
在資料存入的時候設定優先順序,設定的值越小,優先順序越高,取資料的時候預設按照優先順序最高的取出
PriorityQueue函式介紹:
函式不做過多介紹了,已經在 python執行緒佇列Queue-FIFO 有了詳細講解,兩者都屬於Queue,函式都一樣!
PriorityQueue使用:
按優先順序:不管是數字、字母、列表、元組等(字典、集合沒測),使用優先順序存資料取資料,佇列中的資料必須是同一型別,都是按照實際資料的ascii碼錶的順序進行優先順序匹配,漢字是按照unicode表。
# !usr/bin/env python
# -*- coding:utf-8 _*-
import queue
import threading
import time
q = queue.PriorityQueue()
q.put([1, 'ace'])
q.put([40, 333])
q.put([3, 'afd'])
q.put([5, '4asdg'])
# 1是級別最高的,
while not q.empty(): # 不為空時候執行
print(q.get())
q = queue.PriorityQueue()
q.put('我')
q.put('你')
q.put('他')
q.put('她')
q.put('ta')
while not q.empty():
print(q.get())
執行結果:
[1, 'ace']
[3, 'afd']
[5, '4asdg']
[40, 333]
ta
他
你
她
我
五、Python程序池、執行緒池、回撥函式
1. 池的概念
不管是執行緒還是程序,都不能無限制的開下去,總會消耗和佔用資源。
也就是說,硬體的承載能力是有限度的,在保證高效率工作的同時應該還需要保證硬體的資源佔用情況,所以需要給硬體設定一個上限來減輕硬體的壓力,所以就有了池的概念。
from concurrent.futures import ProcessPoolExecutor # 匯入程序池模組
from concurrent.futures import ThreadPoolExecutor # 匯入執行緒池模組
import os
import time
import random
# 下面以程序池為例,執行緒池只是使用匯入模組不一樣,僅此而已。
def task(name):
print('name:[%s]|程序:[%s]正在執行' % (name, os.getpid()))
time.sleep(random.randint(1, 3)) # 模擬程序執行耗費時間。
# 這一步的必要性:在建立程序時,會將程式碼以模組的方式從頭到尾匯入載入執行一遍
# (所以建立執行緒如果不寫在main裡面的話,這個py檔案裡面的所有程式碼都會從頭到尾載入執行一遍
# 就會導致在建立程序的時候產生死迴圈。)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 設定執行緒池的大小,預設等於cpu的核心數。
for i in range(10):
pool.submit(task, '程序%s' % i) # 非同步提交(提交後不等待)
pool.shutdown(wait=True) # 關閉程序池入口不再提交,同時等待程序池全部執行完畢。(類似join方法)
print('主') # 標識一下主程序的完畢之前的語句
執行結果:
name:[程序0]|程序:[17656]正在執行
name:[程序1]|程序:[14380]正在執行
name:[程序2]|程序:[18956]正在執行
name:[程序3]|程序:[3564]正在執行
name:[程序4]|程序:[14380]正在執行
name:[程序5]|程序:[18956]正在執行
name:[程序6]|程序:[3564]正在執行
name:[程序7]|程序:[18956]正在執行
name:[程序8]|程序:[3564]正在執行
name:[程序9]|程序:[17656]正在執行
主
from concurrent.futures import ProcessPoolExecutor # 匯入程序池模組
from concurrent.futures import ThreadPoolExecutor # 匯入執行緒池模組
import os
import time
import random
def task(name):
print('name:[%s]|程序[%s]正在執行...' % (name, os.getpid()))
time.sleep(random.randint(1, 3))
return '拿到[%s]|程序%s的結果...' % (name, os.getpid())
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
result = [] # 建立一個空列表來蒐集執行結果
for i in range(10):
res = pool.submit(task, '程序%s' % i).result() # 使用.result()方法得到每次的結果,同步呼叫
result.append(res)
pool.shutdown(wait=True)
for j in result:
print(j)
print('主程序')
執行結果:
name:[程序0]|程序[3376]正在執行...
name:[程序1]|程序[27124]正在執行...
name:[程序2]|程序[10176]正在執行...
name:[程序3]|程序[28636]正在執行...
name:[程序4]|程序[3376]正在執行...
name:[程序5]|程序[27124]正在執行...
name:[程序6]|程序[10176]正在執行...
name:[程序7]|程序[28636]正在執行...
name:[程序8]|程序[3376]正在執行...
name:[程序9]|程序[27124]正在執行...
拿到[程序0]|程序3376的結果...
拿到[程序1]|程序27124的結果...
拿到[程序2]|程序10176的結果...
拿到[程序3]|程序28636的結果...
拿到[程序4]|程序3376的結果...
拿到[程序5]|程序27124的結果...
拿到[程序6]|程序10176的結果...
拿到[程序7]|程序28636的結果...
拿到[程序8]|程序3376的結果...
拿到[程序9]|程序27124的結果...
主程序
非同步呼叫:提交任務,不去等結果,繼續執行
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time
def task(name):
time.sleep(random.randint(1, 3))
print('name: %s 程序[%s]執行...' % (name, os.getpid()))
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, '程序%s' % i) # 非同步呼叫,提交後不等待結果,繼續執行程式碼
pool.shutdown(wait=True)
print('主程序')
執行結果:
name: 程序3 程序[10016]執行...
name: 程序0 程序[12736]執行...
name: 程序1 程序[4488]執行...
name: 程序2 程序[3920]執行...
name: 程序5 程序[12736]執行...
name: 程序6 程序[4488]執行...
name: 程序4 程序[10016]執行...
name: 程序9 程序[4488]執行...
name: 程序8 程序[12736]執行...
name: 程序7 程序[3920]執行...
主程序
六、回撥函式
1.什麼是回撥函式
上面我們在演示非同步呼叫時候,說過提交任務不等待執行結果,繼續往下執行程式碼,那麼,執行的結果我們怎麼得到呢?
可以為程序池和執行緒池內的每個程序或執行緒繫結一個函式,該函式在程序或執行緒的任務執行完畢後自動觸發並接收任務的返回值當做引數,這個函式就是回撥函式。
from concurrent.futures import ThreadPoolExecutor
import time
import random
import requests
def task(url):
print('獲取網站[%s]資訊' % url)
response = requests.get(url) # 下載頁面
time.sleep(random.randint(1, 3))
return {'url': url, 'content': response.text} # 返回結果:頁面地址和頁面內容
futures = []
def back(res):
res = res.result() # 取到提交任務的結果(回撥函式固定寫法)
res = '網站[%s]內容長度:%s' % (res.get('url'), len(res.get('content')))
futures.append(res)
return futures
if __name__ == '__main__':
urls = [
'http://www.baidu.com',
'http://www.dgtle.com/',
'https://www.bilibili.com/'
]
pool = ThreadPoolExecutor(4)
futures = []
for i in urls:
pool.submit(task, i).add_done_callback(back) # 執行完執行緒後,使用回撥函式
pool.shutdown(wait=True)
for j in futures:
print(j)
執行結果:
獲取網站[http://www.baidu.com]資訊
獲取網站[http://www.dgtle.com/]資訊
獲取網站[https://www.bilibili.com/]資訊
網站[http://www.dgtle.com/]內容長度:39360
網站[https://www.bilibili.com/]內容長度:69377
網站[http://www.baidu.com]內容長度:2381
以上內容抄自於:
https://www.jianshu.com/p/58ec3e7a1edb
https://www.cnblogs.com/shuopython/p/11986089.html
https://www.cnblogs.com/zhangshengxiang/p/9606133.html
https://blog.csdn.net/qq_42992919/article/details/98201031