Python入門學習-DAY37-進程池與線程池、協程、gevent模塊
阿新 • • 發佈:2018-09-10
在線 ces pro alt 18C name bcb 所有 __name__
一、進程池與線程池
基本使用:
進程池和線程池操作一樣
提交任務的兩種方式:
同步調用:提交完一個任務之後,就在原地等待,等待任務完完整整地運行完畢拿到結果後,再執行下一行代碼,會導致任務是串行執行的
異步調用:提交完一個任務之後,不在原地等待,結果???,而是直接執行下一行代碼,會導致任務是並發執行的
同步調用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os def task(): print(‘%s is running‘%os.getpid()) iView Code=random.randint(1,3) time.sleep(i) return i if __name__ == ‘__main__‘: p=ProcessPoolExecutor(4) l=[] for i in range(10): res = p.submit(task).result()#等待任務執行完畢,返回結果 print(res) print(‘主‘)
異步調用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorView Codeimport time,random,os def task(): print(‘%s is running‘%os.getpid()) i=random.randint(1,3) time.sleep(i) return i if __name__ == ‘__main__‘: p=ProcessPoolExecutor(4) l=[] for i in range(10): future=p.submit(task)#只替提交任務 l.append(future) p.shutdown(wait=True)#關閉進程池入口,並在原地等待所有進程任務執行完畢 for i in l: print(i.result()) print(‘主‘)
異步 + 回調函數
from concurrent.futures import ProcessPoolExecutor import time,os import requests def get(url): print(‘%s GET %s‘ %(os.getpid(),url)) time.sleep(3) response=requests.get(url) if response.status_code == 200: res=response.text else: res=‘下載失敗‘ return res def parse(future): time.sleep(1) res=future.result() print(‘%s 解析結果為%s‘ %(os.getpid(),len(res))) if __name__ == ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.sina.com.cn‘, ‘https://www.tmall.com‘, ‘https://www.jd.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ] p=ProcessPoolExecutor(9) start=time.time() for url in urls: future=p.submit(get,url) future.add_done_callback(parse) #parse會在任務運行完畢後自動觸發,然後接收一個參數future對象,回調函數的執行是在主進程裏,而線程中的回調函數是由空閑的線程來執行 p.shutdown(wait=True) print(‘主‘,time.time()-start) print(‘主‘,os.getpid())View Code
基於線程池的套接字通訊
服務端
from concurrent.futures import ThreadPoolExecutor import socket from threading import current_thread IP=‘127.0.0.1‘ PORT=8085 ADDRESS=(IP,PORT) BUFFSIZE=1024 t = ThreadPoolExecutor(4) def communicate(conn,addr): while True: try: data=conn.recv(BUFFSIZE) if not data: print(‘%s客戶端斷開....‘%addr) break print(‘>>>>%s 端口:%s 線程:%s‘%(data.decode(‘utf-8‘),addr[1],current_thread().name)) conn.send(data.upper()) except ConnectionResetError: break conn.close() if __name__ == ‘__main__‘: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(ADDRESS) server.listen(2) print(current_thread().name) while True: conn,addr=server.accept() t.submit(communicate, conn,addr)View Code
客戶端
import socket IP=‘127.0.0.1‘ PORT=8085 ADDRESS=(IP,PORT) BUFFSIZE=1024 client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(ADDRESS) while True: msg=input(‘>>>>‘).strip() if len(msg)==0:continue if msg==‘q‘:break client.send(msg.encode(‘utf-8‘)) data = client.recv(BUFFSIZE) print(data.decode(‘utf-8‘)) client.close()View Code
二、協程
1. 目標:
在線程下實現並發
並發(多個任務看起來是同時執行就是並發):切換+保存狀態
2. 協程:
協程是單線程實現並發
註意:協程是程序員意淫出來的東西,操作系統裏只有進程和線程的概念(操作系統調度的是線程)
在單線程下實現多個任務間遇到IO就切換就可以降低單線程的IO時間,從而最大限度地提升單線程的效率
串行執行
import time def func1(): for i in range(10000000): i+1 def func2(): for i in range(10000000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start)#1.9774692058563232sView Code
基於yield並發執行
import time def func1(): while True: print(‘func1‘) yield def func2(): g=func1() for i in range(1000): print(‘func2‘) i+1 next(g) start=time.time() func2() stop=time.time() print(stop-start)#0.014994382858276367sView Code
三、gevent模塊
1.使用
from gevent import monkey;monkey.patch_all()#用來識別IO阻塞,必須放到文件頭 from gevent import spawn,joinall import time def foo1(name): print(‘%s play1‘%name) time.sleep(2)#模擬IO操作,遇到IO切換任務 print(‘%s play2‘%name) def foo2(name): print(‘%s eat1‘%name) time.sleep(3)#模擬IO操作,遇到IO切換任務 print(‘%s eat2‘%name) f1=spawn(foo1,‘egon‘)#提交任務 f2=spawn(foo2,‘egon‘)#提交任務 joinall([f1,f2])#主線程等待任務完成 print(‘主‘) #結果: #egon play1 #egon eat1 #egon play2 #egon eat2 #主
2.基於gevent的套接字通信
服務端
from gevent import monkey;monkey.patch_all() from gevent import spawn import socket from threading import current_thread IP=‘127.0.0.1‘ PORT=8086 ADDRESS=(IP,PORT) BUFFSIZE=1024 def communicate(conn,addr): while True: try: data=conn.recv(BUFFSIZE) if not data: print(‘%s客戶端斷開....‘%addr) break conn.send(data.upper()) except ConnectionResetError: break def server(): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(ADDRESS) server.listen(2) print(current_thread().name) while True: conn,addr=server.accept() spawn(communicate,conn,addr) if __name__ == ‘__main__‘: s1=spawn(server) s1.join()View Code
多個客戶端並發
import socket from threading import Thread,current_thread IP = ‘127.0.0.1‘ PORT = 8086 ADDRESS = (IP, PORT) BUFFSIZE = 1024 def client(): client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(ADDRESS) n=0 while True: msg=‘%s say hello %s‘ %(current_thread().name,n) n+=1 client.send(msg.encode(‘utf-8‘)) data=client.recv(BUFFSIZE) print(data.decode(‘utf-8‘)) if __name__ == ‘__main__‘: for i in range(500): t=Thread(target=client) t.start()View Code
Python入門學習-DAY37-進程池與線程池、協程、gevent模塊