1. 程式人生 > >Python_多程序

Python_多程序

Python 多程序庫 multiprocessing ,支援子程序、通訊、資料共享、執行不同形式的同步

多程序,繞過gil ,實現多核的利用,多程序也是原生程序,由作業系統維護

在pycharm中,可能沒有辦法正常使用multiprocessing.Process,最好是在Linux中執行

Process 用於建立程序模組
Pool 用於建立管理程序池
Queue 用於程序通訊,資源共享
Pipe 用於管道通訊
Manager 用於資源共享,同步程序                   

 

1.Process類


Process(group = None,target =None,name=None, args=[ ], kwargs={ })

group 執行緒組
target 要執行的方法
name 程序名
args/kwargs 要傳入方法的引數                                       

process屬性&方法:

authkey 程序的身份驗證金鑰
daemon 同thread的setDaemon,守護程序
exitcode 程序執行時為None,若為—N,則表示被訊號N結束
pid 程序號
name 程序名
is_alive() 返回程序是否正在執行
join([timeout]) 阻塞到執行緒結束或到timeout值 
start() 程序準備就緒,等待CPU排程
run() start()呼叫run方法,如果例項程序時未制定傳入target,start執行預設run()方法。         
terminate() 不管任務是否完成,立即停止工作程序

多程序的建立:

#!/usr/bin/python
# -*- coding:utf-8 -*-
'''多程序的建立'''
from multiprocessing import Process
import time

def fun(name):
    time.sleep(1)
    print('hello,%s' % name)
    print('----')

if __name__ =='__main__':
    for i in range(5):                # 程序同步
        p = Process(target=fun, args=('Presley',))
        p.start()


    p.join()
    print('結束。')
多程序

程序id :

#!/usr/bin/python3
# -*- coding:utf-8 -*-

from multiprocessing import Process
import os
def info(title):
    print(title)
    print('moudle name :',__name__)
    print('parent process id ', os.getppid())         
    print('process id ', os.getpid())                  


if __name__ =='__main__':
    info('hei. ')          # pycharm id和 主程序id             
    for i in range(3):
        p = Process(target=info, args=('Presley',))     # 主程序id   和 info 子程序id
        p.start()
    p.join()
多程序id
hei. 
moudle name : __main__
parent process id  1610
process id  1826
Presley
moudle name : __main__
parent process id  1826
process id  1827
Presley
moudle name : __main__
parent process id  1826
process id  1828
Presley
moudle name : __main__
parent process id  1826
process id  1829
result

 

2.Queue類 


不同程序間記憶體是不共享的,想要實現兩個程序間的資料交換,可以用Queue進行程序間通訊

queue是在多程序中做了一層封裝的佇列,以保證在當前程序裡程序安全

方法:queue

 程序中的隊,以保證程序安全

from multiprocessing import Process,Queue
def info(q):
    #  global q       # 錯誤,queue中 ,global 不行,因為子程序無法訪問父程序的記憶體資料
    q.put([34, None, 'yes'])


if __name__ =='__main__':
    q = Queue()
    for i in range(3):
        p = Process(target=info, args=[q,])      # 多個子程序的資料可以都可以放父程序資料
        p.start()
        print('來自父程序%s:%s'%(i, q.get()))
    p.join()
多程序_queue
來自父程序0:[34, None, 'yes']
來自父程序1:[34, None, 'yes']
來自父程序2:[34, None, 'yes']
result

 

3.Pipe類


管道操作(雙向佇列):會返回一對物件,管道的兩端分別賦給子程序和父程序

和佇列操作差不多,所以一般運用佇列較多

方法:

send() 傳送序列
recv() 接收序列
fileno()  返回一個整型的檔案描述符
close() 退出
poll()  判斷子程序是否結束
send_bytes() 以bytes格式傳送序列
recv_bytes() 以bytes格式接收序列                                   
from multiprocessing import Process,Pipe
import time
def info(conn):
    time.sleep(0.5)
    conn.send([32,None,'ni hao wa'])

    conn.close()

if __name__=='__main__':
    conn_parent ,conn_child = Pipe()
    print(conn_parent.fileno())       

    for i in range(3):
        p = Process(target=info,args=(conn_child,))
        print(bool(conn_child.poll))        # 程序是否結束
        p.start()
        # 如果沒有訊息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會丟擲EOFError。
        print('父端接收%s:%s'% (i,conn_parent.recv()))

    p.join()
多程序_Pipe
200
True
父端接收0:[32, None, 'ni hao wa']
True
父端接收1:[32, None, 'ni hao wa']
True
父端接收2:[32, None, 'ni hao wa']
result

 

4.Manager


通過Manager可以簡單的使用list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barries,Value+Arrary等型別的高階介面

Manager()返回的manager物件控制了一個server程序,此程序包含的python物件可以被其他的程序通過proxies來訪問。從而達到多程序間資料通訊且安全

 例:對list,dict的應用例子:

#!/usr/bin/python3
# -*- coding:utf-8 -*-
from multiprocessing import Process,Manager

def fun(d,l,n):
    d[2] = '3'
    d['e'] = 'e'
    d[34] = None
    l.append(n)
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        join_list = []
        for i in range(6):
            p = Process(target=fun, args=(d,l,i))
            p.start()
            join_list.append(p)
        for res in join_list:
            res.join()
            print(l)
        print(d)
example
[5]
[5, 2]
[5, 2, 3]
[5, 2, 3, 0]
[5, 2, 3, 0, 4]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
[5, 2, 3, 0, 4, 1]
{2: '3', 'e': 'e', 34: None}
result

Manager的詳細參考:https://www.aliyun.com/jiaocheng/490316.html

 

5.Pool 類(程序池)


當程序數過多時,用於限制程序數

 非同步:程序並行

同步:程序序列

方法:

apply_async(func,args,kwds,callback)

程序非同步,並行(func:執行一個函式,args/ dwds:程序引數,callback:Foo執行結果返回到callback執行的函式中)                 

apply(func,args,kwds) 程序同步,序列
close() 關閉程序池
terminate() 結束工作程序,不在處理未完成的任務
join() 主程序阻塞,等待子程序執行完畢
from multiprocessing import Pool,freeze_support
import time

def Foo(i):
    time.sleep(1)
    print('exec..')
    return i+100     # 返回到Bar中


def Bar(arg):
    print('來自Foo 的i :',arg)   # 接收 Foo中 的返回值

if __name__ == '__main__':
    freeze_support()       # 僅在Windows上才匯入此模組程序程式才不會出錯,Linux上不用
    pool = Pool(5)     # 限制每次進行的程序數為 5
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)  # 程序非同步    # callback 把前面func的放在Bar中列印
        # pool.apply(func=Foo, args=(i,))         # 同步,序列   # 沒有callback屬性
    print('結束。。')
    pool.close()           # 注意:join必須放在close()後面,否則將不會等待子程序列印結束,而直接結束
    pool.join()
程序池
結束。。
exec..
exec..
exec..
exec..
exec..
來自Foo 的i : 104
來自Foo 的i : 102
來自Foo 的i : 103
來自Foo 的i : 100
來自Foo 的i : 101
exec..
exec..
exec..
exec..
exec..
來自Foo 的i : 105
來自Foo 的i : 106
來自Foo 的i : 107
來自Foo 的i : 108
來自Foo 的i : 109
非同步結果
exec..
exec..
exec..
exec..
exec..
exec..
exec..
exec..
exec..
exec..
結束。。
同步結果