1. 程式人生 > >python--線程池(concurrent.futures)

python--線程池(concurrent.futures)

time pass ever 表示 iterator may 檢測 列表 多參數

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 # author:love_cat
 4 
 5 # 為什麽需要線程池
 6 # 1.主線程中可以獲取某一個線程的狀態或者某一個任務的狀態,以及返回值
 7 # 2.當一個線程完成時,主線程能夠立即知道
 8 # 3.futures可以讓多線程和多進程編碼接口一致
 9 
10 # 導入相應的模塊
11 from concurrent.futures import ThreadPoolExecutor
12 import time
13 
14 
15 def get_sleep(name, t):
16 time.sleep(t) 17 print(f"{name}睡了{t}秒") 18 19 20 # 創建一個一定容量的線程池 21 # max_workers=3表示池子裏最多可以容納三個線程 22 executor = ThreadPoolExecutor(max_workers=3) 23 24 # 往池子裏添加任務 25 # 第一個是函數名,註意不要加括號,否則變成調用了 26 # 然後依次寫參數 27 executor.submit(get_sleep, "satori", 4) 28 executor.submit(get_sleep, "mashiro"
, 3) 29 executor.submit(get_sleep, "miku", 2) 30 31 # 註意:submit不會阻塞,submit相當於開啟了一個線程 32 # 然後主線程會立即往下執行 33 print("i love satori") # 因此這句話會最先被打印出來 34 35 36 # 程序運行結果 37 ‘‘‘ 38 i love satori 39 miku睡了2秒 40 mashiro睡了3秒 41 satori睡了4秒 42 ‘‘‘
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 # author:love_cat
4 5 from concurrent.futures import ThreadPoolExecutor 6 import time 7 8 9 def get_sleep(name, t): 10 time.sleep(t) 11 print(f"{name}睡了{t}秒") 12 return f"{name}--{t}秒" 13 14 15 executor = ThreadPoolExecutor(max_workers=3) 16 17 # 我們submit函數是具有返回值的,比方說我們賦值為task 18 # 那麽task1,task2,task3可以獲取對應線程的執行狀態 19 task1 = executor.submit(get_sleep, "satori", 4) 20 task2 = executor.submit(get_sleep, "mashiro", 3) 21 task3 = executor.submit(get_sleep, "miku", 2) 22 23 24 # task.done()表示任務是否完成 25 print(task1.done()) 26 print(task2.done()) 27 print(task3.done()) 28 29 # 我們等五秒,因此上面的任務肯定已經全部執行完畢 30 # 再打印狀態 31 print("-------等待五秒鐘-------") 32 time.sleep(5) 33 print(task1.done()) 34 print(task2.done()) 35 print(task3.done()) 36 37 # 當然我們也可以獲取任務的返回值 38 print(task1.result()) 39 print(task2.result()) 40 print(task3.result()) 41 42 # 程序運行結果 43 ‘‘‘ 44 False 45 False 46 False 47 -------等待五秒鐘------- 48 miku睡了2秒 49 mashiro睡了3秒 50 satori睡了4秒 51 True 52 True 53 True 54 satori--4秒 55 mashiro--3秒 56 miku--2秒 57 ‘‘‘ 58 # 首先主線程在添加完任務之後,會立刻執行task.done(),此時三個任務還沒有執行完畢,因此打印三個False 59 # 打印等待五秒鐘 60 # 主線程等待五秒鐘之後,三個任務已經執行完畢,並且會打印各自的內容。 61 # 執行task.done(),由於此時三個任務執行完畢,因此打印三個True 62 # 然後通過task.result()會得到任務的返回值
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 # author:love_cat
 4 
 5 # 關於task.result()方法,這個方法是阻塞的
 6 # 只有在獲取到函數的返回值之後才會執行,那麽此時任務也已經執行完畢
 7 
 8 from concurrent.futures import ThreadPoolExecutor
 9 import time
10 
11 
12 def get_sleep(t):
13     print(f"{t}")
14     time.sleep(t)
15     return f"我睡了{t}秒"
16 
17 
18 executor = ThreadPoolExecutor(max_workers=3)
19 
20 task1 = executor.submit(get_sleep, 3)
21 task2 = executor.submit(get_sleep, 4)
22 task3 = executor.submit(get_sleep, 1)
23 
24 task2.result()
25 print("主線程執行完畢··········")
26 
27 ‘‘‘
28 3
29 4
30 1
31 主線程執行完畢··········
32 ‘‘‘
33 # 可以看到先打印3,4,1,然後等待大概四秒鐘,打印"主線程執行完畢··········"
34 # task.result()方法是會阻塞的,其實也很好理解,task.result()是為了獲取任務的返回值,如果任務都還沒有執行完,那麽當然會卡住

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 一般我們要確保所有的任務都執行完畢,才選擇讓主線程往下走
from concurrent.futures import ThreadPoolExecutor
import time


