python-進程池與線程池,協程
一、進程池與線程池
實現並發的手段有兩種,多線程和多進程。註:並發是指多個任務看起來是同時運行的。主要是切換+保存狀態。
當我們需要執行的並發任務大於cpu的核數時,我們需要知道一個操作系統不能無限的開啟進程和線程,通常有幾個核就開幾個進程,如果進程開啟過多,就無法充分利用cpu多核的優勢,效率反而會下降。這個時候就引入了進程池線程池的概念。
池的功能就是限制啟動的進程數或線程數
concurent.future模塊:
concurrent.futures模塊提供了高度封裝的異步調用接口
ProcessPoolExecutor: 進程池,提供異步調用
p = ProcessPoolExecutor(max_works)對於進程池如果不寫max_works:默認的是cpu的數目,默認是4個
ThreadPoolExecutor:線程池,提供異步調用
p = ThreadPoolExecutor(max_works)對於線程池如果不寫max_works:默認的是cpu的數目*5
補充:
提交任務的兩種方式:
# 同步調用:提交完一個任務之後,就在原地等待,等待任務完完整整地運行完畢拿到結果後,再執行下一行代碼,會導致任務是串行執行的
# 異步調用:提交完一個任務之後,不在原地等待,結果???,而是直接執行下一行代碼,會導致任務是並發執行的
進程池從無到有創建進程後,然後會固定使用進程池裏創建好的進程去執行所有任務,不會開啟其他進程
# 基本方法 #submit(fn, *args, **kwargs)異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操作 #shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源後才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前 #result(timeout=None) 取得結果 #add_done_callback(fn)回調函數
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,random,os import requests def get(url): print(‘%s GET %s‘ %(os.getpid(),url)) time.sleep(3) response=requests.get(url) if response.status_code == 200: res=response.text else: res=‘下載失敗‘ return res def parse(future): time.sleep(1) res=future.result() print(‘%s 解析結果為%s‘ %(os.getpid(),len(res))) if __name__ == ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.sina.com.cn‘, ‘https://www.tmall.com‘, ‘https://www.jd.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ] p=ProcessPoolExecutor(9) start=time.time() for url in urls: future=p.submit(get,url) # 異步調用:提交完一個任務之後,不在原地等待,而是直接執行下一行代碼,會導致任務是並發執行的,,結果futrue對象會在任務運行完畢後自動傳給回調函數 future.add_done_callback(parse) #parse會在任務運行完畢後自動觸發,然後接收一個參數future對象 p.shutdown(wait=True) print(‘主‘,time.time()-start) print(‘主‘,os.getpid())test
線程池與進程池相比 他們的同步執行和異步執行是一樣的:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread import time,random,os import requests def get(url): print(‘%s GET %s‘ %(current_thread().name,url)) time.sleep(3) response=requests.get(url) if response.status_code == 200: res=response.text else: res=‘下載失敗‘ return res def parse(future): time.sleep(1) res=future.result() print(‘%s 解析結果為%s‘ %(current_thread().name,len(res))) if __name__ == ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.sina.com.cn‘, ‘https://www.tmall.com‘, ‘https://www.jd.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ] p=ThreadPoolExecutor(4) for url in urls: future=p.submit(get,url) future.add_done_callback(parse) p.shutdown(wait=True) print(‘主‘,current_thread().name)test
map函數:
# 我們的那個p.submit(task,i)和map函數的原理類似。我們就 # 可以用map函數去代替。更減縮了代碼 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import os, time, random def task(n): print(‘[%s] is running‘ % os.getpid()) time.sleep(random.randint(1, 3)) # I/O密集型的,,一般用線程,用了進程耗時長 return n ** 2 if __name__ == ‘__main__‘: p = ProcessPoolExecutor() obj = p.map(task, range(10)) p.shutdown() # 相當於close和join方法 print(‘=‘ * 30) print(obj) # 返回的是一個叠代器 print(list(obj))View Code
回調函數(知乎):https://www.zhihu.com/question/19801131/answer/27459821
二、協程
在單線程的情況下實現並發。
遇到IO就切換就可以降低單線程的IO時間,從而最大限度地提升單線程的效率。
實現並發是讓多個任務看起來同時運行(切換+保存狀態),cpu在運行一個任務的時候,會在兩種情況下去執行其他的任務,一種是遇到了I/O操作,一種是計算時間過長。其中第二種情況使用線程並發並不能提升效率,運算密集型的並發反而會降低效率。
#串行執行 import time def func1(): for i in range(10000000): i+1 def func2(): for i in range(10000000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start)#1.675490379333496串行執行
#基於yield並發執行 import time def func1(): while True: print(‘func1‘) 100000+1 yield def func2(): g=func1() for i in range(10000000): print(‘func2‘) time.sleep(100) i+1 next(g) start=time.time() func2() stop=time.time() print(stop-start)基於yield並發執行
yield復習:
函數中只有有yield,這個函數就變成了一個生成器,調用函數不會執行函數體代碼,會得到一個返回值,返回值就是生成器對象。
def yield_test(n): for i in range(n): yield call(i) print("i=",i) #做一些其它的事情 print("do something.") print("end.") def call(i): return i*2 #使用for循環 for i in yield_test(5): print(i,",")test
協程的本質就是在單線程下,由用戶自己控制一個任務遇到IO操作就切換到另一個任務去執行,以此來提升效率。
Gevent:
gevent是第三方庫,通過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:
我們用等待的時間模擬IO阻塞 在gevent模塊裏面要用gevent.sleep(5)表示等待的時間 要是我們想用time.sleep(),就要在最上面導入from gevent import monkey;monkey.patch_all()這句話 如果不導入直接用time.sleep(),就實現不了單線程並發的效果了
註:猴子補丁需要在第一行就運行
from gevent import monkey;monkey.patch_all() from gevent import spawn,joinall #pip3 install gevent import time def play(name): print(‘%s play 1‘ %name) time.sleep(5) print(‘%s play 2‘ %name) def eat(name): print(‘%s eat 1‘ %name) time.sleep(3) print(‘%s eat 2‘ %name) start=time.time() g1=spawn(play,‘lxx‘) g2=spawn(eat,‘lxx‘) # g1.join() # g2.join() joinall([g1,g2]) print(‘主‘,time.time()-start)test
gevent.spawn()”方法會創建一個新的greenlet協程對象,並運行它。”gevent.joinall()”方法會等待所有傳入的greenlet協程運行結束後再退出,這個方法可以接受一個”timeout”參數來設置超時時間,單位是秒。
在單線程內實現socket並發:
from gevent import monkey;monkey.patch_all() from socket import * from gevent import spawn def comunicate(conn): while True: # 通信循環 try: data = conn.recv(1024) if len(data) == 0: break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip, port, backlog=5): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(backlog) while True: # 鏈接循環 conn, client_addr = server.accept() print(client_addr) # 通信 spawn(comunicate,conn) if __name__ == ‘__main__‘: g1=spawn(server,‘127.0.0.1‘,8080) g1.join()server
from threading import Thread,current_thread from socket import * def client(): client=socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) n=0 while True: msg=‘%s say hello %s‘ %(current_thread().name,n) n+=1 client.send(msg.encode(‘utf-8‘)) data=client.recv(1024) print(data.decode(‘utf-8‘)) if __name__ == ‘__main__‘: for i in range(500): t=Thread(target=client) t.start()client
python-進程池與線程池,協程