多進程 multiprocessing 多線程Threading 線程池和進程池concurrent.futures
阿新 • • 發佈:2018-08-26
不用 文件 進程池 lba ren 行操作 接收參數 出現 ali
multiprocessing.procsess 定義一個函數 def func():pass 在if __name__=="__main__":中實例化 p = process(target=子進程要執行的函數,args(函數的參數且必須以元組的方式傳參)) p.start() 開啟子進程 p.join() 感知子進程的結束,主進程等待子進程執行完後才退出 p.terminate() 結束一個子進程 p.is_alive() 查看某個進程是否還在運行 屬性 p.name p.pid 進程名和進程號 p.deamon 守護進程,主進程的代碼執行完,子進程也隨即結束,在子進程開始執行前設置 用類開啟子進程的方式,必須有run() Myprocsess(procsess):def __init__(name,*args,**kwargs):pass def run():pass p = Myprocsess(‘name‘) **在子進程中不要出現input # lock semaphore event queue joinableQueue 對象在主進程創建 multiprocessing.Lock 實例化 鎖 對數據的一種保護方式,一次只能一個進程對數據進行操作 l = Lock() l.acquire() 拿鑰匙 加鎖,其他進程不能訪問 l.release() 還鑰匙 釋放鎖,釋放了之後,其他進程才能拿鎖對數據進行操作 其余進程阻塞等待 multiprocessing.semaphore 實例化信號量,傳入同時允許訪問的進程的數量 sem= semaphore(4) sem.acquire() 獲取鑰匙,同時允許4個進程訪問 sem.release() 還鑰匙,讓其他進程得以訪問 multiprocessing.Event 通過一個信號來控制多個進程,一個信號可以讓所有的進程都阻塞,也可以解除所有進程的阻塞 e = Event() e.is_set() 查看一個事件的狀態,默認被設置成False阻塞,True為不阻塞 e.set() 修改一個事件的狀態為Ture e.clear() 修改一個事件的狀態為Flase e.wait() 是依據事件的狀態來決定是否阻塞,Flase阻塞,True不阻塞 multiprocessing.Queue 隊列 先進先出 q= Queue(隊列數) q.put() 放一個進隊列 q.get() 從隊列中取一個出來 q.full() 隊列是否滿了 q.empty() 隊列是否空了 multiprocessing.JoinableQueue q = JoinableQueue(20) q.put() 放入隊列,count +1 q.join() # 阻塞 直到一個隊列中的所有數據 全部被處理完畢 q.get() 取消息 q.task_done() # count -1 在處理完get到的消息後,處理完畢後告訴隊列 # 隊列中只能存放實例化時設置的參數的個數,隊列未滿才會繼續put # 使用JoinableQueue,get()後用.task_done(),這樣隊列put端put後的join將在最後一次調用task_done後不阻塞 multiprocessing.Pipe con,pro = Pipe() # 數據不安全性,加鎖 process(func,args=(con,pro)) con.recv() con.close() pro.send(f) pro.close() # con,pro不用send或recv就close,只留一個recv取完數據將會引發EOFError異常 multiprocessing.Manager m = Manager() dic=m.dict({‘count‘:100}) p = Process(target=func,args=(dic,l)) # manager數據間的共享,不同子進程對數據修改,主進程可拿值,不安全,加鎖 multiprocessing.Pool p = Pool(5) p.map(func,iterable) # 默認的異步執行任務,切自帶close和join功能 p.Pool(5) p.apply_async(func,args=(i,)) # 異步調用 和主進程完全異步 p.close() # 結束進程池接收任務 p.join() # 感知進程池中的任務執行結束 res = p.apply_async(func,args=(i,)) # 返回對象 對象.get() # 阻塞等待結果 func對象的計算返回結果 ret = p.map(func,iterable) # 等待所有的進程執行完,將func的計算結果用列表返回 p.apply_async(func,args=(i,),callback=func2) # 回調函數是在主進程執行 # 回調函數,不能接收參數,只能將func的執行結果作為參數傳給回調函數
多線程Threading # 多線程和多進程的使用類似。 # 進程 是 最小的 內存分配單位 # 線程 是 操作系統調度的最小單位 # 線程直接被CPU執行,進程內至少含有一個線程,也可以開啟多個線程 # 開啟一個線程所需要的時間要遠遠小於開啟一個進程 # 多個線程內部有自己的數據棧,數據不共享 # 全局變量在多個線程之間是共享的 # GIL鎖(即全局解釋器鎖) # 在Cpython解釋器下的python程序 在同一時刻 多個線程中只能有一個線程被CPU執行 # 高CPU : 計算類 --- 高CPU利用率 # 高IO : 爬取網頁 200個網頁 # qq聊天 send recv # 處理日誌文件 讀文件 # 處理web請求 # 讀數據庫 寫數據庫 threading.Thread(target=wahaha,args=(i,)).start() print(threading.active_count()) print(threading.current_thread()) print(threading.enumerate()) threading.Condition con = Condition() # 條件 # 一個條件被創建之初 默認有一個False狀態 # False狀態 會影響wait一直處於等待狀態 con.acquire() con.wait() # 等鑰匙 con.notify(num) # 造鑰匙 con.release() # 用完即銷毀,notify(數量),條件狀態設置為notify(數量)個True,wait()通過一次,就減少一個True threading.Timer # 定時器 t = Timer(5,func).start() # 非阻塞的 # 只是延時執行func函數,異步開啟線程
import queue q = queue.Queue() # 隊列 先進先出 q = queue.LifoQueue() # 棧 先進後出 q = queue.PriorityQueue() q.put((20,‘a‘)) # 優先級隊列,第一個元組元素是優先級,數字越小越優先,優先級相同,消息按ascll排序
# concurrent.futures 線程池和進程池 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor pool = ThreadPoolExecutor(max_workers=5) # 設置進程池中的進程個數 pool.map(func,range(20)) # == for i in range(20):submit(func,i) # 拿不到返回值,參數 *iterables p = pool.submit(func,i) # 返回對象,p.result()取值 添加回調函數 .add_done_callback(call_back) ,回調函數接收參數p對象,用p.result()取值 pool.shutdown() # close+join from gevent import monkey;monkey.patch_all() import gevent # 協程 本質上是一個線程 # print(threading.current_thread().getName()) # 虛擬線程 g1 = gevent.spawn(eat) g2 = gevent.spawn(play) g1.join() g2.join() # gevent.joinall(g_lst) # for g in g_lst:g.join() # 進程和線程的任務切換右操作系統完成 # 協程任務之間的切換由程序(代碼)完成,只有遇到協程模塊能識別的IO操作的時候,程序才會進行任務切換,實現並發的效果
多進程 multiprocessing 多線程Threading 線程池和進程池concurrent.futures