1. 程式人生 > >python—day32 異步 + 回調 、Event、gevent 、協程、單線程下實現遇到IO切換

python—day32 異步 + 回調 、Event、gevent 、協程、單線程下實現遇到IO切換

user stat 調用 進程池 targe eat 串行 spool 工具

異步 + 回調:就是把下載好的東西回調主進程執行 或者回調給線程,哪個線程閑著就執行

 1 #進程的異步 + 回調
 2 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 3 #
 4 # import requests
 5 # import os,time,random
 6 # def get(url):
 7 #     print(‘%s get %s‘%(os.getpid(),url) )
 8 #
 9 #     response = requests.get(url)
10 #
time.sleep(random.randint(1, 3)) 11 # 12 # if response.status_code == 200 : 13 # #幹解析的活 只要下載完立刻進行解析 14 # return response.text 15 # 16 # def pasrse(obj): 17 # res = obj.result() 18 # print(‘%s 解析結果為:%s‘ %(os.getpid(),len(res))) 19 # 20 # if __name__ == ‘__main__‘: 21 # urls = [
22 # ‘https://www.baidu.com/‘, 23 # ‘https://www.baidu.com/‘, 24 # ‘https://www.baidu.com/‘, 25 # ‘https://www.baidu.com/‘, 26 # ‘https://www.baidu.com/‘, 27 # ‘http://www.sina.com.cn/‘, 28 # ‘http://www.sina.com.cn/‘, 29 # ‘http://www.sina.com.cn/‘ 30 # ]
31 # pool = ProcessPoolExecutor(4) 32 # # objs = [] 33 # for url in urls: 34 # #把get函數和url任務扔進進程池 35 # obj = pool.submit(get,url) 36 # #提交完後給obj對象綁定了一個工具pasrse 37 # #任務有返回值就會自動運行,有結果立即調用解析方法pasrse,完成了解耦 38 # obj.add_done_callback(pasrse) 39 # 40 # print(‘主進程 %s‘%os.getpid()) 41 # # objs.append(obj) 42 # # res = pool.submit(get,url).result() 同步解析 43 # # pool.shutdown(wait=True) 44 # 45 # #問題 46 # #1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能繼續統一進行處理 47 # #2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18秒 48 # 49 # # 串行了 50 # # for obj in objs: 51 # # res = obj.result() 52 # # pasrse(res) 53 54 55 56 #哪個線程閑著就用回調函數 57 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 58 from threading import current_thread 59 import requests 60 import os,time,random 61 def get(url): 62 print(%s get %s%(current_thread().name,url) ) 63 64 response = requests.get(url) 65 time.sleep(random.randint(1, 3)) 66 67 if response.status_code == 200 : 68 #幹解析的活 只要下載完立刻進行解析 69 return response.text 70 71 def pasrse(obj): 72 res = obj.result() 73 print(%s 解析結果為:%s %(current_thread().name,len(res))) 74 75 if __name__ == __main__: 76 urls = [ 77 https://www.baidu.com/, 78 https://www.baidu.com/, 79 https://www.baidu.com/, 80 https://www.baidu.com/, 81 https://www.baidu.com/, 82 http://www.sina.com.cn/, 83 http://www.sina.com.cn/, 84 http://www.sina.com.cn/ 85 ] 86 pool = ThreadPoolExecutor(4) 87 # objs = [] 88 for url in urls: 89 #把get函數和url任務扔進進程池 90 obj = pool.submit(get,url) 91 #提交完後給obj對象綁定了一個工具pasrse 92 #任務有返回值就會自動運行,有結果立即調用解析方法pasrse,完成了解耦 93 obj.add_done_callback(pasrse) 94 95 print(主線程 %s%current_thread().name)

線程Queue:

 1 import queue
 2 
 3 q=queue.Queue(3) #隊列:先進先出
 4 q.put(1)
 5 q.put(2)
 6 q.put(3)
 7 # q.put(4)
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 
13 
14 q=queue.LifoQueue(3) #堆棧:後進先出
15 
16 q.put(a)
17 q.put(b)
18 q.put(c)
19 
20 print(q.get())
21 print(q.get())
22 print(q.get())
23 
24 
25 q=queue.PriorityQueue(3) #優先級隊列:可以以小元組的形式往隊列裏存值,第一個元素代表優先級,數字越小優先級越高
26 q.put((10,user1))
27 q.put((-3,user2))
28 q.put((-2,user3))
29 
30 
31 print(q.get())
32 print(q.get())
33 print(q.get())

