python—day32 異步 + 回調 、Event、gevent 、協程、單線程下實現遇到IO切換
阿新 • • 發佈:2018-05-02
user stat 調用 進程池 targe eat 串行 spool 工具
異步 + 回調:就是把下載好的東西回調主進程執行 或者回調給線程,哪個線程閑著就執行
1 #進程的異步 + 回調 2 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 3 # 4 # import requests 5 # import os,time,random 6 # def get(url): 7 # print(‘%s get %s‘%(os.getpid(),url) ) 8 # 9 # response = requests.get(url) 10 #time.sleep(random.randint(1, 3)) 11 # 12 # if response.status_code == 200 : 13 # #幹解析的活 只要下載完立刻進行解析 14 # return response.text 15 # 16 # def pasrse(obj): 17 # res = obj.result() 18 # print(‘%s 解析結果為:%s‘ %(os.getpid(),len(res))) 19 # 20 # if __name__ == ‘__main__‘: 21 # urls = [22 # ‘https://www.baidu.com/‘, 23 # ‘https://www.baidu.com/‘, 24 # ‘https://www.baidu.com/‘, 25 # ‘https://www.baidu.com/‘, 26 # ‘https://www.baidu.com/‘, 27 # ‘http://www.sina.com.cn/‘, 28 # ‘http://www.sina.com.cn/‘, 29 # ‘http://www.sina.com.cn/‘ 30 # ]31 # pool = ProcessPoolExecutor(4) 32 # # objs = [] 33 # for url in urls: 34 # #把get函數和url任務扔進進程池 35 # obj = pool.submit(get,url) 36 # #提交完後給obj對象綁定了一個工具pasrse 37 # #任務有返回值就會自動運行,有結果立即調用解析方法pasrse,完成了解耦 38 # obj.add_done_callback(pasrse) 39 # 40 # print(‘主進程 %s‘%os.getpid()) 41 # # objs.append(obj) 42 # # res = pool.submit(get,url).result() 同步解析 43 # # pool.shutdown(wait=True) 44 # 45 # #問題 46 # #1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能繼續統一進行處理 47 # #2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18秒 48 # 49 # # 串行了 50 # # for obj in objs: 51 # # res = obj.result() 52 # # pasrse(res) 53 54 55 56 #哪個線程閑著就用回調函數 57 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 58 from threading import current_thread 59 import requests 60 import os,time,random 61 def get(url): 62 print(‘%s get %s‘%(current_thread().name,url) ) 63 64 response = requests.get(url) 65 time.sleep(random.randint(1, 3)) 66 67 if response.status_code == 200 : 68 #幹解析的活 只要下載完立刻進行解析 69 return response.text 70 71 def pasrse(obj): 72 res = obj.result() 73 print(‘%s 解析結果為:%s‘ %(current_thread().name,len(res))) 74 75 if __name__ == ‘__main__‘: 76 urls = [ 77 ‘https://www.baidu.com/‘, 78 ‘https://www.baidu.com/‘, 79 ‘https://www.baidu.com/‘, 80 ‘https://www.baidu.com/‘, 81 ‘https://www.baidu.com/‘, 82 ‘http://www.sina.com.cn/‘, 83 ‘http://www.sina.com.cn/‘, 84 ‘http://www.sina.com.cn/‘ 85 ] 86 pool = ThreadPoolExecutor(4) 87 # objs = [] 88 for url in urls: 89 #把get函數和url任務扔進進程池 90 obj = pool.submit(get,url) 91 #提交完後給obj對象綁定了一個工具pasrse 92 #任務有返回值就會自動運行,有結果立即調用解析方法pasrse,完成了解耦 93 obj.add_done_callback(pasrse) 94 95 print(‘主線程 %s‘%current_thread().name)
線程Queue:
1 import queue 2 3 q=queue.Queue(3) #隊列:先進先出 4 q.put(1) 5 q.put(2) 6 q.put(3) 7 # q.put(4) 8 9 print(q.get()) 10 print(q.get()) 11 print(q.get()) 12 13 14 q=queue.LifoQueue(3) #堆棧:後進先出 15 16 q.put(‘a‘) 17 q.put(‘b‘) 18 q.put(‘c‘) 19 20 print(q.get()) 21 print(q.get()) 22 print(q.get()) 23 24 25 q=queue.PriorityQueue(3) #優先級隊列:可以以小元組的形式往隊列裏存值,第一個元素代表優先級,數字越小優先級越高 26 q.put((10,‘user1‘)) 27 q.put((-3,‘user2‘)) 28 q.put((-2,‘user3‘)) 29 30 31 print(q.get()) 32 print(q.get()) 33 print(q.get())
線程Event:event.wait()
1 from threading import Event,current_thread,Thread 2 import time 3 4 event=Event() 5 6 def check(): 7 print(‘%s 正在檢測服務是否正常....‘ %current_thread().name) 8 time.sleep(5) 9 event.set() 10 11 12 def connect(): 13 count=1 14 while not event.is_set(): 15 if count == 4: 16 print(‘嘗試的次數過多,請稍後重試‘) 17 return 18 print(‘%s 嘗試第%s次連接...‘ %(current_thread().name,count)) 19 event.wait(1) 20 count+=1 21 print(‘%s 開始連接...‘ % current_thread().name) 22 23 if __name__ == ‘__main__‘: 24 t1=Thread(target=connect) 25 t2=Thread(target=connect) 26 t3=Thread(target=connect) 27 28 c1=Thread(target=check) 29 30 t1.start() 31 t2.start() 32 t3.start() 33 c1.start()
gevent:
1 from gevent import monkey;monkey.patch_all() 2 from threading import current_thread 3 import gevent 4 import time 5 6 def eat(): 7 print(‘%s eat 1‘ %current_thread().name) 8 time.sleep(5) 9 print(‘%s eat 2‘ %current_thread().name) 10 def play(): 11 print(‘%s play 1‘ %current_thread().name) 12 time.sleep(3) 13 print(‘%s play 2‘ %current_thread().name) 14 15 g1=gevent.spawn(eat) 16 g2=gevent.spawn(play) 17 18 # gevent.sleep(100) 19 # g1.join() 20 # g2.join() 21 print(current_thread().name) 22 gevent.joinall([g1,g2])
協程:
1、單線程下實現並發:協程
並發指的是多個任務看起來是同時運行的
並發實現的本質:切換 + 保存狀態
並發、並行、串行
並發:看起來是同時運行,切換 + 保存狀態
實現並行,4個cpu能夠並行4個任務
串行:一個人完完整整地執行完畢才能運行下一個任務
1 import time 2 def consumer(): 3 ‘‘‘任務1:接收數據,處理數據‘‘‘ 4 while True: 5 x=yield 6 7 8 def producer(): 9 ‘‘‘任務2:生產數據‘‘‘ 10 g=consumer() 11 next(g) 12 for i in range(10000000): 13 g.send(i) 14 15 start=time.time() 16 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果 17 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的. 18 producer() #1.0202116966247559 19 20 21 stop=time.time() 22 print(stop-start)
1 import time 2 def consumer(res): 3 ‘‘‘任務1:接收數據,處理數據‘‘‘ 4 pass 5 6 def producer(): 7 ‘‘‘任務2:生產數據‘‘‘ 8 res=[] 9 for i in range(10000000): 10 res.append(i) 11 12 consumer(res) 13 # return res 14 15 start=time.time() 16 #串行執行 17 res=producer() 18 stop=time.time() 19 print(stop-start)
1 # 純計算的任務串行執行 2 import time 3 def task1(): 4 res=1 5 for i in range(1000000): 6 res+=i 7 8 def task2(): 9 res=1 10 for i in range(1000000): 11 res*=i 12 13 start=time.time() 14 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果 15 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的. 16 task1() 17 task2() 18 stop=time.time() 19 print(stop-start) 20 21 22 23 # 純計算的任務並發執行 24 import time 25 def task1(): 26 res=1 27 for i in range(1000000): 28 res+=i 29 yield 30 time.sleep(10000) 31 print(‘task1‘) 32 33 def task2(): 34 g=task1() 35 res=1 36 for i in range(1000000): 37 res*=i 38 next(g) 39 print(‘task2‘) 40 41 start=time.time() 42 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果 43 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的. 44 task2() 45 stop=time.time() 46 print(stop-start)
單線程下實現遇到IO切換:
1 from greenlet import greenlet 2 import time 3 4 def eat(name): 5 print(‘%s eat 1‘ %name) 6 time.sleep(30) 7 g2.switch(‘alex‘) 8 print(‘%s eat 2‘ %name) 9 g2.switch() 10 def play(name): 11 print(‘%s play 1‘ %name) 12 g1.switch() 13 print(‘%s play 2‘ %name) 14 15 g1=greenlet(eat) 16 g2=greenlet(play) 17 18 g1.switch(‘egon‘)
python—day32 異步 + 回調 、Event、gevent 、協程、單線程下實現遇到IO切換