python3程式設計基礎:多程序(二)建立程序
在之前的文章中對多程序的一些基礎概念,程序的生命週期和python程序操作的模組做了說明,本篇文章直接上程式碼,結束python中建立多程序的一些方法。
os.fork()(Linux)
fork()函式,只在Linux系統下存在。而且它非常特殊,普通的函式呼叫,呼叫一次,返回一次,但是fork()呼叫一次,返回兩次,因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序),然後分別在父程序和子程序內返回。子程序永遠返回0,而父程序返回子程序的PID。這樣一個父程序可以fork()出很多子程序,所以父程序要記下每個子程序的ID,而子程序只需要呼叫getppid()就可以拿到父程序的ID,呼叫os.getpid()函式可以獲取自己的程序號。
程式碼示例:os.fork()
import os
print (os.getpid())
pid = os.fork() # 建立一個子程序
print (pid) #子程序id和0
if pid == 0:
print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print ('I (%s) just created a child process (%s).' % (os.getpid(), pid))
multiprocessing模組
multiprocessing模組就是跨平臺版本的多程序管理包,支援子程序、通訊和共享資料、執行不同形式的同步。該模組中有以下類/和方法:
Process類
Multiprocessing模組建立程序使用的是Process類。
Process類的構造方法:
init(self, group=None, target=None, name=None, args=(), kwargs={})
引數說明:
group:程序所屬組,基本不用。
target:表示呼叫物件,一般為函式。
args:表示呼叫物件的位置引數元組。
name:程序別名。
kwargs:表示呼叫物件的字典。
程式碼示例1:
#coding=utf-8 import multiprocessing def do(n) : #獲取當前執行緒的名字 name = multiprocessing.current_process().name print(name,'starting') print("worker ", n) return if __name__ == '__main__' : numList = [] for i in range(8) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start()#就緒狀態 #子程序執行完畢了才會執行主程序後面的語句。p程序通過join方法通知主程序死等我結束再繼續執行。 print("Process end.") for i in numList: i.join()#每個程序執行結束才會開始下一個迴圈 print(numList)#5個程序全部執行完畢才執行print語句,也就是主程序死等
程式碼示例2:
# -*- coding: utf-8 -*-
from multiprocessing import Process
import os
import time
def sleeper(name, seconds):
print("Process ID# %s" % (os.getpid())) #獲取當前程序ID
print("Parent Process ID# %s" % (os.getppid())) #獲取父程序ID
print("%s will sleep for %s seconds" % (name, seconds))
time.sleep(seconds)
if __name__ == "__main__":
child_proc = Process(target = sleeper, args = ('bob', 5))
child_proc.start()
print("in parent process after child process start")
print("parent process about to join child process")
child_proc.join()
print("in parent process after child process join" )
print("the parent's parent process: %s" % (os.getppid()))
程式碼示例3: 多程序模板程式
#coding=utf-8
import urllib.request
import time
import multiprocessing
def func1(url) :
response = urllib.request.urlopen(url)
html = response.read()
print(html[0:20])
time.sleep(1)
def func2(url) :
response = urllib.request.urlopen(url)
html = response.read()
print(html[0:20])
time.sleep(1)
if __name__ == '__main__' :
p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")
p1.start()
p2.start()
p1.join()
p2.join()
time.sleep(1)
print("done!")
程式碼示例4: 單程序和多程序的執行效率對比
#coding: utf-8
import multiprocessing
import time
def m1(x):
time.sleep(0.05)
return x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(1000)
time1=time.time()
pool.map(m1, i_list)
time2=time.time()
print('time elapse:',time2-time1)
time1=time.time()
list(map(m1, i_list))
time2=time.time()
print('time elapse:',time2-time1)
程序池Pool
Pool類可以提供指定數量(一般為CPU的核數)的程序供使用者呼叫,當有新的請求提交到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。
如果操作的物件數目上百個甚至更多,那手動去限制程序數量就顯得特別的繁瑣,此時就可以交給程序池自動管理多個程序。
注意:程序池中的程序是不能共享佇列和資料的,而Process生成的子程序可以共享佇列
Pool類中常用方法
- apply():
函式原型:apply(func[, args=()[, kwds={}]])
該函式用於傳遞不定引數,主程序會被阻塞直到函式執行結束(不建議使用,並且3.x以後不再使用)。阻塞的,和單程序沒有什麼區別 - apply_async():
函式原型:apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
非同步非阻塞的,不用等待當前程序執行完畢,隨時根據系統排程來進行程序切換(建議使用)
注意:apply_async每次只能提交一個程序的請求 - map()
函式原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到返回結果。
注意:
雖然第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒後,程式才會執行子程序;
map返回的是一個列表,由func函式的返回值組成 - close()
關閉程序池(Pool),使其不再接受新的任務。 - terminate()
立刻結束工作程序,不再處理未處理的任務 - join()
使主程序阻塞等待子程序的退出,join方法必須在close或terminate之後使用。
程式碼示例1:簡單的程序池apply_async+map
#encoding =utf-8
import time
import multiprocessing
def mul(x):
return x*x
if __name__ =="__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())# start 4 worker processes
result = pool.apply_async(mul, [10])
print(result)#multiprocessing.pool.ApplyResult object
print(dir(result))
print(result.get(timeout = 1))#用get方法取出返回結果
print(pool.map(mul,range(10)))
程式碼示例2:map藉助類實現傳遞多個引數的情況
from multiprocessing import Pool
def f(object):
return object.x * object.y
class A:
def __init__(self,a,b):
self.x =a
self.y =b
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
params = [A(i,i) for i in range(10)]
print(pool.map(f,params)) # prints "[0, 1, 4,..., 81]"
說明:呼叫map的時候不需要先執行close關閉程序池,但是join不呼叫close的話會報錯
程式碼示例3:多程序與單程序執行時間比較
#encoding=utf-8
import time
from multiprocessing import Pool
import os,multiprocessing
def run(num):
time.sleep(1)
return num * num
if __name__ == "__main__":
testList = [1,2,3,4,5,6,7]
print('單程序執行')#順序執行
t1 = time.time()
for i in testList:
run(i)
t2 = time.time()
print('順序執行的時間為:',int(t2-t1))
print('多程序執行')#並行執行
pool = Pool(multiprocessing.cpu_count())#建立擁有4個程序數量的程序池
result = pool.map(run,testList)
pool.close()#關閉程序池,不再接受新的任務
pool.join()#主程序阻塞等待子程序的退出
t3 = time.time()
print('並行執行的時間為:',int(t3-t2))
print(result)
說明:
併發執行的時間明顯比順序執行要快很多,但是程序是要耗資源的,所以程序數也不能開太大。
程式中的result表示全部程序執行結束後全域性的返回結果集,run函式有返回值,所以一個程序對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了佇列的原理,等待所有程序都執行完畢,就返回這個列表(列表的順序不定)。
對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),讓其不再接受新的Process。
程式碼示例4:程序池中的程序不能共享佇列
#encoding=utf-8
import time
from multiprocessing import Pool,Queue,Process
def func( q):
print('*'*30)
q.put('1111')
if __name__ == "__main__":
queue = Queue() #直接用multiprocessing.Queue生成佇列
pool = Pool(4)#生成一個容量為4的程序池
for i in range(5):
pool.apply_async(func, (queue,))#向程序池提交目標請求
pool.close()
pool.join()
print(queue.qsize())
說明:
print(queue.qsize())輸出的結果是0,而且func函式中的print也沒有執行,如果不傳入q的話,func函式就能正確執行,原因是程序池中每個程序都有自己獨立的佇列,是不共享的,解決方法有兩種:一是用Process生成程序,二是用multiprocessing.Manager.Queue()生成共享佇列。修改後的程式碼如下:
方法一:Process生成程序
#encoding=utf-8
import time
from multiprocessing import Pool,Queue,Process
import multiprocessing
def func( q):
print('*'*30)
q.put('1111')
if __name__ == "__main__":
queue = Queue()
p_list = []
for i in range(5):
p = Process(target = func, args=(queue,))
p_list.append(p)
for p in p_list:
p.start()
for p in p_list:
p.join()
print(queue.qsize())
print(queue.get())
方法二:multiprocessing.Manager.Queue()
#encoding=utf-8
import time
from multiprocessing import Pool,Queue,Process
import multiprocessing
def func( q):
print('*'*30)
q.put('1111')
if __name__ == "__main__":
m = multiprocessing.Manager()
queue = m.Queue()
pool = Pool(4)#生成一個容量為4的程序池
for i in range(5):
pool.apply_async(func, (queue,))#向程序池提交目標請求
pool.close()
pool.join()
print(queue.qsize())