def get_sleep(t):
    print(f"{t}")
    time.sleep(t)
    return f"我睡了{t}秒"


executor = ThreadPoolExecutor(max_workers=3)

task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)

# 將所有的task添加到一個列表中
all_task = [task1, task2, task3]

for task in all_task:
    print(task.result())

print("主線程執行完畢···")

‘‘‘
3
4
1
我睡了3秒
我睡了4秒
我睡了1秒
主線程執行完畢···
‘‘‘

# 首先打印3,4,1很好理解,但是為什麽先是"我睡了3秒",難道不應該是"我睡了1秒"嗎?
# 關於task.result()的返回順序問題,是按照添加任務的順序返回的
# 先執行的是task1.result(),所以必須要先等待三秒
# 再執行task2.result(),由於已經過了三秒,因此再等待一秒
# 最後執行task3.result(),此時task3早已經執行完畢,然後直接打印"我睡了1秒"
# 因此task.result()的返回順序,是由任務的添加順序決定的
# 比如,我換一種方式添加

all_task = [task3, task2, task1]
for task in all_task:
    print(task.result())

‘‘‘
我睡了1秒
我睡了4秒
我睡了3秒
‘‘‘
# 返回結果也驗證了我們上述的判斷

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 如何取消一個任務,註意任務一旦開始執行那麽便不能被取消了

from concurrent.futures import ThreadPoolExecutor
import time


def get_sleep(t):
    print(f"{t}")
    time.sleep(t)
    return f"我睡了{t}秒"


executor = ThreadPoolExecutor(max_workers=3)

task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)

# task.cancel()表示取消一個函數,返回值是一個布爾類型。
# 通過True或者False,來判斷是否取消成功
print(task3.cancel())
‘‘‘
3
4
1
False
‘‘‘
# 可以看到返回結果為False,因為程序已經執行了
#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 取消任務,只有任務在還沒有執行的時候

from concurrent.futures import ThreadPoolExecutor
import time


def get_sleep(t):
    print(f"{t}")
    time.sleep(t)
    return f"我睡了{t}秒"


# 這次我們只添加兩個任務,首先task3肯定被添加到了任務隊列裏
# 但由於最大工作數是2,因此暫時是不會執行的,只有當某個其他的任務執行完畢,才會被執行
# 那麽此時就可以取消了,因為task3還沒有被執行
executor = ThreadPoolExecutor(max_workers=2)

task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)

print(task3.cancel())
‘‘‘
3
4
True
‘‘‘
# 可以看到返回結果True,說明取消成功了,而且task3的任務也沒有執行

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 如何獲取已完成的任務的返回值
# 這裏使用as_complete函數
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def get_sleep(name, t):
    time.sleep(t)
    return f"{name}睡了{t}秒"


executor = ThreadPoolExecutor(max_workers=3)

task1 = executor.submit(get_sleep, "mashiro", 3)
task2 = executor.submit(get_sleep, "satori", 4)
task3 = executor.submit(get_sleep, "miku", 1)
all_task = [task1, task2, task3]

# 會異步檢測有哪些任務完成
# 既然這樣的話,那麽我們可以推測,返回值應該和添加順序無關,哪個先完成,哪個就先被返回
for task in as_completed(all_task):
    print(task.result())

‘‘‘
miku睡了1秒
mashiro睡了3秒
satori睡了4秒
‘‘‘

# 因此我們分析的是正確的,as_complete函數會不斷地檢測有哪些任務完成
# 那麽as_complete是如何做到這一點的呢?
# 我們可以看一下源碼

# as_complete源碼
‘‘‘
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            ‘%d (of %d) futures unfinished‘ % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
‘‘‘

# 可以看到函數當中出現了yield from,說明這個as_complete是一個生成器
# yield from 出現了兩次
# 第一次:有可能主線程當中遇到了阻塞,在執行到as_complete函數的時候,已經有一部分任務執行完畢,所以第一個yield from將那些已經結束的任務yield出去
# 第二次:然後對於那些沒有完成的任務,就不斷地循環檢測它們的狀態,如果有完成的任務,那麽繼續yield出去

# 因此異步的特點就在於此,當任務完成之後,我們主線程會立刻感受到,從而獲取已經完成的任務的返回值

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 我們也可以通過executor獲取返回結果
# 可以用到executor下的map方法

from concurrent.futures import ThreadPoolExecutor
import time


def get_sleep(t):
    time.sleep(t)
    return f"睡了{t}秒"