線程Event:event.wait()

 1 from threading import Event,current_thread,Thread
 2 import time
 3 
 4 event=Event()
 5 
 6 def check():
 7     print(%s 正在檢測服務是否正常.... %current_thread().name)
 8     time.sleep(5)
 9     event.set()
10 
11 
12 def connect():
13     count=1
14     while not event.is_set():
15         if count ==  4:
16             print(嘗試的次數過多,請稍後重試)
17             return
18         print(%s 嘗試第%s次連接... %(current_thread().name,count))
19         event.wait(1)
20         count+=1
21     print(%s 開始連接... % current_thread().name)
22 
23 if __name__ == __main__:
24     t1=Thread(target=connect)
25     t2=Thread(target=connect)
26     t3=Thread(target=connect)
27 
28     c1=Thread(target=check)
29 
30     t1.start()
31     t2.start()
32     t3.start()
33     c1.start()

gevent:

 1 from gevent import monkey;monkey.patch_all()
 2 from threading import current_thread
 3 import gevent
 4 import time
 5 
 6 def eat():
 7     print(%s eat 1 %current_thread().name)
 8     time.sleep(5)
 9     print(%s eat 2 %current_thread().name)
10 def play():
11     print(%s play 1 %current_thread().name)
12     time.sleep(3)
13     print(%s play 2 %current_thread().name)
14 
15 g1=gevent.spawn(eat)
16 g2=gevent.spawn(play)
17 
18 # gevent.sleep(100)
19 # g1.join()
20 # g2.join()
21 print(current_thread().name)
22 gevent.joinall([g1,g2])

協程:

1、單線程下實現並發:協程

  並發指的是多個任務看起來是同時運行的

  並發實現的本質:切換 + 保存狀態

  並發、並行、串行

  並發:看起來是同時運行,切換 + 保存狀態

    實現並行,4個cpu能夠並行4個任務

  串行:一個人完完整整地執行完畢才能運行下一個任務

 1 import time
 2 def consumer():
 3     ‘‘‘任務1:接收數據,處理數據‘‘‘
 4     while True:
 5         x=yield
 6 
 7 
 8 def producer():
 9     ‘‘‘任務2:生產數據‘‘‘
10     g=consumer()
11     next(g)
12     for i in range(10000000):
13         g.send(i)
14 
15 start=time.time()
16 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果
17 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的.
18 producer() #1.0202116966247559
19 
20 
21 stop=time.time()
22 print(stop-start)

 1 import time
 2 def consumer(res):
 3     ‘‘‘任務1:接收數據,處理數據‘‘‘
 4     pass
 5 
 6 def producer():
 7     ‘‘‘任務2:生產數據‘‘‘
 8     res=[]
 9     for i in range(10000000):
10         res.append(i)
11 
12     consumer(res)
13     # return res
14 
15 start=time.time()
16 #串行執行
17 res=producer()
18 stop=time.time()
19 print(stop-start)
 1 # 純計算的任務串行執行
 2 import time
 3 def task1():
 4     res=1
 5     for i in range(1000000):
 6         res+=i
 7 
 8 def task2():
 9     res=1
10     for i in range(1000000):
11         res*=i
12 
13 start=time.time()
14 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果
15 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的.
16 task1()
17 task2()
18 stop=time.time()
19 print(stop-start)
20 
21 
22 
23 # 純計算的任務並發執行
24 import time
25 def task1():
26     res=1
27     for i in range(1000000):
28         res+=i
29         yield
30         time.sleep(10000)
31         print(task1)
32 
33 def task2():
34     g=task1()
35     res=1
36     for i in range(1000000):
37         res*=i
38         next(g)
39         print(task2)
40 
41 start=time.time()
42 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果
43 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的.
44 task2()
45 stop=time.time()
46 print(stop-start)

單線程下實現遇到IO切換:

 1 from greenlet import greenlet
 2 import time
 3 
 4 def eat(name):
 5     print(%s eat 1 %name)
 6     time.sleep(30)
 7     g2.switch(alex)
 8     print(%s eat 2 %name)
 9     g2.switch()
10 def play(name):
11     print(%s play 1 %name)
12     g1.switch()
13     print(%s play 2 %name)
14 
15 g1=greenlet(eat)
16 g2=greenlet(play)
17 
18 g1.switch(egon)

python—day32 異步 + 回調 、Event、gevent 、協程、單線程下實現遇到IO切換