爬蟲與多執行緒
多執行緒和多程序爬蟲
一.執行緒
1.什麼是執行緒。
執行緒是作業系統能夠進行運算排程的最小單位。它被包含在程序中,是進城中的實際運作單位。一條執行緒指的是程序中一個單一順序的控制流,一個執行緒可以併發多個執行緒,每條執行緒執行不同的任務。
2.執行緒常用的方法
方法 |
說明 |
start() |
執行緒準備就緒,等待CPU排程 |
setName() |
為執行緒設定名稱 |
getName() |
獲取執行緒名稱 |
setDaemon() |
設定為守護執行緒 |
Join() |
逐個執行每個執行緒,執行完畢後繼續往下執行 |
run() |
執行緒被排程後會執行該方法,如果想自定義執行緒類,需要重寫run()方法 |
3.Threading類
3.1 執行緒的普通建立方式
Threadding用於提供執行緒相關的操作,執行緒是應用程式中工作的最小單元。
import threading import time def show(arg): time.sleep(1) print('thread' + str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print('主執行緒結束') 結果: 主執行緒結束 thread0 thread1 thread2 thread5 thread4 thread3 thread7 thread6 thread8 thread9
上述程式碼建立了10個“前臺”執行緒,然後控制器就交給了CPU,CPU根據指定演算法進行排程,分片執行指令。
3.2 自定義執行緒類
繼承自threading.Thread類來自定義執行緒類,但是其本質卻是重構thread類中的run()方法。
import threading
class myThread(threading.Thread): def __init__(self, sum): threading.Thread.__init__(self) self.sum = sum
def run(self): print('物件數是:',self.sum) if __name__ == '__main__':
t1 = myThread(1) t2 = myThread(2) t1.start() t2.start()
3.3 計運算元執行緒執行的時間
PS:sleep的時候是不會佔用CPU的,作業系統會把執行緒掛起。
import threading import time def show(n): time.sleep(1) print('thread' + str(n)) start_time = time.time() obj_list = []
for i in range(5): t = threading.Thread(target=show,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('花費的時間為:',time.time() - start_time)
3.4 守護執行緒
執行緒的setDaemon(True)將執行緒變成主執行緒的守護執行緒,意思是當主程序結束後,子執行緒也會隨之退出。意味著當主執行緒結束後,程式就結束了。
1 import threading 2 import time 3 4 def show(n): 5 time.sleep(1) 6 print('thread' + str(n)) 7 8 start_time = time.time() 9 obj_list = [] 10 11 for i in range(5): 12 t = threading.Thread(target=show,args=(i,)) 13 t.setDaemon(True) 14 t.start() 15 obj_list.append(t) 16 17 print('花費的時間為:',time.time() - start_time)
3.5 GIL(全域性直譯器鎖)
在Python的執行環境中,無論電腦是單核還是雙核,作業系統同時只會執行一個執行緒。究其原因,是因為GIL(全域性直譯器鎖)。
在Python中,一個執行緒要想要執行,必須要先拿到GIL。可以吧GIL想象成一個“通行證”,並且在一個程序中,GIL只有一個。沒有通行證的執行緒就不會被執行。
Python多執行緒的工作過程:
- 拿到公共資料
- 申請GIL
- Python直譯器呼叫os的原生執行緒
- os操作CPU執行運算
- 當該執行緒的執行時間到了之後,無論是否執行完,GIL被釋放
- 其他執行緒重複上面的操作
- 其他程序執行完成後,切換到原來的執行緒(從記錄的上下文繼續執行)
3.6 執行緒鎖(Lock,RLock)
由於執行緒之間是進行隨機排程,並且每個執行緒可能只執行n條執行之後,當多個執行緒同時修改同一條資料時可能會出現髒資料,所以,出現了執行緒鎖 - 同一時刻允許一個執行緒執行操作。
import threading,time def run(n): global num num += 1 num = 0 obj_list = [] for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num) 髒資料:19999
3.6.1 互斥鎖(Lock)
為了防止上面情況的發生,我們可以使用互斥鎖(Lock)來解決。
import threading,time lock = threading.Lock() # 例項化一個鎖物件 def run(n): lock.acquire() # 獲取鎖 global num num += 1 lock.release() # 釋放鎖 num = 0 obj_list = [] for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num)
3.6.2 遞迴鎖(RLock)
RLock的用法和Lock一樣,只是他支援巢狀。在多個鎖沒有釋放的時候一般會使用Rlock類。
import threading,time lock = threading.RLock() # 例項化一個鎖物件 num = 0 obj_list = [] def run(n): lock.acquire() # 獲取鎖 global num num += 1 lock.release() # 釋放鎖 for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num)
3.7 訊號量(Semaphore)
互斥鎖 同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。
import threading,time lock = threading.BoundedSemaphore(6) # 例項化一個鎖物件 def run(n): lock.acquire() # 獲取鎖 time.sleep(1) print('run the thread: %s' % n) lock.release() # 釋放鎖 num = 0 for i in range(200): t = threading.Thread(target=run,args=(i,)) t.start()
3.8事件(Event)
python執行緒的事件用於主執行緒控制其他執行緒的執行,事件是一個簡單的執行緒同步物件,主要提供了以下幾種方法:
方法 |
說明 |
clear() |
將flag設定為“false” |
set() |
將flag設定為“true” |
is_set() |
判斷是否設定了flag |
wait() |
一直監聽flag,沒有檢測到會一直處於阻塞狀態 |
事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
import threading,time event = threading.Event() # 建立事件物件 def lighter(): count = 0 event.set() #初始值為綠燈 while 1: if 5 < count <= 10: event.clear() #紅燈,清楚標誌位 print('\33[41;1mred light is on...\033[0m') elif count > 10: event.set() # 綠燈,設定標誌位 count = 0 else: print('\33[41;1mred light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): # 判斷是否設定了標誌位 print("[%s] 綠燈亮,請行駛..." % name) time.sleep(1) else: print("[%s] 紅燈亮,請等待..." % name) event.wait() print("[%s] 綠燈亮,開始行駛..." % name) light = threading.Thread(target=lighter,)
car = threading.Thread(target=car, args=('test',))
light.start()
car.start()
3.9條件(Condition)
使得執行緒等待,只有滿足某條件時,才釋放n個執行緒。
互斥鎖是最簡單的執行緒同步機制,Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
可以認為Condition物件維護了一個鎖(Lock/RLock)和一個waiting池。執行緒通過acquire獲得Condition物件,當呼叫wait方法時,執行緒會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個執行緒。當呼叫notify方法時,Condition物件會從waiting池中挑選一個執行緒,通知其呼叫acquire方法嘗試取到鎖。
Condition物件的建構函式可以接受一個Lock/RLock物件作為引數,如果沒有指定,則Condition物件會在內部自行建立一個RLock。
除了notify方法外,Condition物件還提供了notifyAll方法,可以通知waiting池中的所有執行緒嘗試acquire內部鎖。由於上述機制,處於waiting狀態的執行緒只能通過notify方法喚醒,所以notifyAll的作用在於防止有執行緒永遠處於沉默狀態。
演示條件變數同步的經典問題是生產者與消費者問題:假設有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來互動產品。生產者的”策略“是如果市場上剩餘的產品少於1000個,那麼就生產100個產品放到市場上;而消費者的”策略“是如果市場上剩餘產品的數量多餘100個,那麼就消費3個產品。用Condition解決生產者與消費者問題的程式碼如下:
import threading import time class Producer(threading.Thread): def run(self): global count while True: if con.acquire(): if count > 1000: con.wait() else: count = count+100 msg = self.name+' produce 100, count=' + str(count) print msg con.notify() con.release() time.sleep(1) class Consumer(threading.Thread): def run(self): global count while True: if con.acquire(): if count < 100: con.wait() else: count = count-3 msg = self.name+' consume 3, count='+str(count) print msg con.notify() con.release() time.sleep(1) count = 500 con = threading.Condition() def test(): for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start() if __name__ == '__main__': test()
4.多執行緒的爬蟲例項
'''使用多執行緒爬取1500個url中的圖片'''
import threading,requests,time link_list = [] with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url) start_time = time.time() class myThread(threading.Thread): def __init__(self,name,link_range): threading.Thread.__init__(self) self.name = name self.link_range = link_range self.userName = 0 def run(self): print('開始:',self.name) self.craw(self.name,self.link_range) print('結束:',self.name) def writeImages(self, ThreadName, url): print("正在儲存檔案 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,link_range): for i in range(link_range[0],link_range[len(link_range)-1]): self.writeImages(name,link_list[i]) obj_list = [] url_list = [(0,300),(301,600),(601,900),(901,1200),(1201,1500)] # 建立新執行緒 for i in range(5): t = myThread('Thread-'+str(i+1), url_list[i]) t.start() obj_list.append(t) # 等待所有執行緒執行完成 for url in url_list: url.join() end_time = time.time() print('爬蟲的執行時間為:',end_time - start_time)
5.佇列(queue)
queue是python中的標準庫,俗稱佇列。在python中,多個執行緒之間的資料是共享的,多個執行緒進行資料交換的時候,不能夠保證資料的安全性和一致性,所以當多個執行緒需要進行資料交換的時候,佇列就出現了,佇列可以完美解決執行緒間的資料交換,保證執行緒間資料的安全性和一致性。
PS: 在python2.x中,模組名為Queue
queue模組有三種佇列及建構函式
l Python queue模組的FIFO佇列先進先出。 queue.Queue(maxsize)
l LIFO類似於堆,即先進後出。 queue.LifoQueue(maxsize)
l 還有一種是優先順序佇列級別越低越先出來。 queue.PriorityQueue(maxsize)
queue模組中的常用方法:
方法 |
說明 |
queue.qsize() |
返回佇列的大小 |
queue.empty() |
如果佇列為空,返回True,反之False |
queue.full() |
如果佇列滿了,返回True,反之False (queue.full 與 maxsize 大小對應) |
queue.get([block[, timeout]]) |
獲取佇列,立即取出一個元素, timeout超時時間 |
queue.put(item[, timeout]]) |
寫入佇列,立即放入一個元素, timeout超時時間 |
queue.join() |
阻塞呼叫執行緒,直到佇列中的所有任務被處理掉, 實際上意味著等到佇列為空,再執行別的操作 |
queue.task_done() |
在完成一項工作之後,queue.task_done()函式向任務已經完成的佇列傳送一個訊號
|
5.1程式碼例項
l 建立佇列
import queue q = queue.Queue()
l empty方法(如果佇列為空,返回True)
import queue q = queue.Queue() print(q.empty()) #輸出:True
l full方法(如果佇列滿了,返回True)
import queue q = queue.Queue(1) #指定佇列大小 q.put('a') print(q.full()) #輸出:True
l put方法和get方法
import queue q = queue.Queue() q.put('a') q.put('b') print(q.get()) #輸出:a print(q.get()) #輸出:b
l qsize方法(返回佇列裡元素個數)
import queue q = queue.Queue() q.put('a') q.put('b') print(q.qsize()) #輸出:2
5.2 生產者消費者
import threading,time import queue q = queue.Queue(maxsize=10) # 生產者 def Producer(name): count = 1 while True: q.put("冠軍%s" % count) print("冠軍",count) count +=1 time.sleep(0.1) #消費者 def Consumer(name): #while q.qsize()>0: while True: print("[%s] 取到[%s]..." %(name, q.get())) time.sleep(1) p = threading.Thread(target=Producer,args=("IG",)) c = threading.Thread(target=Consumer,args=("LPL",)) c1 = threading.Thread(target=Consumer,args=("LCK",)) p.start() c.start() c1.start()
6.多執行緒和爬蟲例項。
import threading,requests,time import queue start_time = time.time() obj_list = [] work_queue = queue.Queue(1500) link_list = [] with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url) # 填充佇列 for url in link_list: work_queue.put(url) class myThread(threading.Thread): def __init__(self,name,q): threading.Thread.__init__(self) self.name = name self.userName = 0 self.q = q def run(self): print('開始:',self.name) while True: try: self.craw(self.name,self.q) except Exception as e: break print('結束:',self.name) def writeImages(self, ThreadName, url): print("正在儲存檔案 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,q): url = q.get(timeout = 2) try: self.writeImages(name, url) except Exception as e: print(q.qsize(),url,e) # 建立新執行緒 for i in range(5): t = myThread('Thread-'+str(i+1), work_queue) t.start() obj_list.append(t) # 等待所有執行緒執行完成 for url in obj_list: url.join() end_time = time.time() print('爬蟲的執行時間為:',end_time - start_time)