1. 程式人生 > 其它 >Python 進階 執行緒池

Python 進階 執行緒池

Python 進階 執行緒池

1. 概述

執行緒池的基類是 concurrent.futures 模組中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutorProcessPoolExecutor,其中 ThreadPoolExecutor 用於建立執行緒池,而 ProcessPoolExecutor 用於建立程序池。

2. 用法

Exectuor 提供瞭如下常用方法:

  • submit(fn, args, **kwargs):將 fn 函式提交給執行緒池。args 代表傳給 fn 函式的引數,kwargs 代表以關鍵字引數的形式為 fn 函式傳入引數。
  • map(func, iterables, timeout=None, chunksize=1):該函式類似於全域性函式 map(func, *iterables),只是該函式將會啟動多個執行緒,以非同步方式立即對 iterables 執行 map 處理。
  • shutdown(wait=True):關閉執行緒池。

submit 方法會返回一個 Future 物件,Future 類主要用於獲取執行緒任務函式的返回值。由於執行緒任務會在新執行緒中以非同步方式執行,因此,執行緒執行的函式相當於一個“將來完成”的任務,所以 Python 使用 Future 來代表。

Future 提供瞭如下方法:

  • cancel():取消該 Future 代表的執行緒任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程式會取消該任務,並返回 True。
  • cancelled():返回 Future 代表的執行緒任務是否被成功取消。
  • running():如果該 Future 代表的執行緒任務正在執行、不可被取消,該方法返回 True。
  • done():如果該 Funture 代表的執行緒任務被成功取消或執行完成,則該方法返回 True。
  • result(timeout=None):獲取該 Future 代表的執行緒任務最後返回的結果。如果 Future 代表的執行緒任務還未完成,該方法將會阻塞當前執行緒,其中 timeout 引數指定最多阻塞多少秒。
  • exception(timeout=None):獲取該 Future 代表的執行緒任務所引發的異常。如果該任務成功完成,沒有異常,則該方法返回 None。
  • add_done_callback(fn):為該 Future 代表的執行緒任務註冊一個“回撥函式”,當該任務成功完成時,程式會自動觸發該 fn 函式。

3. 示例

同步程式碼,以crawler函式模擬爬蟲函式,時間延遲模擬網路IO

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time


def crawler():
    print('crawl page...')
    time.sleep(2)


def main():
    start = time.time()
    for _ in range(4):
        crawler()
    end = time.time()
    print(f'take {(end - start):2.3f} second')


if __name__ == '__main__':
    main()

大概8秒鐘

$ python3 demo00.py
crawl page...
crawl page...
crawl page...
crawl page...
take 8.007 second

3.1 示例1:建立執行緒池

看一下使用執行緒池

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def crawler():
    print(f'{threading.current_thread().name} crawl page...')
    time.sleep(2)


def main():
    start = time.time()
    # 建立執行緒池,最大執行緒數為4,執行緒名稱字首為 crawler(可選)
    pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
    for _ in range(4):
        pool.submit(crawler)
    pool.shutdown()
    end = time.time()
    print(f'take {(end - start):2.3f} second')


if __name__ == '__main__':
    main()

$ python3 demo01.py
crawler_0 crawl page...
crawler_1 crawl page...
crawler_2 crawl page...
crawler_3 crawl page...
take 2.003 second

3.2 示例2:傳引數

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def crawler(page_num, index):
    print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
    time.sleep(2)


def main():
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
    for page_num in range(1, 5):
        # 可傳多個引數
        pool.submit(crawler, page_num, page_num - 1)
    pool.shutdown()
    end = time.time()
    print(f'take {(end - start):2.3f} second')


if __name__ == '__main__':
    main()

$ python3 demo02.py
crawler_0 crawl page 1, index: 0crawler_1 crawl page 2, index: 1

crawler_2 crawl page 3, index: 2
crawler_3 crawl page 4, index: 3
take 2.003 second

3.3 示例3:獲取返回值

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def crawler(page_num, index):
    print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
    time.sleep(2)
    return f'page {page_num} finished'


def main():
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
    for page_num in range(1, 5):
        future = pool.submit(crawler, page_num, page_num - 1)
        # 列印返回結果
        print(future.result())
    pool.shutdown()
    end = time.time()
    print(f'take {(end - start):2.3f} second')


if __name__ == '__main__':
    main()

執行一下發現???變成同步了。

$ python3 demo03.py
crawl page 1, index: 0
page 1 finished
crawl page 2, index: 1
page 2 finished
crawl page 3, index: 2
page 3 finished
crawl page 4, index: 3
page 4 finished
take 8.009 second

原因是future.result() 會阻塞當前的主執行緒,只有等它執行完了,才會繼續執行下一個submit。這麼坑?

可是我就是要獲取它的返回值,怎麼解決?

還記得前面的add_done_callback(fn)方法嗎,傳入的函式fn相當於給此執行緒註冊一個回撥函式,當執行緒結束後自動呼叫,不會造成阻塞。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def crawler(page_num, index):
    print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
    time.sleep(2)
    return f'page {page_num} finished'


def get_result(future):
    print(future.result())


def main():
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
    for page_num in range(1, 5):
        future = pool.submit(crawler, page_num, page_num - 1)
        # print(future.result())
        # 註冊回撥函式,當執行緒執行完畢列印返回結果
        future.add_done_callback(get_result)
    pool.shutdown()
    end = time.time()
    print(f'take {(end - start):2.3f} second')


if __name__ == '__main__':
    main()

$ python3 demo03.py
crawler_0 crawl page 1, index: 0
crawler_1 crawl page 2, index: 1
crawler_2 crawl page 3, index: 2
crawler_3 crawl page 4, index: 3
page 1 finished
page 4 finishedpage 2 finished
page 3 finished

take 2.003 second

3.4 示例4:快速建立執行緒

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from concurrent.futures import ThreadPoolExecutor

l1 = [1, 2, 3, 4]
l2 = ['a', 'b', 'c', 'd']


def crawler(page_num, index):
    print(f'{threading.current_thread().name} crawl page {page_num}, index: {index}')
    time.sleep(2)
    return f'page {page_num} finished'


def main():
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='crawler')
    # 使用map函式快速建立執行緒,類似於python內建的map函式
    futures = pool.map(crawler, l1, l2)
    pool.shutdown()
    # 返回值是一個生成器型別
    print(type(futures))
    print(list(futures))
    end = time.time()
    print(f'take {(end - start):2.3f} second')


if __name__ == '__main__':
    main()

$ python3 demo04.py
crawler_0 crawl page 1, index: a
crawler_1 crawl page 2, index: b
crawler_2 crawl page 3, index: c
crawler_3 crawl page 4, index: d
<class 'generator'>
['page 1 finished', 'page 2 finished', 'page 3 finished', 'page 4 finished']
take 2.003 second

注意:建立現成的說法並不準確,因為執行緒在建立執行緒池的時候就產生了,只是在等待執行任務