Python使用多執行緒(附:爬蟲使用的執行緒池)
阿新 • • 發佈:2018-12-29
python開啟多執行緒。
使用的庫:
python 3.+ :threading(較高階,常用), _thread(python2.+中叫 thread)(偏底層)
python 2.+ :thread
實現多執行緒:(python3.6,使用 threading 庫)
1:函式實現
#第一種:通過函式建立執行緒 def 函式a(): pass # 獲得一個執行緒物件。 t = threading.Thread(target=函式a的名字,name=自己隨便取的執行緒名字,args=(引數1,...)) # 啟動執行緒 t.start();
2:繼承執行緒類(類實現)
class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步的目的是保證,主執行緒退出後子執行緒也會跟著中斷退出 self.daemon = True # 必須要寫 def run(self): #執行緒執行的函式 pass # 獲得物件,並啟動執行緒 t = Fetcher() t.start()
鎖:
執行緒同時操作一個全域性變數時會產生執行緒競爭所以,需要鎖來避免競爭。
實現鎖:
lock = threading.Lock()
lock.acquire() #獲得鎖,加鎖
#..操作全域性變數..(寫檔案,寫入資料庫等)
lock.release() #釋放鎖
佇列:(執行緒池中使用)
多執行緒同步就是多個執行緒競爭一個全域性變數時按順序讀寫,一般情況下要用鎖,但是使用標準庫裡的Queue的時候它內部已經實現了鎖,不用程式設計師自己寫了。
使用佇列(只介紹線上程池中的常用方法):
# 匯入佇列類:
from queue import Queue
# 建立一個佇列:
q = Queue(maxsize=0) # maxsize為佇列大小,為0預設佇列大小可無窮大。
# 佇列是先進先出的資料結構:
q.put(item) #往佇列新增一個item,佇列滿了則阻塞
q.get(item) #從佇列得到一個item,佇列為空則阻塞
還有相應的不等待的版本,這裡略過。
佇列不為空,或者為空但是取得item的執行緒沒有告知任務完成時都是處於阻塞狀態
q.join() #阻塞直到所有任務完成
# 執行緒告知任務當前任務已結束,可以開始下次任務使用 task_done() 函式
q.task_done() #線上程內呼叫
==============================實現簡單執行緒池(爬蟲使用)==================================
1:建立一個執行緒類:
class TestThread(threading.Thread):
def __init__(self,task,lock):
threading.Thread.__init__(self);
self.task = task ; # 一個佇列,用來存放要爬取的url
self.lock = lock ; # 一個鎖物件,用來保護存入資料庫等的操作
self.daemon = True; # 當主執行緒退出時,子執行緒也退出。
self.start(); # 運行當前執行緒。
# 執行緒中的主執行方法
def run(self):
while True:
# 從佇列中獲取url,若沒有則一直阻塞,直到獲取到一個url。
url = self.task.get();
"""
這裡進行爬資料的操作
"""
# 向任務稟報當前任務執行結束,可以開始下一任務
self.task.task_done();
2:建立一個執行緒池類
class ThreadPool():
def __init__(self,thread_num,lock):
self.task = queue.Queue(); # 生成一個佇列,用來儲存要爬取的連線
self.lock = lock; # 一個鎖物件,用來保護寫入檔案
# 根據傳入的執行緒數目,建立對應個執行緒
for i in range(thread_num):
TestThread(self.task,lock);
def add_task(self,url):
# 往佇列中放url,直到佇列滿則阻塞。
self.task.put(url)
def wait_complite(self):
# 阻塞佇列中的所有執行緒,直到主執行緒執行完畢
self.task.join();
3:使用
if __name__ == "__main__":
print("開始於:",start)
lock = threading.Lock();
pool = ThreadPool(2,lock);
for i in range(21):
pool.add_task(i)
pool.wait_complite();
print("主執行緒結束");
print("共用時:",time.time()-start)