多工(程序, 程序間通訊-Queue ,程序池)
1. 程序
-
程式:例如xxx.py這是程式,是一個靜態的
-
程序:一個程式執行起來後,程式碼+用到的資源 稱之為程序,它是作業系統分配資源的基本單元。
不僅可以通過執行緒完成多工,程序也是可以的
2. 程序的狀態
工作中,任務數往往大於cpu的核數,即一定有一些任務正在執行,而另外一些任務在等待cpu進行執行,因此導致了有了不同的狀態
-
就緒態:執行的條件都已經慢去,正在等在cpu執行
-
執行態:cpu正在執行其功能
-
等待態:等待某些條件滿足,例如一個程式sleep了,此時就處於等待態
程序的建立-multiprocessing
multiprocessing模組就是跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件,這個物件可以理解為是一個獨立的程序,可以執行另外的事情
# -*- coding:utf-8 -*- from multiprocessing import Process import time def run_proc(): """子程序要執行的程式碼""" while True: print("----2----") time.sleep(1) if __name__=='__main__': p = Process(target=run_proc) p.start() while True: print("----1----") time.sleep(1)
說明:
建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,用start()方法啟動
程序pid:
# -*- coding:utf-8 -*- from multiprocessing import Process import os import time def run_proc(): """子程序要執行的程式碼""" print('子程序執行中,pid=%d...' % os.getpid()) # os.getpid獲取當前程序的程序號 print('子程序將要結束...') if __name__ == '__main__': print('父程序pid: %d' % os.getpid()) # os.getpid獲取當前程序的程序號 p = Process(target=run_proc) p.start()
*** Process語法結構如下:***
Process([group [, target [, name [, args [, kwargs]]]]])
- target:如果傳遞了函式的引用,可以任務這個子程序就執行這裡的程式碼
- args:給target指定的函式傳遞的引數,以元組的方式傳遞
- kwargs:給target指定的函式傳遞命名引數
- name:給程序設定一個名字,可以不設定
- group:指定程序組,大多數情況下用不到
Process建立的例項物件的常用方法:
- start():啟動子程序例項(建立子程序)
- is_alive():判斷程序子程序是否還在活著
- join([timeout]):是否等待子程序執行結束,或等待多少秒
- terminate():不管任務是否完成,立即終止子程序
Process建立的例項物件的常用屬性:
- name:當前程序的別名,預設為Process-N,N為從1開始遞增的整數
- pid:當前程序的pid(程序號)
給子程序指定的函式傳遞引數
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep
def run_proc(name, age, **kwargs):
for i in range(10):
print('子程序執行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
print(kwargs)
sleep(0.2)
if __name__=='__main__':
p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
p.start()
sleep(1) # 1秒中之後,立即結束子程序
p.terminate()
p.join()
執行結果:
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
程序間不同享全域性變數
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
nums = [11, 22]
def work1():
"""子程序要執行的程式碼"""
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
for i in range(3):
nums.append(i)
time.sleep(1)
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
def work2():
"""子程序要執行的程式碼"""
print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
if __name__ == '__main__':
p1 = Process(target=work1)
p1.start()
p1.join()
p2 = Process(target=work2)
p2.start()
執行結果:
in process1 pid=11349 ,nums=[11, 22]
in process1 pid=11349 ,nums=[11, 22, 0]
in process1 pid=11349 ,nums=[11, 22, 0, 1]
in process1 pid=11349 ,nums=[11, 22, 0, 1, 2]
in process2 pid=11350 ,nums=[11, 22]
1 . 程序間通訊-Queue
可以使用multiprocessing模組的Queue實現多程序之間的資料傳遞,Queue本身是一個訊息列隊程式,首先用一個小例項來演示一下Queue的工作原理:
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個Queue物件,最多可接收三條put訊息
q.put("訊息1")
q.put("訊息2")
print(q.full()) #False
q.put("訊息3")
print(q.full()) #True
#因為訊息列隊已滿下面的try都會丟擲異常,第一個try會等待2秒後再丟擲異常,第二個Try會立刻丟擲異常
try:
q.put("訊息4",True,2)
except:
print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())
try:
q.put_nowait("訊息4")
except:
print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())
#推薦的方式,先判斷訊息列隊是否已滿,再寫入
if not q.full():
q.put_nowait("訊息4")
#讀取訊息時,先判斷訊息列隊是否為空,再讀取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
執行結果:
False
True
訊息列隊已滿,現有訊息數量:3
訊息列隊已滿,現有訊息數量:3
訊息1
訊息2
訊息3
說明
初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭);
- Queue.qsize():返回當前佇列包含的訊息數量;
- Queue.empty():如果佇列為空,返回True,反之False ;
- Queue.full():如果佇列滿了,返回True,反之False;
- Queue.get([block[, timeout]]):獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True;
1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲"Queue.Empty"異常;
2)如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常;
- Queue.get_nowait():相當Queue.get(False);
- Queue.put(item,[block[, timeout]]):將item訊息寫入佇列,block預設值為True;
1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常;
2)如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常;
- Queue.put_nowait(item):相當Queue.put(item, False);
程序池Pool
當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動態成生多個程序,但如果是上百甚至上千個目標,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing模組提供的Pool方法。
初始化Pool時,可以指定一個最大程序數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的例項:
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s開始執行,程序號為%d" % (msg,os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))
po = Pool(3) # 定義一個程序池,最大程序數3
for i in range(0,10):
# Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,))
# 每次迴圈將會用空閒出來的子程序去呼叫目標
po.apply_async(worker,(i,))
print("----start----")
po.close() # 關閉程序池,關閉後po不再接收新的請求
po.join() # 等待po中所有子程序執行完成,必須放在close語句之後
print("-----end-----")
執行結果:
----start----
0開始執行,程序號為21466
1開始執行,程序號為21468
2開始執行,程序號為21467
0 執行完畢,耗時1.01
3開始執行,程序號為21466
2 執行完畢,耗時1.24
4開始執行,程序號為21467
3 執行完畢,耗時0.56
5開始執行,程序號為21466
1 執行完畢,耗時1.68
6開始執行,程序號為21468
4 執行完畢,耗時0.67
7開始執行,程序號為21467
5 執行完畢,耗時0.83
8開始執行,程序號為21466
6 執行完畢,耗時0.75
9開始執行,程序號為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----
multiprocessing.Pool常用函式解析:
- apply_async(func[, args[, kwds]])
:使用非阻塞方式呼叫func(並行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的引數列表,kwds為傳遞給func的關鍵字引數列表; - close():關閉Pool,使其不再接受新的任務;
- terminate():不管任務是否完成,立即終止;
- join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;
程序池中的Queue
如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的例項演示了程序池中的程序如何通訊:
# -*- coding:utf-8 -*-
# 修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader從Queue獲取到訊息:%s" % q.get(True))
def writer(q):
print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
for i in "itcast":
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
執行結果:
(11095) start
writer啟動(11097),父程序為(11095)
reader啟動(11098),父程序為(11095)
reader從Queue獲取到訊息:i
reader從Queue獲取到訊息:t
reader從Queue獲取到訊息:c
reader從Queue獲取到訊息:a
reader從Queue獲取到訊息:s
reader從Queue獲取到訊息:t
(11095) End