Python concurrent.futures模組使用例項
阿新 • • 發佈:2020-01-09
這篇文章主要介紹了Python concurrent.futures模組使用例項,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
concurrent.futures的作用:
管理併發任務池。concurrent.futures模組提供了使用工作執行緒或程序池執行任務的介面。執行緒和程序池API都是一樣,所以應用只做最小的修改就可以線上程和程序之間地切換
1、基於執行緒池使用map()
futures_thread_pool_map.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import threading import time def task(n): print('{}: 睡眠 {}'.format(threading.current_thread().name,n)) time.sleep(n / 10) print('{}: 執行完成 {}'.format(threading.current_thread().name,n)) return n / 10 ex = futures.ThreadPoolExecutor(max_workers=2) print('main: 開始執行') results = ex.map(task,range(5,-1)) #返回值是generator 生成器 print('main: 未處理的結果 {}'.format(results)) print('main: 等待真實結果') real_results = list(results) print('main: 最終結果: {}'.format(real_results))
執行效果
[root@ mnt]# python3 futures_thread_pool_map.py main: 開始執行 ThreadPoolExecutor-0_0: 睡眠 5 ThreadPoolExecutor-0_1: 睡眠 4 main: 未處理的結果 <generator object Executor.map.<locals>.result_iterator at 0x7f1c97484678> main: 等待真實結果 ThreadPoolExecutor-0_1: 執行完成 4 ThreadPoolExecutor-0_1: 睡眠 3 ThreadPoolExecutor-0_0: 執行完成 5 ThreadPoolExecutor-0_0: 睡眠 2 ThreadPoolExecutor-0_0: 執行完成 2 ThreadPoolExecutor-0_0: 睡眠 1 ThreadPoolExecutor-0_1: 執行完成 3 ThreadPoolExecutor-0_0: 執行完成 1 main: 最終結果: [0.5,0.4,0.3,0.2,0.1]
2、futures執行單個任務
futures_thread_pool_submit.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import threading import time def task(n): print('{}: 睡眠 {}'.format(threading.current_thread().name,n)) return n / 10 ex = futures.ThreadPoolExecutor(max_workers=2) print('main :開始') f = ex.submit(task,5) print('main: future: {}'.format(f)) print('等待執行結果') results = f.result() print('main: result:{}'.format(results)) print('main: future 之後的結果:{}'.format(f))
執行效果
[root@ mnt]# python3 futures_thread_pool_submit.py main :開始 ThreadPoolExecutor-0_0: 睡眠 5 main: future: <Future at 0x7f40c0a6a400 state=running> 等待執行結果 ThreadPoolExecutor-0_0: 執行完成 5 main: result:0.5 main: future 之後的結果:<Future at 0x7f40c0a6a400 state=finished returned float>
3、futures.as_completed()按任意順序執行結果
futures_as_completed.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import random import time from concurrent import futures def task(n): time.sleep(random.random()) return (n,n / 10) ex = futures.ThreadPoolExecutor(max_workers=2) print('main: 開始') wait_for = [ ex.submit(task,i) for i in range(5,-1) ] for f in futures.as_completed(wait_for): print('main: result:{}'.format(f.result()))
執行效果
[root@ mnt]# python3 futures_as_completed.py main: 開始 main: result:(5,0.5) main: result:(4,0.4) main: result:(3,0.3) main: result:(1,0.1) main: result:(2,0.2)
4、Future回撥之futures.add_done_callback()
futures_future_callback.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import time def task(n): print('task {} : 睡眠'.format(n)) time.sleep(0.5) print('task {} : 完成'.format(n)) return n / 10 def done(fn): if fn.cancelled(): print('done {}:取消'.format(fn.arg)) elif fn.done(): error = fn.exception() if error: print('done {} : 錯誤返回 : {}'.format(fn.arg,error)) else: result = fn.result() print('done {} : 正常返回 : {}'.format(fn.arg,result)) if __name__ == '__main__': ex = futures.ThreadPoolExecutor(max_workers=2) print('main : 開始') f = ex.submit(task,5) f.arg = 5 f.add_done_callback(done) result = f.result()
執行效果
[root@ mnt]# python3 futures_future_callback.py main : 開始 task 5 : 睡眠 task 5 : 完成 done 5 : 正常返回 : 0.5
5、Future任務取消之futures.cancel()
futures_future_callback_cancel.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import time def task(n): print('task {} : 睡眠'.format(n)) time.sleep(0.5) print('task {} : 完成'.format(n)) return n / 10 def done(fn): if fn.cancelled(): print('done {}:取消'.format(fn.arg)) elif fn.done(): error = fn.exception() if error: print('done {} : 錯誤返回 : {}'.format(fn.arg,result)) if __name__ == '__main__': ex = futures.ThreadPoolExecutor(max_workers=2) print('main : 開始') tasks = [] for i in range(10,-1): print('main: submitting {}'.format(i)) f = ex.submit(task,i) f.arg = i f.add_done_callback(done) tasks.append((i,f)) for i,task_obj in reversed(tasks): if not task_obj.cancel(): print('main: 不能取消{}'.format(i)) ex.shutdown()
執行效果
[root@mnt]# python3 futures_future_callback_cancel.py main : 開始 main: submitting 10 task 10 : 睡眠 main: submitting 9 task 9 : 睡眠 main: submitting 8 main: submitting 7 main: submitting 6 main: submitting 5 main: submitting 4 main: submitting 3 main: submitting 2 main: submitting 1 done 1:取消 done 2:取消 done 3:取消 done 4:取消 done 5:取消 done 6:取消 done 7:取消 done 8:取消 main: 不能取消9 main: 不能取消10 task 10 : 完成 done 10 : 正常返回 : 1.0 task 9 : 完成 done 9 : 正常返回 : 0.9
6、Future異常的處理
futures_future_exception
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures def task(n): print('{} : 開始'.format(n)) raise ValueError('這個值不太好 {}'.format(n)) ex = futures.ThreadPoolExecutor(max_workers=2) print('main: 開始...') f = ex.submit(task,5) error = f.exception() print('main: error:{}'.format(error)) try: result = f.result() except ValueError as e: print('訪問結果值的異常 {}'.format(e))
執行效果
[root@mnt]# python3 futures_future_exception.py main: 開始... 5 : 開始 main: error:這個值不太好 5 訪問結果值的異常 這個值不太好 5
7、Future上下文管理即利用with開啟futures.ThreadPoolExecutor()
futures_context_manager.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures def task(n): print(n) with futures.ThreadPoolExecutor(max_workers=2) as ex: print('main: 開始') ex.submit(task,1) ex.submit(task,2) ex.submit(task,3) ex.submit(task,4) print('main: 結束')
執行效果
[root@ mnt]# python3 futures_context_manager.py main: 開始 2 4 main: 結束
8、基於程序池使用map()
futures_process_pool_map.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import os def task(n): return (n,os.getpid()) if __name__ == '__main__': ex = futures.ProcessPoolExecutor(max_workers=2) results = ex.map(task,range(50,-1)) for n,pid in results: print('task {} in 程序id {}'.format(n,pid))
執行效果
[root@ mnt]# python3 futures_process_pool_map.py task 5 in 程序id 9192 task 4 in 程序id 8668 task 3 in 程序id 9192 task 2 in 程序id 8668 task 1 in 程序id 9192
9、基於程序池異常處理
futures_process_pool_broken.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from concurrent import futures import os import signal def task(n): return (n,os.getpid()) if __name__ == '__main__': with futures.ProcessPoolExecutor(max_workers=2) as ex: print('獲取工作程序的id') f1 = ex.submit(os.getpid) pid1 = f1.result() print('結束程序 {}'.format(pid1)) os.kill(pid1,signal.SIGHUP) print('提交其它程序') f2 = ex.submit(os.getpid) try: pid2 = f2.result() except futures.process.BrokenProcessPool as e: print('不能開始新的任務:{}'.format(e))
執行效果
[root@ mnt]# python3 futures_process_pool_broken.py 獲取工作程序的id 結束程序 104623 提交其它程序 不能開始新的任務:A process in the process pool was terminated abruptly while the future was running or pending.
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。