並發編程 - 線程 - 1.線程queue/2.線程池進程池/3.異步調用與回調機制
阿新 • • 發佈:2018-04-04
cal 編程 機制 com size ssp .org don 結果
1.線程queue :會有鎖
q=queue.Queue(3)
q.get()
q.put()
先進先出 隊列
後進先出 堆棧
優先級隊列
1 """先進先出 隊列""" 2 import queue 3 q=queue.Queue(3) #先進先出->隊列 4 5 q.put(‘first‘) 6 q.put(2) 7 # q.put(‘third‘) 8 # q.put(4) 9 q.put(4,block=False) #q.put_nowait(4) 10 # q.put_nowait(4) 11 # q.put(4,block=True) # True 阻塞 False 不阻塞 直接告訴你 隊列滿了12 # q.put(4,block=True,timeout=3) # 阻塞等待3秒 還沒有拿走數據就拋異常 13 # 14 print(q.get()) 15 print(q.get()) 16 print(q.get()) 17 print(q.get(block=True,timeout=2)) # false 不阻塞沒有數據就拋異常 默認是阻塞 block=True 18 print(q.get_nowait()) # 相當於block=false 19 # def get(self, block=True, timeout=None): 20 21 22 """後進先出 堆棧""" 23 import queue 24 q=queue.LifoQueue(3) #後進先出->堆棧 25 q.put(‘first‘) 26 q.put(2) 27 q.put(‘third‘) 28 29 print(q.get()) 30 print(q.get()) 31 print(q.get()) 32 33 """優先級隊列 """ 34 import queue 35 q=queue.PriorityQueue(3) #優先級隊列 36 37 q.put((10,{‘alice‘:12})) # 數字越小 優先級越高 優先拿出來 38 q.put((40,‘two‘)) 39 q.put((30,‘three‘)) 40 41 print(q.get()) 42 print(q.get()) 43 print(q.get())
2.線程池進程池:
client server 是IO 操作應該用多線程
計算密集型: 用多進程
io密集型:用多線程
池:對數目加以限制,保證機器正常運行
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 2 import os,time,random 3 4 def task(name): 5 print(‘name:%s pid:%s run‘ %(name,os.getpid())) 6 time.sleep(random.randint(1,3)) 7 8 9 if __name__ == ‘__main__‘: 10 pool=ProcessPoolExecutor(4) # 不指定 默認是cpu的核數 11 # pool=ThreadPoolExecutor(5) 12 13 for i in range(10): 14 pool.submit(task,‘egon%s‘ %i) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行 15 16 pool.shutdown(wait=True) # 類似join 代表往池子裏面丟任務的入口封死了 計數器-1 17 18 19 print(‘主‘) 20 """ 21 主 # # 異步調用池子收了10個任務,但同一時間只有4個任務在進行 22 name:egon0 pid:60056 run # 只有4個pid 23 name:egon1 pid:64700 run 24 name:egon2 pid:59940 run 25 name:egon3 pid:60888 run 26 27 name:egon4 pid:60888 run 28 29 name:egon5 pid:60056 run 30 name:egon6 pid:60888 run 31 32 name:egon7 pid:60056 run 33 name:egon8 pid:64700 run 34 name:egon9 pid:59940 run 35 """ 36 # pool.shutdown(wait=True) # 代表往池子裏面丟任務的入口封死了 計數器-1 37 """ 38 name:egon0 pid:57124 run 39 name:egon1 pid:62252 run 40 name:egon2 pid:55736 run 41 name:egon3 pid:62060 run 42 name:egon4 pid:57124 run 43 name:egon5 pid:62252 run 44 name:egon6 pid:55736 run 45 name:egon7 pid:55736 run 46 name:egon8 pid:62060 run 47 name:egon9 pid:55736 run 48 主 49 """ 50 51 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 52 from threading import currentThread 53 import os,time,random 54 55 def task(): 56 print(‘name:%s pid:%s run‘ %(currentThread().getName(),os.getpid())) 57 time.sleep(random.randint(1,3)) 58 59 60 if __name__ == ‘__main__‘: 61 pool=ThreadPoolExecutor(5) 62 63 for i in range(10): 64 pool.submit(task) 65 66 pool.shutdown(wait=True) 67 68 69 print(‘主‘) 70 """ 71 name:ThreadPoolExecutor-0_0 pid:61508 run 72 name:ThreadPoolExecutor-0_1 pid:61508 run 73 name:ThreadPoolExecutor-0_2 pid:61508 run 74 name:ThreadPoolExecutor-0_3 pid:61508 run 75 name:ThreadPoolExecutor-0_4 pid:61508 run 76 name:ThreadPoolExecutor-0_2 pid:61508 run 77 name:ThreadPoolExecutor-0_4 pid:61508 run 78 name:ThreadPoolExecutor-0_0 pid:61508 run 79 name:ThreadPoolExecutor-0_3 pid:61508 run 80 name:ThreadPoolExecutor-0_1 pid:61508 run 81 主 82 """
3.異步調用與回調機制:
提交任務的兩種方式:
同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行,效率低
異步調用:提交完任務後,不等待任務執行完畢。異步調用+回調機制 自動觸發叫回調
1 """同步調用""" 2 from concurrent.futures import ThreadPoolExecutor 3 import time 4 import random 5 6 def la(name): 7 print(‘%s is laing‘ %name) 8 time.sleep(random.randint(3,5)) 9 res=random.randint(7,13)*‘#‘ 10 return {‘name‘:name,‘res‘:res} 11 12 def weigh(shit): 13 name=shit[‘name‘] 14 size=len(shit[‘res‘]) 15 print(‘%s 拉了 《%s》kg‘ %(name,size)) 16 17 18 if __name__ == ‘__main__‘: 19 pool=ThreadPoolExecutor(13) 20 21 shit1=pool.submit(la,‘alex‘).result() 22 weigh(shit1) 23 24 shit2=pool.submit(la,‘wupeiqi‘).result() 25 weigh(shit2) 26 27 shit3=pool.submit(la,‘yuanhao‘).result() 28 weigh(shit3) 29 30 31 """異步調用 + 回調機制 自動觸發叫回調""" 32 from concurrent.futures import ThreadPoolExecutor 33 import time 34 import random 35 36 def la(name): 37 print(‘%s is laing‘ %name) 38 time.sleep(random.randint(3,5)) 39 res=random.randint(7,13)*‘#‘ 40 return {‘name‘:name,‘res‘:res} 41 # weigh({‘name‘:name,‘res‘:res}) # 這樣寫不好 所有功能 寫在一起了 42 43 44 def weigh(shit): 45 shit=shit.result() # 拿到是 對象 需要result() 46 name=shit[‘name‘] 47 size=len(shit[‘res‘]) 48 print(‘%s 拉了 《%s》kg‘ %(name,size)) 49 50 51 if __name__ == ‘__main__‘: 52 pool=ThreadPoolExecutor(13) 53 54 # pool.submit(la, ‘alex‘) 55 # pool.submit(la, ‘wupeiqi‘) 56 # pool.submit(la, ‘yuanhao‘) 57 58 pool.submit(la,‘alex‘).add_done_callback(weigh) # 實現了程序的解耦合 59 pool.submit(la,‘wupeiqi‘).add_done_callback(weigh) 60 pool.submit(la,‘yuanhao‘).add_done_callback(weigh)
4.異步調用與回調機制應用:
pip3 install requests
requests
異步調用+回調機制的 應用場景:
from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): # io操作 基於線程 數目有限 用線程池 print(‘GET %s‘ %url) response=requests.get(url) time.sleep(3) return {‘url‘:url,‘content‘:response.text} def parse(res): res=res.result() print(‘%s parse res is %s‘ %(res[‘url‘],len(res[‘content‘]))) if __name__ == ‘__main__‘: urls=[ ‘http://www.cnblogs.com/linhaifeng‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ] pool=ThreadPoolExecutor(2) for url in urls: pool.submit(get,url).add_done_callback(parse)
並發編程 - 線程 - 1.線程queue/2.線程池進程池/3.異步調用與回調機制