Python_多程序
Python 多程序庫 multiprocessing ,支援子程序、通訊、資料共享、執行不同形式的同步
多程序,繞過gil ,實現多核的利用,多程序也是原生程序,由作業系統維護
在pycharm中,可能沒有辦法正常使用multiprocessing.Process,最好是在Linux中執行
Process | 用於建立程序模組 |
Pool | 用於建立管理程序池 |
Queue | 用於程序通訊,資源共享 |
Pipe | 用於管道通訊 |
Manager | 用於資源共享,同步程序 |
1.Process類
Process(group = None,target =None,name=None, args=[ ], kwargs={ })
group | 執行緒組 |
target | 要執行的方法 |
name | 程序名 |
args/kwargs | 要傳入方法的引數 |
process屬性&方法:
authkey | 程序的身份驗證金鑰 |
daemon | 同thread的setDaemon,守護程序 |
exitcode | 程序執行時為None,若為—N,則表示被訊號N結束 |
pid | 程序號 |
name | 程序名 |
is_alive() | 返回程序是否正在執行 |
join([timeout]) | 阻塞到執行緒結束或到timeout值 |
start() | 程序準備就緒,等待CPU排程 |
run() | start()呼叫run方法,如果例項程序時未制定傳入target,start執行預設run()方法。 |
terminate() | 不管任務是否完成,立即停止工作程序 |
多程序的建立:
#!/usr/bin/python
# -*- coding:utf-8 -*-
'''多程序的建立'''
from multiprocessing import Process
import time
def fun(name):
time.sleep(1)
print('hello,%s' % name)
print('----')
if __name__ =='__main__':
for i in range(5): # 程序同步
p = Process(target=fun, args=('Presley',))
p.start()
p.join()
print('結束。')
多程序
程序id :
#!/usr/bin/python3
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
def info(title):
print(title)
print('moudle name :',__name__)
print('parent process id ', os.getppid())
print('process id ', os.getpid())
if __name__ =='__main__':
info('hei. ') # pycharm id和 主程序id
for i in range(3):
p = Process(target=info, args=('Presley',)) # 主程序id 和 info 子程序id
p.start()
p.join()
多程序id
hei.
moudle name : __main__
parent process id 1610
process id 1826
Presley
moudle name : __main__
parent process id 1826
process id 1827
Presley
moudle name : __main__
parent process id 1826
process id 1828
Presley
moudle name : __main__
parent process id 1826
process id 1829
result
2.Queue類
不同程序間記憶體是不共享的,想要實現兩個程序間的資料交換,可以用Queue進行程序間通訊
queue是在多程序中做了一層封裝的佇列,以保證在當前程序裡程序安全
方法:queue
程序中的隊,以保證程序安全
from multiprocessing import Process,Queue
def info(q):
# global q # 錯誤,queue中 ,global 不行,因為子程序無法訪問父程序的記憶體資料
q.put([34, None, 'yes'])
if __name__ =='__main__':
q = Queue()
for i in range(3):
p = Process(target=info, args=[q,]) # 多個子程序的資料可以都可以放父程序資料
p.start()
print('來自父程序%s:%s'%(i, q.get()))
p.join()
多程序_queue
來自父程序0:[34, None, 'yes']
來自父程序1:[34, None, 'yes']
來自父程序2:[34, None, 'yes']
result
3.Pipe類
管道操作(雙向佇列):會返回一對物件,管道的兩端分別賦給子程序和父程序
和佇列操作差不多,所以一般運用佇列較多
方法:
send() | 傳送序列 |
recv() | 接收序列 |
fileno() | 返回一個整型的檔案描述符 |
close() | 退出 |
poll() | 判斷子程序是否結束 |
send_bytes() | 以bytes格式傳送序列 |
recv_bytes() | 以bytes格式接收序列 |
from multiprocessing import Process,Pipe
import time
def info(conn):
time.sleep(0.5)
conn.send([32,None,'ni hao wa'])
conn.close()
if __name__=='__main__':
conn_parent ,conn_child = Pipe()
print(conn_parent.fileno())
for i in range(3):
p = Process(target=info,args=(conn_child,))
print(bool(conn_child.poll)) # 程序是否結束
p.start()
# 如果沒有訊息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會丟擲EOFError。
print('父端接收%s:%s'% (i,conn_parent.recv()))
p.join()
多程序_Pipe
200
True
父端接收0:[32, None, 'ni hao wa']
True
父端接收1:[32, None, 'ni hao wa']
True
父端接收2:[32, None, 'ni hao wa']
result
4.Manager
通過Manager可以簡單的使用list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barries,Value+Arrary等型別的高階介面
Manager()返回的manager物件控制了一個server程序,此程序包含的python物件可以被其他的程序通過proxies來訪問。從而達到多程序間資料通訊且安全
例:對list,dict的應用例子:
#!/usr/bin/python3
# -*- coding:utf-8 -*-
from multiprocessing import Process,Manager
def fun(d,l,n):
d[2] = '3'
d['e'] = 'e'
d[34] = None
l.append(n)
print(l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list()
join_list = []
for i in range(6):
p = Process(target=fun, args=(d,l,i))
p.start()
join_list.append(p)
for res in join_list:
res.join()
print(l)
print(d)
example
[5]
[5, 2]
[5, 2, 3]
[5, 2, 3, 0]
[5, 2, 3, 0, 4]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
{2: '3', 'e': 'e', 34: None}
result
Manager的詳細參考:https://www.aliyun.com/jiaocheng/490316.html
5.Pool 類(程序池)
當程序數過多時,用於限制程序數
非同步:程序並行
同步:程序序列
方法:
apply_async(func,args,kwds,callback) | 程序非同步,並行(func:執行一個函式,args/ dwds:程序引數,callback:Foo執行結果返回到callback執行的函式中) |
apply(func,args,kwds) | 程序同步,序列 |
close() | 關閉程序池 |
terminate() | 結束工作程序,不在處理未完成的任務 |
join() | 主程序阻塞,等待子程序執行完畢 |
from multiprocessing import Pool,freeze_support
import time
def Foo(i):
time.sleep(1)
print('exec..')
return i+100 # 返回到Bar中
def Bar(arg):
print('來自Foo 的i :',arg) # 接收 Foo中 的返回值
if __name__ == '__main__':
freeze_support() # 僅在Windows上才匯入此模組程序程式才不會出錯,Linux上不用
pool = Pool(5) # 限制每次進行的程序數為 5
for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar) # 程序非同步 # callback 把前面func的放在Bar中列印
# pool.apply(func=Foo, args=(i,)) # 同步,序列 # 沒有callback屬性
print('結束。。')
pool.close() # 注意:join必須放在close()後面,否則將不會等待子程序列印結束,而直接結束
pool.join()
程序池
結束。。
exec..
exec..
exec..
exec..
exec..
來自Foo 的i : 104
來自Foo 的i : 102
來自Foo 的i : 103
來自Foo 的i : 100
來自Foo 的i : 101
exec..
exec..
exec..
exec..
exec..
來自Foo 的i : 105
來自Foo 的i : 106
來自Foo 的i : 107
來自Foo 的i : 108
來自Foo 的i : 109
非同步結果
exec..
exec..
exec..
exec..
exec..
exec..
exec..
exec..
exec..
exec..
結束。。
同步結果