1. 程式人生 > >[python模塊]隊列queue

[python模塊]隊列queue

queue FifoQueue FiloQueue 隊列

一、隊列queue

隊列queue 多應用在多線程場景,多線程訪問共享變量。

對於多線程而言,訪問共享變量時,隊列queue的線程安全的。

因為queue使用了一個線程鎖(pthread.Lock()),以及三個條件變量(pthread.condition()),來保證了線程安全。

總結:隊列提供了一個安全可靠的共享數據使用方案。

隊列內置控制安全的幾個參數,非用戶使用名稱作用
self.mutex互斥鎖任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有該互斥鎖。共有兩種操作require獲取鎖,release釋放鎖。同時該互斥鎖被三個共享變量同時享有,即操作conditiond時的require和release操作也就是操作了該互斥鎖。
self.not_full

條件變量

隊列沒滿

當隊列中有元素添加後,會通知notify其他等待添加元素的線程,喚醒等待require互斥鎖,或者有線程從隊列中取出一個元素後,通知其它線程喚醒以等待require互斥鎖。
self.not_empty

條件變量

隊列不為空

線程添加數據到隊列中後,會調用self.not_empty.notify()通知其它線程,喚醒等待require互斥鎖後,讀取隊列。
self.all_tasks_done

條件變量

隊列數據全部處理完

消費者線程從隊列中get到任務後,任務處理完成,當所有的隊列中的任務處理完成後,會使調用queue.join()的線程返回,表示隊列中任務以處理完畢。
###queue的初始化函數###
def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        
        # mutex must be held whenever the queue is mutating.  All methods    
        # that acquire mutex must release it before returning.  mutex    
        # is shared between the three conditions, so acquiring and    
        # releasing the conditions also acquires and releases mutex.  
          
        self.mutex = _threading.Lock()   
         
        # Notify not_empty whenever an item is added to the queue; a    
        # thread waiting to get is notified then.    
        
        self.not_empty = _threading.Condition(self.mutex)    
        
        # Notify not_full whenever an item is removed from the queue;    
        # a thread waiting to put is notified then.    
        
        self.not_full = _threading.Condition(self.mutex)    
        
        # Notify all_tasks_done whenever the number of unfinished tasks    
        # drops to zero; thread waiting to join() is notified to resume   
         
        self.all_tasks_done = _threading.Condition(self.mutex)    
        self.unfinished_tasks = 0

二、隊列數據存取規則:

數據使用方式
類名
作用示例
FIFO先進先出
Queue(maxsize)

先進入隊列的數據,先取出

maxsize:>=0

設置隊列長度,0為無限長


q = queue.Queue()
FILO先進後出
LifoQueue(maxsize)

先進入隊列的數據,最後取出

maxsize:>=0

設置隊列長度,0為無限長


q = queue.LifoQueue()
Priority優先級
PriorityQueue(maxsize)

設置優先標誌,優先取出高標誌位

maxsize:>=0

設置隊列長度,0為無限長


q = queue.PriorityQueue()

###例子一:先進先出###
import queue
q = queue.Queue()
for i in range(5):
    q.put(i)
    
for i in range(5):
    print(q.get(),end=" ")
#---結果---
0 1 2 3 4 

###例子二:後進先出###
import queue
q = queue.LifoQueue()
for i in range(5):
    q.put(i)
for i in range(5):
    print(q.get(),end=" ")
#---結果---
4 3 2 1 0

###例子三:按優先標誌位讀取###
#參考其它資料,看到許多講述優先級隊列的實現,但是我覺得直接用元組的方式比較簡單粗暴。
import queue
p = queue.PriorityQueue()
p.put((3,"3"))
p.put((1,"1"))
p.put((4,"4"))
p.put((2,"2"))

for i in range(3):
    print(p.get())
#---結果:按元組索引0排序---    
(1, '1')
(2, '2')
(3, '3')
(4, '4')
    
    
###例子四:多元組判斷###   
import queue

p = queue.PriorityQueue()

p.put((1,4,"a"))
p.put((2,1,"666"))
p.put((1,3,"4"))
p.put((2,2,"2"))


for i in range(3):
    print(p.get())   
#-----結果:元組對應的序號進行比較,主鍵是序號0,越往後,優先度越低。-----
(1, 3, '4')
(1, 4, 'a')
(2, 1, '666')
(2, 2, '2')

、隊列的常用方法和屬性:

方法和屬性作用
示例
task_done()

1、標記之前的一個任務已經完成。

2、由隊列的消費者線程調用。每一個get()調用得到一個任務,接下來的task_done()調用告訴隊列該任務已經處理完畢。

3、如果當前的join()當前處於阻塞狀態,當前的所有元素執行後都會重啟(意味著收到加入queue的每一個對象的task_done()調用的信息)


join()

阻塞:

等待隊列所有任務執行結束。

當消費者線程調用task_done(),隊列中未完成的計數就會減少,直至計數為0,解除阻塞。


put(item,block,timeout)

把對象item放入隊列:

item:對象名稱,必填項。

block:

默認是True,如果隊列滿等待。

設置成False,如果隊列滿報Full異常。

timeout:【block為True是生效】

默認是None,如果隊列滿了等待。

0:不等待,隊列滿了立即報Full。

正數1~:等待相應秒數,秒數到了,隊列還是滿的,報錯Full。


put_nowait(item)向隊列裏存對象,不等待
get(block,timeout)

從隊列取出對象,並把對象從隊列中刪除

block:

默認是True,隊列為空等待。

可以變更為False,如果隊列為空,報Empty錯誤。

timeout:【block為True是生效】

默認是None,隊列為空,等待。

0:不等待,隊列為空直接報Empty。

正數1~:等待相應秒數,如果依然為空,則報Empty


get_nowait()從隊列裏取對象,不等待
qsize()

返回隊列長度的近似值。

qsize長度不做為get和put方法的操作依據。


empty()

隊列為空返回True

不做為get和put方法的操作依據。


full()

隊列滿了返回True

不做為get和put方法的操作依據。



四、隊列數據進出規則實例 :

也是一個最簡單的生產者消費者例子。


'''例子一:隊列基本的進出規則'''


import queue,time,threading,random

def productor(name,s):                        # 生產者函數,向隊列裏放產品
	time.sleep(s)
	print ('服務員{}有時間了'.format(name))
	q.put(name)

def customer():                               # 消費者函數,從隊列裏取產品
	s = q.get()
	print ('服務員{}被叫走了'.format(s))


l = []
q = queue.LifoQueue()        # 後進先出,把LifoQueue改成Queue,先進先出。

for i in range(5):
	n = random.randint(1,7)
	t = threading.Thread(target=productor,args=(i,n))    # 生產線程
	l.append(t)
	t.start()

for i in l:
	i.join()
	customer()
	
	
	        
#-----運行結果:因為有Random,所以結果不固定,主要觀察消費順序。------
服務員0有時間了
服務員0被叫走了

服務員1有時間了
服務員1被叫走了

服務員4有時間了
服務員3有時間了
服務員2有時間了

服務員2被叫走了
服務員3被叫走了
服務員4被叫走了


參考資料:

http://python.jobbole.com/87592/

https://www.2cto.com/kf/201608/540910.html

https://www.cnblogs.com/itogo/p/5635629.html

[python模塊]隊列queue