Python 進階 執行緒池
阿新 • • 發佈:2022-04-07
Python 進階 執行緒池
1. 概述
執行緒池的基類是 concurrent.futures
模組中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutor
和 ProcessPoolExecutor
,其中 ThreadPoolExecutor 用於建立執行緒池,而 ProcessPoolExecutor 用於建立程序池。
2. 用法
Exectuor 提供瞭如下常用方法:
- submit(fn, args, **kwargs):將 fn 函式提交給執行緒池。args 代表傳給 fn 函式的引數,kwargs 代表以關鍵字引數的形式為 fn 函式傳入引數。
- map(func, iterables, timeout=None, chunksize=1):該函式類似於全域性函式 map(func, *iterables),只是該函式將會啟動多個執行緒,以非同步方式立即對 iterables 執行 map 處理。
- shutdown(wait=True):關閉執行緒池。
submit 方法會返回一個 Future 物件,Future 類主要用於獲取執行緒任務函式的返回值
。由於執行緒任務會在新執行緒中以非同步方式執行,因此,執行緒執行的函式相當於一個“將來完成”的任務,所以 Python 使用 Future 來代表。
Future 提供瞭如下方法:
- cancel():取消該 Future 代表的執行緒任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程式會取消該任務,並返回 True。
- cancelled():返回 Future 代表的執行緒任務是否被成功取消。
- running():如果該 Future 代表的執行緒任務正在執行、不可被取消,該方法返回 True。
- done():如果該 Funture 代表的執行緒任務被成功取消或執行完成,則該方法返回 True。
- result(timeout=None):獲取該 Future 代表的執行緒任務最後返回的結果。如果 Future 代表的執行緒任務還未完成,該方法將會阻塞當前執行緒,其中 timeout 引數指定最多阻塞多少秒。
- exception(timeout=None):獲取該 Future 代表的執行緒任務所引發的異常。如果該任務成功完成,沒有異常,則該方法返回 None。
- add_done_callback(fn):為該 Future 代表的執行緒任務註冊一個“回撥函式”,當該任務成功完成時,程式會自動觸發該 fn 函式。
3. 示例
同步程式碼,以crawler函式模擬爬蟲函式,時間延遲模擬網路IO
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
def crawler():
print('crawl page...')
time.sleep(2)
def main():
start = time.time()
for _ in range(4):
crawler()
end = time.time()
print(f'take {(end - start):2.3f} second')
if __name__ == '__main__':
main()
大概8秒鐘
$ python3 demo00.py
crawl page...
crawl page...
crawl page...
crawl page...
take 8.007 second
3.1 示例1:建立執行緒池
看一下使用執行緒池
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def crawler():
print(f'{threading.current_thread().name} crawl page...')
time.sleep(2)
def main():
start = time.time()
# 建立執行緒池,最大執行緒數為4,執行緒名稱字首為 crawler(可選)
pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
for _ in range(4):
pool.submit(crawler)
pool.shutdown()
end = time.time()
print(f'take {(end - start):2.3f} second')
if __name__ == '__main__':
main()
$ python3 demo01.py
crawler_0 crawl page...
crawler_1 crawl page...
crawler_2 crawl page...
crawler_3 crawl page...
take 2.003 second
3.2 示例2:傳引數
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def crawler(page_num, index):
print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
time.sleep(2)
def main():
start = time.time()
pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
for page_num in range(1, 5):
# 可傳多個引數
pool.submit(crawler, page_num, page_num - 1)
pool.shutdown()
end = time.time()
print(f'take {(end - start):2.3f} second')
if __name__ == '__main__':
main()
$ python3 demo02.py
crawler_0 crawl page 1, index: 0crawler_1 crawl page 2, index: 1
crawler_2 crawl page 3, index: 2
crawler_3 crawl page 4, index: 3
take 2.003 second
3.3 示例3:獲取返回值
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def crawler(page_num, index):
print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
time.sleep(2)
return f'page {page_num} finished'
def main():
start = time.time()
pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
for page_num in range(1, 5):
future = pool.submit(crawler, page_num, page_num - 1)
# 列印返回結果
print(future.result())
pool.shutdown()
end = time.time()
print(f'take {(end - start):2.3f} second')
if __name__ == '__main__':
main()
執行一下發現???變成同步了。
$ python3 demo03.py
crawl page 1, index: 0
page 1 finished
crawl page 2, index: 1
page 2 finished
crawl page 3, index: 2
page 3 finished
crawl page 4, index: 3
page 4 finished
take 8.009 second
原因是future.result() 會阻塞當前的主執行緒,只有等它執行完了,才會繼續執行下一個submit。這麼坑?
可是我就是要獲取它的返回值,怎麼解決?
還記得前面的add_done_callback(fn)
方法嗎,傳入的函式fn相當於給此執行緒註冊一個回撥函式
,當執行緒結束後自動呼叫,不會造成阻塞。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def crawler(page_num, index):
print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
time.sleep(2)
return f'page {page_num} finished'
def get_result(future):
print(future.result())
def main():
start = time.time()
pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
for page_num in range(1, 5):
future = pool.submit(crawler, page_num, page_num - 1)
# print(future.result())
# 註冊回撥函式,當執行緒執行完畢列印返回結果
future.add_done_callback(get_result)
pool.shutdown()
end = time.time()
print(f'take {(end - start):2.3f} second')
if __name__ == '__main__':
main()
$ python3 demo03.py
crawler_0 crawl page 1, index: 0
crawler_1 crawl page 2, index: 1
crawler_2 crawl page 3, index: 2
crawler_3 crawl page 4, index: 3
page 1 finished
page 4 finishedpage 2 finished
page 3 finished
take 2.003 second
3.4 示例4:快速建立執行緒
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor
l1 = [1, 2, 3, 4]
l2 = ['a', 'b', 'c', 'd']
def crawler(page_num, index):
print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
time.sleep(2)
return f'page {page_num} finished'
def main():
start = time.time()
pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
# 使用map函式快速建立執行緒,類似於python內建的map函式
futures = pool.map(crawler, l1, l2)
pool.shutdown()
# 返回值是一個生成器型別
print(type(futures))
print(list(futures))
end = time.time()
print(f'take {(end - start):2.3f} second')
if __name__ == '__main__':
main()
$ python3 demo04.py
crawler_0 crawl page 1, index: a
crawler_1 crawl page 2, index: b
crawler_2 crawl page 3, index: c
crawler_3 crawl page 4, index: d
<class 'generator'>
['page 1 finished', 'page 2 finished', 'page 3 finished', 'page 4 finished']
take 2.003 second
注意:建立現成的說法並不準確,因為執行緒在建立執行緒池的時候就產生了,只是在等待執行任務