重修課程day34(網絡編程八之線程二)
一 concurrent.futures模塊
開啟進程池和線程池模塊。
線程池和進程池的區別:進程池開啟的數量範圍只能在CPU對的核數以內。而線程池的數量可以自己隨便的定義,默認為是核數的五倍。
相關用法:
ThreadPoolExecutor:創建一個線程池
ProcessPoolExecutor:創建一個進程池
Executor:是一個抽象類,上面的兩個都是方法繼承這個抽象類的
submit:異步的提交方式。
map:簡化提交的方式,map自帶循環,只能單純的提交,用於int類型,並且沒有放回的結果。
shutdown:提供的一個借口,等待進程池或著線程池執行完畢過後,回收調那裏的資源。它裏面的wait=True的時候,要等待執行解釋後才會有返回結果;如果wait=False,就會理解返回結果,等到任務執行結束後才能回收調進程池或者線程池裏面的資源。但是不能提交任務了。
result:拿到結果。
# import concurrent.futures # import os # import time # import random # # def work(n): # print(‘%s is working‘%os.getpid()) # time.sleep(random.random()) # return n # # if __name__==‘__main__‘: # excutor=concurrent.futures.ProcessPoolExecutor(4) # futures=[] # for i in range(20): # future=excutor.submit(work,i) # futures.append(future) # excutor.shutdown(wait=True) # print(‘%s is zhuing‘%os.getpid()) # for future in futures: # print(future.result())
簡寫如下:
# import concurrent.futures # import os # import time # import random # def work(n): # print(‘%s is working‘ % os.getpid()) # time.sleep(random.random()) # return n # if __name__ == ‘__main__‘: # excutor = concurrent.futures.ThreadPoolExecutor(4) # futures = [excutor.submit(work, i) for i in range(20)] # excutor.shutdown(wait=True) # print(‘%s is zhuing‘ % os.getpid()) # for future in futures: # print(future.result())
回調函數:add_done_callback後面括號裏加上一個回調函數,回調函數接收的就是第一個函數返回的一個對像,使用時,還需要在回調函數的內部sunmit提交一下。
如下:
# import concurrent.futures # import requests # import time # import random # import os # def work(url): # print(‘%s is %s‘%(os.getpid(),url)) # ret=requests.get(url) # time.sleep(3) # if ret.status_code==200: # print(‘%s DONE %s‘%(os.getpid(),url)) # return {‘url‘:url,‘text‘:ret.text} # def foo(ret): # ret = ret.result() # print(‘%s FOO %s‘%(os.getpid(),ret[‘url‘])) # time.sleep(1) # res=‘%s;長度:%s‘%(ret[‘url‘],len(ret[‘text‘])) # with open(‘a.txt‘,‘a‘,encoding=‘utf-8‘)as f: # f.write(res+‘\n‘) # # if __name__==‘__main__‘: # url_list = [ # ‘http://tool.chinaz.com/regex/‘, # ‘http://www.cnblogs.com/fangjie0410/‘, # "http://www.cnblogs.com/xuanan", # "http://www.cnblogs.com/bg0131/p/6430943.html", # "http://www.cnblogs.com/wupeiqi/", # "http://www.cnblogs.com/linhaifeng/", # "http://www.cnblogs.com/Eva-J/articles/7125925.html", # "http://www.cnblogs.com/Eva-J/articles/6993515.html", # ] # excutres=concurrent.futures.ProcessPoolExecutor() # for i in url_list: # excutres.submit(work,i).add_done_callback(foo) # # print(‘主‘,os.getpid())
異常處理:exception:異常接口
concurrent.futures:自帶異常,所以python內置的異常是不能識別出來的。
raise:拋出異常
import concurrent.futures import os import time import random def work(n): print(‘%s is working‘ % os.getpid()) time.sleep(random.random()) raise Exception return n if __name__ == ‘__main__‘: excutor = concurrent.futures.ThreadPoolExecutor(4) futures = [] for i in range(20): future = excutor.submit(work, i).result() futures.append(future) excutor.shutdown(wait=True) print(‘%s is zhuing‘ % os.getpid()) for future in futures: print(future)
cuncel:取消終止異常
二 死鎖現象和遞歸鎖現象
什麽是死鎖:各自拿著對方想要的一把鎖,但是各自缺一把鎖而不能釋放
Lock:就是互斥鎖但是容易出現死鎖。只能夠acquire一次,只要鎖不釋放(selease),就不能acquire了。
死鎖現象如下:
# import threading # import time # import random # l1=threading.Lock() # l2=threading.Lock() # class Func(threading.Thread): # def run(self): # self.aa() # self.bb() # # def aa(self): # l1.acquire() # print(111) # l2.acquire() # print(222) # l2.release() # l1.release() # # def bb(self): # l2.acquire() # print(333) # time.sleep(random.random()) # l1.acquire() # print(444) # l1.release() # l2.release() # # if __name__==‘__main__‘: # for i in range(10): # ret=Func() # ret.start()
RLock:遞歸鎖,可以賦值多個變量。
遞歸鎖現象如下:
# import threading # import time # import random # r1=r2=threading.RLock() # class Func(threading.Thread): # def run(self): # self.aa() # self.bb() # # def aa(self): # r1.acquire() # print(111) # r2.acquire() # print(222) # r2.release() # r1.release() # # def bb(self): # r1.acquire() # print(333) # time.sleep(random.random()) # r2.acquire() # print(444) # r2.release() # r1.release() # # if __name__==‘__main__‘: # for i in range(10): # ret=Func() # ret.start()
遞歸鎖:沒加一次鎖,引用技術就會加1,沒減一次鎖,引用技術就會減一,並且可以同時acquire多次,只要技術不為0,就不能被其他的線程搶到。
三 信號量
什麽是信號量:其實就是鎖,同時能夠創建多個鎖,實現了一個並發的效果。超出鎖的範圍的任務就只有等待所得釋放,才能夠搶到鎖。相當於產生一堆新的線程和進程。
Semaphore:創建信號量,同時管理一個內置的計數器
# import threading # import time # import random # import os # def task(n): # with sm: # time.sleep(random.randint(1, 5)) # print(‘%s is tasking‘%threading.current_thread().getName()) # # if __name__==‘__main__‘: # sm=threading.Semaphore(5) # for i in range(20): # t=threading.Thread(target=task,args=(i,)) # t.start()
可以指定信號量的個數。
四 事件Event
Event:創建事件。用於線程之間的通信。Event對象實現了簡單的線程之間通信機制,它提供了設置信號,清除信號,等待等用於實現線程間的通信。
set:設置Event內部的信號為真。
wait:只有在內部信號為真的時候才會很快的執行並完成返回。當Event對象的內部信號標誌位假時,則wait方法一直等待到其為真時才返回。
timeout:wait裏面的一個參數,時間參數,等待的時間範圍。
使用Event對象的clear()方法可以清除Event對象內部的信號標誌,即將其設為假,當使用Event的clear方法後,isSet()方法返回假
is_set:判斷是否傳入了信號
isSet:返回event的狀態值
# import threading # import time # import random # e=threading.Event() # def work(): # print(‘%s 正在檢測‘%threading.current_thread().getName()) # time.sleep(random.randint(1,5)) # e.set() # # def foo(): # count=1 # while not e.is_set(): # if count > 3: # raise TimeoutError(‘等待超時‘) # print(‘%s 正在等待%s次連接‘%(threading.current_thread().getName(),count)) # e.wait(timeout=random.randint(1,5)) # count+=1 # print(‘%s 正在連接‘%threading.current_thread().getName()) # if __name__==‘__main__‘: # t1=threading.Thread(target=work) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
五 定時器
Timer:定時器,是thread的一個派生類,用於在指定的時間後調用調用某個方法。
# import threading # import random # def hello(n): # print(‘hello‘,n) # # if __name__==‘__main__‘: # for i in range(20): # t=threading.Timer(random.random(),hello,args=(i,)) # t.start()
六 線程queue:隊列
Queue:先放入隊列裏的數據先讀取出來
# import queue # import time # import random # import threading # q=queue.Queue(5) # def work(n): # time.sleep(random.randint(1,5)) # q.put(‘%s is working‘%n) # print(n) # # def foo(): # time.sleep(random.randint(1,3)) # print(q.get()) # # if __name__==‘__main__‘: # for i in range(20): # t1=threading.Thread(target=work,args=(i,)) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
PriorityQueue:先讀取優先級最高的數據,傳參時,以元組的格式傳入,前面傳入數字,後面傳入內容。數字從小到大排的優先級,如果是數字相同,就會按照ascii碼從小到大排序
# import queue # import time # import random # import threading # q=queue.PriorityQueue() # def work(n): # time.sleep(random.randint(1,5)) # q.put(‘%s is working‘%n) # print(n) # # def foo(): # time.sleep(random.randint(1,3)) # print(q.get()) # # if __name__==‘__main__‘: # for i in range(20): # t1=threading.Thread(target=work,args=(i,)) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
LifoQueue:先進後出,也叫做堆棧。讀取數據時,按照時間短的先讀取。
# import queue # import random # import time # import threading # q=queue.LifoQueue() # def work(n): # time.sleep(random.randint(1, 5)) # q.put((random.randint(1,20),‘jie_%s‘%n)) # print(n) # # def foo(): # print(q.get()) # time.sleep(random.randint(1,3)) # # if __name__==‘__main__‘: # for i in range(30): # t1=threading.Thread(target=work,args=(1,)) # t2=threading.Thread(target=foo) # t1.start() # t2.start()
重修課程day34(網絡編程八之線程二)