day32 管道, 資料共享, 程序池, 回撥函式
阿新 • • 發佈:2018-12-01
一. 管道(程序間通訊)
#建立管道的類: Pipe([duplex]):在程序之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道 #引數介紹: dumplex:預設管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於傳送。 #主要方法: conn1.recv():接收conn2.send(obj)傳送的物件。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。 conn1.send(obj):通過連線傳送物件。obj是與序列化相容的任意物件 #其他方法: conn1.close():關閉連線。如果conn1被垃圾回收,將自動呼叫此方法 conn1.fileno():返回連線使用的整數檔案描述符 conn1.poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待資料到達。
from multiprocessing import Process, Pipe
def f(conn):
conn.send("Hello 妹妹") #子程序傳送了訊息
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() #建立管道,拿到管道的兩端,雙工通訊方式,兩端都可以收發訊息
p = Process(target=f, args=(child_conn,)) #將管道的一段給子程序
p.start() #開啟子程序
print (parent_conn.recv()) #主程序接受了訊息
p.join()
View Code
應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程式可能在消費者中的recv()操作上掛起(就是阻塞)。管道是由作業系統進行引用計數的,必須在所有程序中關閉管道的相同一端就會能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。
二. 資料共享: Manager模組
程序間應該儘量避免通訊,即便需要通訊,也應該選擇程序安全的工具來避免加鎖帶來的問題,應該儘量避免使用本節所講的共享資料的方式,以後我們會嘗試使用資料庫來解決程序之間的資料共享問題。
Manager預設加鎖
from multiprocessing import Process,Manager,Lock
def func(dic, loc):
with loc:
dic['num'] -= 1
if __name__ == '__main__':
m = Manager()
loc = Lock()
dic = m.dict({'num': 100})
p_list = []
for i in range(100):
p = Process(target=func, args=(dic, loc))
p_list.append(p)
p.start()
[pp.join() for pp in p_list]
print('>>>', dic['num'])
View Code
三. 程序池
Pool([numprocess [,initializer [, initargs]]]):建立程序池
numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值
initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
initargs:是要傳給initializer的引數組
主要方法:
p.apply(func [ args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs), 然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''
p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs), 然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
p.close():鎖定不是關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
P.join():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
程序池的同步方法:
import time from multiprocessing import Process,Pool def func(i): num = 0 for j in range(5): num += i time.sleep(1) return num if __name__ == '__main__': pool = Pool(4) for i in range(10): res = pool.apply(func,args=(i,)) # print(res)View Code
程序池的非同步方法:
import time from multiprocessing import Process,Pool def func(i): num = 0 for j in range(5): num += i time.sleep(1) # print('>>>>>',num) return num if __name__ == '__main__': pool = Pool(4) red_list = [] for i in range(10): res = pool.apply_async(func,args=(i,)) # # print(res) red_list.append(res) # print(res.get()) # pool.close() #不是關閉程序池,只是鎖定 # pool.join() # 等待程序池中所有的任務執行完,但是無法確認是否所有的任務真的全部執行完了,前面要加close方法 for resss in red_list: print(resss.get())View Code
四. 回撥函式
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主程序:',os.getpid()) p = Pool(5) #args裡面的10給了func1,func1的返回值作為回撥函式的引數給了callback對應的函式,不能直接給回撥函式直接傳引數,他只能是你任務函式func1的函式的返回值 # for i in range(10,20): #如果是多個程序來執行任務,那麼當所有子程序將結果給了回撥函式之後,回撥函式又是在主程序上執行的,那麼就會出現列印結果是同步的效果。我們上面func2裡面登出的時間模組開啟看看 # p.apply_async(func1,args=(i,),callback=func2) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() #結果 # 主程序: 11852 #發現回撥函式是在主程序中完成的,其實如果是在子程序中完成的,那我們直接將程式碼寫在子程序的任務函式func1裡面就行了,對不對,這也是為什麼稱為回撥函式的原因。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100View Code
import requests,json,os from multiprocessing import Pool def get_page(url): print('<程序%s> get %s' % (os.getpid(), url)) respone =requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.txt} def parse_page(res): print('<程序%s> parse %s' % (os.getpid(), res['url'])) parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p = Pool(3) res_l = [] for url in urls: res = p.apply_async(get_page, args=(url, ), callback=parse_page) res_l.append(res) p.close() p.join()多程序請求多個url
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()爬蟲案例
程序池和訊號量的區別:
程序池是多個需要被執行的任務在程序池外面排隊等待獲取程序物件去執行自己,而訊號量是一堆程序等待著去執行一段邏輯程式碼。
訊號量不能控制建立多少個程序,但是可以控制同時多少個程序能夠執行,但是程序池能控制你可以建立多少個程序。
舉例:就像那些開大車拉煤的,訊號量是什麼呢,就好比我只有五個車道,你每次只能過5輛車,但是不影響你建立100輛車,但是程序池相當於什麼呢?相當於你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。
其他語言裡面有更高階的程序池,在設定的時候,可以將程序池中的程序動態的創建出來,當需求增大的時候,就會自動在程序池中新增程序,需求小的時候,自動減少程序,並且可以設定程序數量的上線,最多為多,python裡面沒有。