python執行緒池 ThreadPoolExecutor 的用法示例
前言
從Python3.2開始,標準庫為我們提供了 concurrent.futures 模組,它提供了 ThreadPoolExecutor (執行緒池)和ProcessPoolExecutor (程序池)兩個類。
相比 threading 等模組,該模組通過 submit 返回的是一個 future 物件,它是一個未來可期的物件,通過它可以獲悉執行緒的狀態主執行緒(或程序)中可以獲取某一個執行緒(程序)執行的狀態或者某一個任務執行的狀態及返回值:
主執行緒可以獲取某一個執行緒(或者任務的)的狀態,以及返回值。
當一個執行緒完成的時候,主執行緒能夠立即知道。
讓多執行緒和多程序的編碼介面一致。
執行緒池的基本使用
# coding: utf-8 from concurrent.futures import ThreadPoolExecutor import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page with ThreadPoolExecutor(max_workers=5) as t: # 建立一個最大容納數量為5的執行緒池 task1 = t.submit(spider,1) task2 = t.submit(spider,2) # 通過submit提交執行的函式到執行緒池中 task3 = t.submit(spider,3) print(f"task1: {task1.done()}") # 通過done來判斷執行緒是否完成 print(f"task2: {task2.done()}") print(f"task3: {task3.done()}") time.sleep(2.5) print(f"task1: {task1.done()}") print(f"task2: {task2.done()}") print(f"task3: {task3.done()}") print(task1.result()) # 通過result來獲取返回值
執行結果如下:
task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished
1.使用 with 語句 ,通過 ThreadPoolExecutor 構造例項,同時傳入 max_workers 引數來設定執行緒池中最多能同時執行的執行緒數目。
2.使用 submit 函式來提交執行緒需要執行的任務到執行緒池中,並返回該任務的控制代碼(類似於檔案、畫圖),注意 submit() 不是阻塞的,而是立即返回。
3.通過使用 done() 方法判斷該任務是否結束。上面的例子可以看出,提交任務後立即判斷任務狀態,顯示四個任務都未完成。在延時2.5後,task1 和 task2 執行完畢,task3 仍在執行中。
4.使用 result() 方法可以獲取任務的返回值。
主要方法
- wait
wait(fs,timeout=None,return_when=ALL_COMPLETED)
wait 接受三個引數:
fs: 表示需要執行的序列
timeout: 等待的最大時間,如果超過這個時間即使執行緒未執行完成也將返回
return_when:表示wait返回結果的條件,預設為 ALL_COMPLETED 全部執行完成再返回
還是用上面那個例子來熟悉用法
示例:
from concurrent.futures import ThreadPoolExecutor,wait,FIRST_COMPLETED,ALL_COMPLETED import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page with ThreadPoolExecutor(max_workers=5) as t: all_task = [t.submit(spider,page) for page in range(1,5)] wait(all_task,return_when=FIRST_COMPLETED) print('finished') print(wait(all_task,timeout=2.5)) # 執行結果 crawl task1 finished finished crawl task2 finished crawl task3 finished DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>,<Future at 0x2c2bfd0 state=finished returned int>,<Future at 0x2c1b7f0 state=finished returned int>},not_done={<Future at 0x2c3a240 state=running>}) crawl task4 finished
1.程式碼中返回的條件是:當完成第一個任務的時候,就停止等待,繼續主執行緒任務
2.由於設定了延時, 可以看到最後只有 task4 還在執行中
- as_completed
上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷啊。最好的方法是當某個任務結束了,就給主執行緒返回結果,而不是一直判斷每個任務是否結束。
ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法,當子執行緒中的任務執行完後,直接用 result() 獲取返回結果
用法如下:
# coding: utf-8 from concurrent.futures import ThreadPoolExecutor,as_completed import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page def main(): with ThreadPoolExecutor(max_workers=5) as t: obj_list = [] for page in range(1,5): obj = t.submit(spider,page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(f"main: {data}") # 執行結果 crawl task1 finished main: 1 crawl task2 finished main: 2 crawl task3 finished main: 3 crawl task4 finished main: 4
as_completed() 方法是一個生成器,在沒有任務完成的時候,會一直阻塞,除非設定了 timeout。
當有某個任務完成的時候,會 yield 這個任務,就能執行 for 迴圈下面的語句,然後繼續阻塞住,迴圈到所有的任務結束。同時,先完成的任務會先返回給主執行緒。
- map
map(fn,*iterables,timeout=None)
fn: 第一個引數 fn 是需要執行緒執行的函式;
iterables:第二個引數接受一個可迭代物件;
timeout: 第三個引數 timeout 跟 wait() 的 timeout 一樣,但由於 map 是返回執行緒執行的結果,如果 timeout小於執行緒執行時間會拋異常 TimeoutError。
用法如下:
import time from concurrent.futures import ThreadPoolExecutor def spider(page): time.sleep(page) return page start = time.time() executor = ThreadPoolExecutor(max_workers=4) i = 1 for result in executor.map(spider,[2,3,1,4]): print("task{}:{}".format(i,result)) i += 1 # 執行結果 task1:2 task2:3 task3:1 task4:4
使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函式 map 的含義相同,都是將序列中的每個元素都執行同一個函式。
上面的程式碼對列表中的每個元素都執行 spider() 函式,並分配各執行緒池。
可以看到執行結果與上面的 as_completed() 方法的結果不同,輸出順序和列表的順序相同,就算 1s 的任務先執行完成,也會先列印前面提交的任務返回的結果。
多執行緒實戰
以某網站為例,演示執行緒池和單執行緒兩種方式爬取的差異
# coding: utf-8 import requests from concurrent.futures import ThreadPoolExecutor,as_completed import time import json from requests import adapters from proxy import get_proxies headers = { "Host": "splcgk.court.gov.cn","Origin": "https://splcgk.court.gov.cn","User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/71.0.3578.98 Safari/537.36","Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",} url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1" def spider(page): data = { "bt": "","fydw": "","pageNum": page,} for _ in range(5): try: response = requests.post(url,headers=headers,data=data,proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError,adapters.SSLError): continue else: break else: return {} return json_data def main(): with ThreadPoolExecutor(max_workers=10) as t: obj_list = [] begin = time.time() for page in range(1,15): obj = t.submit(spider,page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(data) print('*' * 50) times = time.time() - begin print(times) if __name__ == "__main__": main()
執行結果:
單執行緒實戰
下面我們可以使用單執行緒來爬取,程式碼基本和上面的一樣,加個單執行緒函式
程式碼如下:
# coding: utf-8 import requests from concurrent.futures import ThreadPoolExecutor,adapters.SSLError): continue else: break else: return {} return json_data def single(): begin = time.time() for page in range(1,15): data = spider(page) print(data) print('*' * 50) times = time.time() - begin print(times) if __name__ == "__main__": single()
執行結果:
可以看到,總共花了 19 秒。真是肉眼可見的差距啊!如果資料量大的話,執行時間差距會更大!
以上就是python執行緒池 ThreadPoolExecutor 的用法示例的詳細內容,更多關於python執行緒池 ThreadPoolExecutor 的用法及實戰的資料請關注我們其它相關文章!