
python - thread pool


********Thread pool********


Python standard module -- concurrent.futures

1. Introduction

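The concurrent.futures module provides a high-level interface for executing calls asynchronously:
ThreadPoolExecutor: a thread pool, for asynchronous calls
ProcessPoolExecutor: a process pool, for asynchronous calls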

Both implement the same interface, which is defined by the abstract Executor class.
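Because both executors share that interface, code written against Executor can switch between threads and processes by changing only the class it instantiates. The following is a minimal sketch. It assumes a hypothetical helper `run_squares()` and a top-level `square()` function, which is kept at module level so that ProcessPoolExecutor can pickle it. The sketch uses `submit`/`result` and `map`, and relies on the `with` block to call `shutdown(wait=True)` on exit.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, Iterable, List, Tuple


def square(n: int) -> int:
    # Top-level function: picklable, so ProcessPoolExecutor can ship it to workers.
    return n * n


def run_squares(executor_cls: Callable[..., Executor],
                numbers: Iterable[int]) -> Tuple[List[int], List[int]]:
    """Hypothetical helper: square `numbers` with any Executor subclass."""
    numbers = list(numbers)  # iterated twice below, so materialize it once
    with executor_cls(max_workers=4) as executor:
        # submit() returns a Future at once; result() blocks until that call is done.
        futures = [executor.submit(square, n) for n in numbers]
        via_submit = [f.result() for f in futures]
        # map() replaces the submit loop and yields results in input order.
        via_map = list(executor.map(square, numbers))
    # Leaving the with block called executor.shutdown(wait=True).
    return via_submit, via_map


if __name__ == "__main__":
    print(run_squares(ThreadPoolExecutor, range(5)))
    # prints ([0, 1, 4, 9, 16], [0, 1, 4, 9, 16])
    print(run_squares(ProcessPoolExecutor, range(5)))  # same output, using processes
```

The only difference between the two calls is the class passed in. The process-based call still needs the `if __name__ == '__main__':` guard, as in the examples below.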

2. Basic methods

#submit(fn, *args, **kwargs)
Submit a task asynchronously; returns a Future object.

#map(func, *iterables, timeout=None, chunksize=1) 
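Replaces a for loop of submit calls; returns an iterator over the results, in the same order as the inputs.

#shutdown(wait=True)
Equivalent to pool.close() + pool.join() on a multiprocessing process pool.
wait=True: wait until every task in the pool has finished and its resources have been released before continuing.
wait=False: return immediately, without waiting for the tasks in the pool to finish.
Whatever the value of wait, the program as a whole still waits for all tasks to finish.
submit and map must be called before shutdown (calling them afterwards raises RuntimeError).

#result(timeout=None)
Get the result of the call, waiting up to timeout seconds (a TimeoutError is raised if it is still not done).

#add_done_callback(fn)
Register a callback: fn is called with the future as its only argument once the future finishes.

****ProcessPoolExecutor****

# Introduction

'''
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.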
'''

# Usage

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import os, time, random

    def task(n):
        print('%s is running' % os.getpid())
        time.sleep(random.randint(1, 3))
        return n ** 2

    if __name__ == '__main__':
        executor = ProcessPoolExecutor(os.cpu_count() + 1)
        futures = []
        for i in range(11):
            future = executor.submit(task, i)
            futures.append(future)
        executor.shutdown(True)
        print('+++>')
        for future in futures:
            print(future.result())

****ThreadPoolExecutor****

# Introduction

'''
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
'''

Note: since Python 3.8 the default max_workers is min(32, os.cpu_count() + 4).

# Usage
# Same as ProcessPoolExecutor

****map****

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import os, time, random

    def task(n):
        print('%s is running' % os.getpid())
        time.sleep(random.randint(1, 3))
        return n ** 2

    if __name__ == '__main__':
        executor = ThreadPoolExecutor(os.cpu_count() * 5)
        # for i in range(1, 42):
        #     future = executor.submit(task, i)
        executor.map(task, range(1, 42))  # map replaces for + submit

****Callback functions****

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import os

    def get_page(url):
        print('<process %s> get %s' % (os.getpid(), url))
        response = requests.get(url)
        if response.status_code == 200:
            return {'url': url, 'text': response.text}

    def parse_page(res):
        # res is a Future; result() returns get_page's return value
        res = res.result()
        if res is None:  # get_page returns None when status_code != 200
            return
        print('<process %s> parse %s' % (os.getpid(), res['url']))
        parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
        with open('db.txt', 'a') as f:
            f.write(parse_res)

    if __name__ == '__main__':
        urls = [
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]

        # Equivalent with multiprocessing.Pool; there the callback receives the
        # return value directly, so parse_page would not call .result():
        # p = Pool(3)
        # for url in urls:
        #     p.apply_async(get_page, args=(url,), callback=parse_page)
        # p.close()
        # p.join()

        p = ProcessPoolExecutor(3)
        for url in urls:
            # parse_page receives a Future object obj; call obj.result() to get the result
            p.submit(get_page, url).add_done_callback(parse_page)
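The callback example above needs network access through requests. The following is a minimal offline sketch of the same add_done_callback mechanics. It assumes a hypothetical `fake_fetch()` in place of `get_page()`, a hypothetical `crawl()` driver, and an in-memory list instead of db.txt. The callback receives the finished Future and calls `result()` on it. It skips failed fetches instead of crashing.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Optional


def fake_fetch(url: str) -> Optional[dict]:
    """Hypothetical stand-in for get_page(): no network; None mimics a non-200 reply."""
    if "bad" in url:
        return None
    return {"url": url, "text": "x" * len(url)}


def crawl(urls: List[str]) -> List[str]:
    """Hypothetical driver: fetch concurrently, parse each page in a done-callback."""
    parsed: List[str] = []   # stand-in for db.txt
    lock = threading.Lock()  # callbacks may run in several worker threads

    def parse_page(future: Future) -> None:
        # The callback receives the finished Future, not the return value.
        res = future.result()
        if res is None:  # skip failed fetches instead of raising TypeError
            return
        with lock:
            parsed.append("url:<%s> size:[%s]" % (res["url"], len(res["text"])))

    with ThreadPoolExecutor(3) as pool:
        for url in urls:
            pool.submit(fake_fetch, url).add_done_callback(parse_page)
    # The with block waited for every task (and its callback) to finish.
    return sorted(parsed)


if __name__ == "__main__":
    for line in crawl(["https://a.example", "https://bad.example", "https://bb.example"]):
        print(line)
```

With ProcessPoolExecutor, as in the original example, get_page runs in a worker process but the callback runs back in the parent process. This is why the two os.getpid() prints show different process IDs.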
