程序(六):程序池(Pool)
目錄
程序池
為什麼要有程序池?程序池的概念。
在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。那麼我們要怎麼做呢?
在這裡,要給大家介紹一個程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。
multiprocess.Pool模組
概念介紹
Pool([numprocess [,initializer [, initargs]]]):建立程序池
引數介紹
1 numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值
2 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
3 initargs:是要傳給initializer(可迭代)的引數組
主要方法
1、p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''
2、p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
3、p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
4、P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
其他方法(瞭解)
方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法
●obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
●obj.ready():如果呼叫完成,返回True
●obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
●obj.wait([timeout]):等待結果變為可用。
●obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式
程式碼例項
程序池和多程序效率對比
import time
from multiprocessing import Pool, Process
def func(n):
for i in range(10):
print(n + 1)
if __name__ == '__main__':
start = time.time()
pool = Pool(5) # 5個程序池
pool.map(func, range(100)) # 100個任務
t1 = time.time() - start
start = time.time()
p_lst = []
for i in range(100):
p = Process(target=func, args=(i,)) #100個程序的任務
p_lst.append(p)
p.start()
for p in p_lst: p.join()
t2 = time.time() - start
print(t1, t2)
同步和非同步
#程序池的同步呼叫
import os,time
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
# 但不管該任務是否存在阻塞,同步呼叫都會在原地等著
print(res_l)
#程序池的非同步呼叫
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(random.random())
return n**2
if __name__ == '__main__':
p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) # 非同步執行,根據程序池中有的程序數,每次最多3個子程序在非同步執行
# 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務
# 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束
# 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。
res_l.append(res)
# 非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用jion,等待程序池內任務都處理完,然後可以用get收集結果
# 否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了
p.close()
p.join()
for res in res_l:
print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
練習
#server:程序池版socket併發聊天
#Pool內的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個程序內檢視pid,會發現pid使用為4個,即多個客戶端公用4個程序
from socket import *
from multiprocessing import Pool
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn):
print('程序pid: %s' %os.getpid())
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p=Pool(4)
while True:
conn,*_=server.accept()
p.apply_async(talk,args=(conn,))
# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
#client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
發現:併發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.
回撥函式
需要回調函式的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式
我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果。
#使用多程序請求多個url來減少網路等待浪費的時間
import requests
from urllib.request import urlopen
from multiprocessing import Pool
# 200 網頁正常的返回
# 404 網頁找不到
# 502 504
def get(url):
response = requests.get(url)
if response.status_code == 200:
return url, response.content.decode('utf-8')
def get_urllib(url):
ret = urlopen(url)
return ret.read().decode('utf-8')
def call_back(args):
url, content = args
print(url, len(content))
if __name__ == '__main__':
url_lst = [
'https://www.cnblogs.com/',
'http://www.baidu.com',
'https://www.sogou.com/',
'http://www.sohu.com/',
]
p = Pool(5)
for url in url_lst:
p.apply_async(get, args=(url,), callback=call_back)
p.close()
p.join()
爬蟲例項
import re
from urllib.request import urlopen
from multiprocessing import Pool
def get_page(url,pattern):
response=urlopen(url).read().decode('utf-8')
return pattern,response
def parse_page(info):
pattern,page_content=info
res=re.findall(pattern,page_content)
for item in res:
dic={
'index':item[0].strip(),
'title':item[1].strip(),
'actor':item[2].strip(),
'time':item[3].strip(),
}
print(dic)
if __name__ == '__main__':
regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
pattern1=re.compile(regex,re.S)
url_dic={
'http://maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
res_l.append(res)
for i in res_l:
i.get()
如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函式
#無需回撥函式
from multiprocessing import Pool
import time,random,os
def work(n):
time.sleep(1)
return n**2
if __name__ == '__main__':
p=Pool()
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join() #等待程序池中所有程序執行完畢
nums=[]
for res in res_l:
nums.append(res.get()) #拿到所有結果
print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理