executor = ThreadPoolExecutor(max_workers=3)

# 首先這裏的map和python內置的map是比較類似的
# map裏面傳入函數,後面是任務的參數,作為一個可叠代對象
# 這樣就可以自動執行了,不需要submit,map函數是位於executor下的一個方法

for data in executor.map(get_sleep, [3, 4, 1]):
    print(data)

# 程序運行結果
‘‘‘
mashiro睡了3秒
satori睡了4秒
miku睡了1秒
‘‘‘
# 可以看到返回值的順序和你添加任務的順序保持一致
# map函數不像as_complete一樣,具有異步檢測機制,可以先返回已經完成的任務

# 並且也不用再使用result()方法了,返回的直接就是任務的返回值
# 至於原因我們也可以看一下源碼
‘‘‘
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()
‘‘‘
# 可以到在yield的時候,並沒有yield出來task,而是直接將task.result()給yield出來了
# 因此循環得到的就是task的返回值

‘‘‘
fs = [self.submit(fn, *args) for args in zip(*iterables)]
‘‘‘
# 而且從這句源碼我們也可以看到,當調用map的時候,本質上調用的還是executor.submit,因為self就相當於我們這裏的executor
#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 關於map方法,再補充一點
# 有個地方,個人覺得非常惡心(只是本人這樣覺得)

from concurrent.futures import ThreadPoolExecutor
import time


# 之前的例子是一個參數,那麽如果改成兩個,map函數那裏如何傳參呢?
def get_sleep(name, t):
    time.sleep(t)
    return f"{name}睡了{t}秒"


executor = ThreadPoolExecutor(max_workers=3)

‘‘‘
def map(self, fn, *iterables, timeout=None, chunksize=1):
‘‘‘
# 通過源碼,得到map函數定義如下。可以看到接收很多參數,但參數都是可叠代的類型
# 於是我們想到了 executor.map(get_sleep,["mashiro",3],["satori",4],["miku",1])
# 但是這樣的話就完了。我們可以看看源碼對我們傳入的參數是怎麽處理的
‘‘‘
fs = [self.submit(fn, *args) for args in zip(*iterables)]
‘‘‘
# 可以看到源碼幹了這麽一件事
# 如果我們像之前那樣傳參的話,舉個栗子
‘‘‘
def mmp(*iterable):
    for args in zip(*iterable):
        print(*args)

mmp(["mashiro", 3], ["satori", 4], ["miku", 1])

    
‘mashiro‘, ‘satori‘, ‘miku‘
3, 4, 1    
‘‘‘
# 那麽此時self.submit(fn,*args) ===>self.submit(fn, "mashiro", "satori", "miku")
# 這與我們傳參完全搞反了
# 因此,當我們要傳入多個參數的是,應該這樣傳,executor.map(get_sleep,["mashiro","satori","miku"],[3,4,1])

for data in executor.map(get_sleep, ["mashiro", "satori", "miku"], [3, 4, 1]):
    print(data)
# 程序運行結果
‘‘‘
mashiro睡了3秒
satori睡了4秒
miku睡了1秒
‘‘‘
# 這樣程序便可正確執行

# 關於傳參的方式,我個人的話可能會這麽設計
‘‘‘
def mmp(*iterable):
    for args in iterable:
        print(*args)

mmp(["mashiro", 3], ["satori", 4], ["miku", 1])
輸出結果:
    mashiro 3
    satori 4
    miku 1
‘‘‘

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
# author:love_cat

# 讓主線程等待
# 可以使用wait方法

from concurrent.futures import ThreadPoolExecutor, wait
import time


def get_sleep(name, t):
    print(f"{name}睡完{t}秒了,主線程你走了嗎?")
    time.sleep(t)
    return f"{name}睡了{t}秒"


executor = ThreadPoolExecutor(max_workers=3)

task1 = executor.submit(get_sleep, "mashiro", 3)
task2 = executor.submit(get_sleep, "satori", 4)
task3 = executor.submit(get_sleep, "miku", 1)
all_task = [task1, task2, task3]

# wait(all_task),會使主線程卡在這裏,只有等待所有任務完成才會往下走
wait(all_task)   # 裏面有個return_when參數,默認是ALL_COMPLETE,會等待所有任務完成。也可以指定FIRST_COMPLETE,等待第一個任務完成就往下走
print("你們都睡完了,我才能往下走···")
# 程序運行結果
‘‘‘
mashiro睡完3秒了,主線程你走了嗎?
satori睡完4秒了,主線程你走了嗎?
miku睡完1秒了,主線程你走了嗎?
你們都睡完了,我才能往下走···
‘‘‘

python--線程池(concurrent.futures)