python爬蟲多執行緒之queue
首先先來介紹下queue這個包吧,這個包叫佇列,沒錯,就是那個和棧反過來的那個佇列,大家一聽佇列就隨口說出先進先出,而棧則是後進先出,為什麼要用用佇列來實現,其實我也不知道,反正用過之後很順手,具體哪裡也說不上來
先來看下佇列的內建方法的,我們只需要記住兩個,一個是put 放 ,另一個是get 獲得,因為我們q = queue.Queue()建立了一個佇列後,這個佇列是空的,要先放東西進去才能從裡面拿東西出來
q = queue.Queue()
q.qsize() 返回佇列的大小
q.empty() 如果佇列為空,返回True,反之False
q.full() 如果佇列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取佇列,timeout等待時間
q.get_nowait() 相當q.get(False)
q.put(item) 寫入佇列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號
q.join() 實際上意味著等到佇列為空,再執行別的操作
下面是我自己寫的一個簡單的程式碼,我會用文字來註釋,以便大家能看懂,真心簡單,熟悉讀完這篇文章後你如果還不會的話,那我也沒什麼說的了
#開啟多個執行緒,同時執行任務,有幾個執行緒就執行幾個任務 import threading import time import queue class MyThread(threading.Thread): def __init__(self, func): threading.Thread.__init__(self) self.func = func def run(self): self.func() def worker(): while not q.empty(): item = q.get() # 或得任務 print('Processing : ',item) time.sleep(1) def main(): threads = [] for task in range(100): q.put(task) for i in range(threadNum): #開啟三個執行緒 thread = MyThread(worker) thread.start() threads.append(thread) for thread in threads: thread.join() if __name__ == '__main__': q = queue.Queue() threadNum = 3 main()
看程式碼先看主入口,根據裡面的函式呼叫一步步來
1. 首先我例項化了一個佇列 q = queue.Queue() ,然後我設定執行緒數為3,接著呼叫main() 方法
2. 進入main函式中,我建立了一個空列表,用來放執行緒,第一個for 迴圈中我做的是將(0,100)之間資料放入隊列當中,當然,100是取不到的,放完佇列後又來了個for迴圈,這個for迴圈是用來建立執行緒的,前面說了threadNum = 3,那麼說明我這裡要迴圈三次,threadNum的值分別為0,1,2,當然,這三個數用不到,因為我沒在MyThread寫執行緒名,沒有把thread作為引數傳入類中,所以每個執行緒都是無名氏,我們看到thread = MyThread(worker) 這段程式碼,先來看MyThread這個類,worker這個引數先不看,MyThread中這個類繼承了threading.Thread,在init方法中初始化了父類,也就是threading.Thread,並且定義了一個屬性,屬性名叫func,因為threading這個類有自己的run方法,我們可以重寫父類的run方法,self.func() 表明但我們start後,執行緒會自動呼叫run方法,就會執行self.func() 這句
3. 看完MyThread這個類後我們就來看worker這個引數,有的人就會問,看了半天沒看到有這個引數啊,誰說引數一定是變數啊,也有可能是函式啊,現在就有人恍然大悟了,看到worker這個方法。這個方法裡面一個while迴圈,我使用的一個佇列的方法,這個方法是q.empty(),這個方法說明當佇列q為空時,他的值為True,前面寫了個not ,說明整句的意思是,當佇列q不為空時才會執行這個while迴圈。一開始定義一個變數item 來接收得到的資料,列印一遍並且休眠一秒,不休眠的話程式執行太快,看不到多執行緒的效果,以上 thread = MyThread(worker) 這句程式碼就解釋完了
4. thread.start() 這句是啟動執行緒,threads.append(thread)是將執行緒加入到前面定義的threads空列表中,最後這個for迴圈是將每個執行緒都join()一下,join的意思是等執行緒結束後才會執行後面的語句,我們這裡的意思是前面三個執行緒跑完了才能輪到後面三個執行緒
5. 基本的程式碼都講解完畢了,現在來執行程式碼
就會發現,一次輸出三句,停頓一秒後又輸出三句,因為開了三個執行緒,有興趣的可以把我程式碼複製下來並且將執行緒數改下來看下效果,看到這估計有人就明白了,這個queue佇列相當於或類似一個全域性的列表,只負責存和取,沒錯,他的作用就是這樣,最起碼我用的只有這麼多
下面還有一個列子,這個列子是我想執行一個永不停止的執行緒,每次佇列被取完後,我都會將資料原樣放回去,具體的步驟我就不說了,上面有寫
import threading
import queue
import time
class Mythread(threading.Thread):
def __init__(self,fun):
threading.Thread.__init__(self)
self.fun = fun
def run(self):
self.fun()
def worker():
global data
while True:
if not q.empty():
a = q.get()
parse(a[0], a[1])
time.sleep(1)
data.append(a)
else:
for i in data:
q.put(i)
data = []
def parse(qd, zd):
mystr = qd + zd
print('=============',mystr)
def main():
threads = []
for i in LstAdd:
q.put(i)
for i in range(thendNum):
thread = Mythread(worker)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if __name__ == '__main__':
LstAdd = [
('abcde1','adfa1'),
('abcde2','adfa2'),
('abcde3','adfa3'),
('abcde4','adfa4'),
('abcde5','adfa5'),
('abcde6','adfa6'),
('abcde7','adfa7'),
('abcde8','adfa8'),
('abcde9','adfa9'),
]
data = []
q = queue.Queue()
thendNum = 3
main()
我想將列表中每個元素即元組的兩個元素相加並輸出,執行程式結果如下:
他會一直輸出1-9,後面的沒有截圖下來,太多了,我將每次get()的資料都存起來,這樣防止資料丟失,每次我都會判斷佇列是否為空,為空的話就將存起來的資料原樣存回去,並且情況存這些資料的列表,保證每次都是同樣的資料同樣的順序