1. 程式人生 > >多工(程序, 程序間通訊-Queue ,程序池)

多工(程序, 程序間通訊-Queue ,程序池)

1. 程序

  • 程式:例如xxx.py這是程式,是一個靜態的

  • 程序:一個程式執行起來後,程式碼+用到的資源 稱之為程序,它是作業系統分配資源的基本單元。

    不僅可以通過執行緒完成多工,程序也是可以的
    2. 程序的狀態
    工作中,任務數往往大於cpu的核數,即一定有一些任務正在執行,而另外一些任務在等待cpu進行執行,因此導致了有了不同的狀態
    在這裡插入圖片描述

  • 就緒態:執行的條件都已經慢去,正在等在cpu執行

  • 執行態:cpu正在執行其功能

  • 等待態:等待某些條件滿足,例如一個程式sleep了,此時就處於等待態
    程序的建立-multiprocessing
    multiprocessing模組就是跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件,這個物件可以理解為是一個獨立的程序,可以執行另外的事情

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time


def run_proc():
    """子程序要執行的程式碼"""
    while True:
        print("----2----")
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print("----1----")
        time.sleep(1)

說明:
建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,用start()方法啟動
程序pid:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

def run_proc():
    """子程序要執行的程式碼"""
    print('子程序執行中,pid=%d...' % os.getpid())  # os.getpid獲取當前程序的程序號
    print('子程序將要結束...')

if __name__ == '__main__':
    print('父程序pid: %d' % os.getpid())  # os.getpid獲取當前程序的程序號
    p = Process(target=run_proc)
    p.start()

*** Process語法結構如下:***
Process([group [, target [, name [, args [, kwargs]]]]])

  • target:如果傳遞了函式的引用,可以任務這個子程序就執行這裡的程式碼
  • args:給target指定的函式傳遞的引數,以元組的方式傳遞
  • kwargs:給target指定的函式傳遞命名引數
  • name:給程序設定一個名字,可以不設定
  • group:指定程序組,大多數情況下用不到

Process建立的例項物件的常用方法:

  • start():啟動子程序例項(建立子程序)
  • is_alive():判斷程序子程序是否還在活著
  • join([timeout]):是否等待子程序執行結束,或等待多少秒
  • terminate():不管任務是否完成,立即終止子程序

Process建立的例項物件的常用屬性:

  • name:當前程序的別名,預設為Process-N,N為從1開始遞增的整數
  • pid:當前程序的pid(程序號)

給子程序指定的函式傳遞引數

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep


def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子程序執行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
        print(kwargs)
        sleep(0.2)

if __name__=='__main__':
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    p.start()
    sleep(1)  # 1秒中之後,立即結束子程序
    p.terminate()
    p.join()

執行結果:

子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}

程序間不同享全域性變數

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

nums = [11, 22]

def work1():
    """子程序要執行的程式碼"""
    print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
    for i in range(3):
        nums.append(i)
        time.sleep(1)
        print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))

def work2():
    """子程序要執行的程式碼"""
    print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))

if __name__ == '__main__':
    p1 = Process(target=work1)
    p1.start()
    p1.join()

    p2 = Process(target=work2)
    p2.start()

執行結果:

in process1 pid=11349 ,nums=[11, 22]
in process1 pid=11349 ,nums=[11, 22, 0]
in process1 pid=11349 ,nums=[11, 22, 0, 1]
in process1 pid=11349 ,nums=[11, 22, 0, 1, 2]
in process2 pid=11350 ,nums=[11, 22]

1 . 程序間通訊-Queue
可以使用multiprocessing模組的Queue實現多程序之間的資料傳遞,Queue本身是一個訊息列隊程式,首先用一個小例項來演示一下Queue的工作原理:

#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個Queue物件,最多可接收三條put訊息
q.put("訊息1") 
q.put("訊息2")
print(q.full())  #False
q.put("訊息3")
print(q.full()) #True

#因為訊息列隊已滿下面的try都會丟擲異常,第一個try會等待2秒後再丟擲異常,第二個Try會立刻丟擲異常
try:
    q.put("訊息4",True,2)
except:
    print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())

try:
    q.put_nowait("訊息4")
except:
    print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())

#推薦的方式,先判斷訊息列隊是否已滿,再寫入
if not q.full():
    q.put_nowait("訊息4")

#讀取訊息時,先判斷訊息列隊是否為空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

執行結果:

False
True
訊息列隊已滿,現有訊息數量:3
訊息列隊已滿,現有訊息數量:3
訊息1
訊息2
訊息3

說明
初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭);

  • Queue.qsize():返回當前佇列包含的訊息數量;
  • Queue.empty():如果佇列為空,返回True,反之False ;
  • Queue.full():如果佇列滿了,返回True,反之False;
  • Queue.get([block[, timeout]]):獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲"Queue.Empty"異常;

2)如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常;

  • Queue.get_nowait():相當Queue.get(False);
  • Queue.put(item,[block[, timeout]]):將item訊息寫入佇列,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常;

2)如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常;

  • Queue.put_nowait(item):相當Queue.put(item, False);
    程序池Pool
    當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動態成生多個程序,但如果是上百甚至上千個目標,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing模組提供的Pool方法。

初始化Pool時,可以指定一個最大程序數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的例項:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s開始執行,程序號為%d" % (msg,os.getpid()))
    # random.random()隨機生成0~1之間的浮點數
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))

po = Pool(3)  # 定義一個程序池,最大程序數3
for i in range(0,10):
    # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,))
    # 每次迴圈將會用空閒出來的子程序去呼叫目標
    po.apply_async(worker,(i,))

print("----start----")
po.close()  # 關閉程序池,關閉後po不再接收新的請求
po.join()  # 等待po中所有子程序執行完成,必須放在close語句之後
print("-----end-----")

執行結果:

----start----
0開始執行,程序號為21466
1開始執行,程序號為21468
2開始執行,程序號為21467
0 執行完畢,耗時1.01
3開始執行,程序號為21466
2 執行完畢,耗時1.24
4開始執行,程序號為21467
3 執行完畢,耗時0.56
5開始執行,程序號為21466
1 執行完畢,耗時1.68
6開始執行,程序號為21468
4 執行完畢,耗時0.67
7開始執行,程序號為21467
5 執行完畢,耗時0.83
8開始執行,程序號為21466
6 執行完畢,耗時0.75
9開始執行,程序號為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----

multiprocessing.Pool常用函式解析:

  • apply_async(func[, args[, kwds]])
    :使用非阻塞方式呼叫func(並行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的引數列表,kwds為傳遞給func的關鍵字引數列表;
  • close():關閉Pool,使其不再接受新的任務;
  • terminate():不管任務是否完成,立即終止;
  • join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;
    程序池中的Queue
    如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的例項演示了程序池中的程序如何通訊:

# -*- coding:utf-8 -*-

# 修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到訊息:%s" % q.get(True))

def writer(q):
    print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

執行結果:

(11095) start
writer啟動(11097),父程序為(11095)
reader啟動(11098),父程序為(11095)
reader從Queue獲取到訊息:i
reader從Queue獲取到訊息:t
reader從Queue獲取到訊息:c
reader從Queue獲取到訊息:a
reader從Queue獲取到訊息:s
reader從Queue獲取到訊息:t
(11095) End