1. 程式人生 > >多進程 multiprocessing 多線程Threading 線程池和進程池concurrent.futures

多進程 multiprocessing 多線程Threading 線程池和進程池concurrent.futures

不用 文件 進程池 lba ren 行操作 接收參數 出現 ali

multiprocessing.procsess
定義一個函數
def func():pass
在if __name__=="__main__":中實例化
p = process(target=子進程要執行的函數,args(函數的參數且必須以元組的方式傳參))
p.start()    開啟子進程
p.join()    感知子進程的結束,主進程等待子進程執行完後才退出
p.terminate()    結束一個子進程
p.is_alive()    查看某個進程是否還在運行

屬性
p.name    p.pid    進程名和進程號
p.deamon    守護進程,主進程的代碼執行完,子進程也隨即結束,在子進程開始執行前設置
用類開啟子進程的方式,必須有run()
Myprocsess(procsess):
    
def __init__(name,*args,**kwargs):pass def run():pass p = Myprocsess(name) **在子進程中不要出現input # lock semaphore event queue joinableQueue 對象在主進程創建 multiprocessing.Lock 實例化 鎖 對數據的一種保護方式,一次只能一個進程對數據進行操作 l = Lock() l.acquire() 拿鑰匙 加鎖,其他進程不能訪問 l.release() 還鑰匙 釋放鎖,釋放了之後,其他進程才能拿鎖對數據進行操作 其余進程阻塞等待 multiprocessing.semaphore 實例化信號量,傳入同時允許訪問的進程的數量 sem
= semaphore(4) sem.acquire() 獲取鑰匙,同時允許4個進程訪問 sem.release() 還鑰匙,讓其他進程得以訪問 multiprocessing.Event 通過一個信號來控制多個進程,一個信號可以讓所有的進程都阻塞,也可以解除所有進程的阻塞 e = Event() e.is_set() 查看一個事件的狀態,默認被設置成False阻塞,True為不阻塞 e.set() 修改一個事件的狀態為Ture e.clear() 修改一個事件的狀態為Flase e.wait() 是依據事件的狀態來決定是否阻塞,Flase阻塞,True不阻塞 multiprocessing.Queue 隊列 先進先出 q
= Queue(隊列數) q.put() 放一個進隊列 q.get() 從隊列中取一個出來 q.full() 隊列是否滿了 q.empty() 隊列是否空了 multiprocessing.JoinableQueue q = JoinableQueue(20) q.put() 放入隊列,count +1 q.join() # 阻塞 直到一個隊列中的所有數據 全部被處理完畢 q.get() 取消息 q.task_done() # count -1 在處理完get到的消息後,處理完畢後告訴隊列 # 隊列中只能存放實例化時設置的參數的個數,隊列未滿才會繼續put # 使用JoinableQueue,get()後用.task_done(),這樣隊列put端put後的join將在最後一次調用task_done後不阻塞 multiprocessing.Pipe con,pro = Pipe() # 數據不安全性,加鎖 process(func,args=(con,pro)) con.recv() con.close() pro.send(f) pro.close() # con,pro不用send或recv就close,只留一個recv取完數據將會引發EOFError異常 multiprocessing.Manager m = Manager() dic=m.dict({count:100}) p = Process(target=func,args=(dic,l)) # manager數據間的共享,不同子進程對數據修改,主進程可拿值,不安全,加鎖 multiprocessing.Pool p = Pool(5) p.map(func,iterable) # 默認的異步執行任務,切自帶close和join功能 p.Pool(5) p.apply_async(func,args=(i,)) # 異步調用 和主進程完全異步 p.close() # 結束進程池接收任務 p.join() # 感知進程池中的任務執行結束 res = p.apply_async(func,args=(i,)) # 返回對象 對象.get() # 阻塞等待結果 func對象的計算返回結果 ret = p.map(func,iterable) # 等待所有的進程執行完,將func的計算結果用列表返回 p.apply_async(func,args=(i,),callback=func2) # 回調函數是在主進程執行 # 回調函數,不能接收參數,只能將func的執行結果作為參數傳給回調函數

多線程Threading
# 多線程和多進程的使用類似。
# 進程 是 最小的 內存分配單位
# 線程 是 操作系統調度的最小單位
# 線程直接被CPU執行,進程內至少含有一個線程,也可以開啟多個線程
    # 開啟一個線程所需要的時間要遠遠小於開啟一個進程
    # 多個線程內部有自己的數據棧,數據不共享
    # 全局變量在多個線程之間是共享的
# GIL鎖(即全局解釋器鎖)
# 在Cpython解釋器下的python程序 在同一時刻 多個線程中只能有一個線程被CPU執行
# 高CPU : 計算類 --- 高CPU利用率
# 高IO  : 爬取網頁 200個網頁    # qq聊天 send recv    # 處理日誌文件 讀文件    # 處理web請求    # 讀數據庫 寫數據庫
threading.Thread(target=wahaha,args=(i,)).start()
print(threading.active_count())
print(threading.current_thread())
print(threading.enumerate())

threading.Condition
con = Condition()    # 條件    # 一個條件被創建之初 默認有一個False狀態    # False狀態 會影響wait一直處於等待狀態
con.acquire()
con.wait()         # 等鑰匙
con.notify(num)    # 造鑰匙
con.release()
# 用完即銷毀,notify(數量),條件狀態設置為notify(數量)個True,wait()通過一次,就減少一個True

threading.Timer    # 定時器
t = Timer(5,func).start()   # 非阻塞的
# 只是延時執行func函數,異步開啟線程

import queue
q = queue.Queue()        # 隊列 先進先出
q = queue.LifoQueue()    # 棧 先進後出
q = queue.PriorityQueue() 
q.put((20,a)) 
# 優先級隊列,第一個元組元素是優先級,數字越小越優先,優先級相同,消息按ascll排序

# concurrent.futures 線程池和進程池
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
pool = ThreadPoolExecutor(max_workers=5)    # 設置進程池中的進程個數
pool.map(func,range(20))     # == for i in range(20):submit(func,i)
# 拿不到返回值,參數 *iterables
p = pool.submit(func,i)
# 返回對象,p.result()取值    添加回調函數 .add_done_callback(call_back) ,回調函數接收參數p對象,用p.result()取值
pool.shutdown()  # close+join

from gevent import monkey;monkey.patch_all()
import gevent    # 協程 本質上是一個線程    # print(threading.current_thread().getName())    # 虛擬線程
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()    # gevent.joinall(g_lst)  # for g in g_lst:g.join()
# 進程和線程的任務切換右操作系統完成
# 協程任務之間的切換由程序(代碼)完成,只有遇到協程模塊能識別的IO操作的時候,程序才會進行任務切換,實現並發的效果

多進程 multiprocessing 多線程Threading 線程池和進程池concurrent.futures