Python 線程
一、定義:
線程顧名思義,就是一條流水線工作的過程,一條流水線必須屬於一個車間,一個車間的工作過程是一個進程,進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是執行單位
二、線程定義方式:
1、使用替換threading模塊提供的Thread
from threading import Thread from multiprocessing import Process def task(): print(‘is running‘) if __name__ == ‘__main__‘: t=Thread(target=task,)# t=Process(target=task,) t.start() print(‘主‘)
2、自定義類,繼承Thread
from threading import Thread from multiprocessing import Process class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print(‘%s is running‘ %self.name)if __name__ == ‘__main__‘: t=MyThread(‘egon‘) # t=Process(target=task,) t.start() print(‘主‘)
三、多線程共享同一個進程內的資源
因為線程間的數據是共享的所以都會用同一個資源
from threading import Thread from multiprocessing import Process n=100 def work(): global n n=0 if __name__ == ‘__main__‘: # p=Process(target=work,)# p.start() # p.join() # print(‘主‘,n) t=Thread(target=work,) t.start() t.join() print(‘主‘,n)
四、其它相關函數
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread,activeCount,enumerate,current_thread import time def task(): print(‘%s is running‘ %current_thread().getName()) time.sleep(2) if __name__ == ‘__main__‘: t=Thread(target=task,) t.start() t.join() print(t.is_alive()) print(t.getName()) print(enumerate()) print(‘主‘) print(activeCount())
current_thread的用法 from threading import Thread,activeCount,enumerate,current_thread from multiprocessing import Process import time def task(): print(‘%s is running‘ %current_thread().getName()) time.sleep(2) if __name__ == ‘__main__‘: p=Process(target=task) p.start() print(current_thread())
from threading import Thread,activeCount,enumerate,current_thread from multiprocessing import Process import time def task(): print(‘%s is running‘ %current_thread().getName()) time.sleep(2) if __name__ == ‘__main__‘: t1=Thread(target=task) t2=Thread(target=task) t3=Thread(target=task) t1.start() t2.start() t3.start() print(current_thread())
五、守護線程
守護線程則是主線程等待其它非守護線程結束,主線程結束則守護線程結束
#再看:守護線程 from threading import Thread import time def task1(): print(‘123‘) time.sleep(10) print(‘123done‘) def task2(): print(‘456‘) time.sleep(1) print(‘456done‘) if __name__ == ‘__main__‘: t1=Thread(target=task1) t2=Thread(target=task2) t1.daemon=True t1.start() t2.start() print(‘主‘)
六、線程互斥鎖
即:線程中誰搶到了鎖誰去執行,沒有搶到的則在等待
from threading import Thread,Lock import time n=100 def work(): global n mutex.acquire()#搶到鎖加鎖 temp=n time.sleep(0.1) n=temp-1 mutex.release()#解鎖 if __name__ == ‘__main__‘: mutex=Lock() l=[] start=time.time() for i in range(100): t=Thread(target=work) l.append(t) t.start() for t in l: t.join() print(‘run time:%s value:%s‘ %(time.time()-start,n))
七:互斥鎖與join的區別
互斥鎖只是在重要的代碼階段加上誰搶到誰處理,而join則是一個一個的全部把所有的代碼都執行,大大加大執行代碼的時間
join實例:
from threading import Thread,Lock import time n=100 def work(): time.sleep(0.05) global n temp=n time.sleep(0.1) n=temp-1 if __name__ == ‘__main__‘: start=time.time() for i in range(100): t=Thread(target=work) t.start() t.join() print(‘run time:%s value:%s‘ %(time.time()-start,n))
互斥鎖實例:
#互斥鎖 from threading import Thread,Lock import time n=100 def work(): time.sleep(0.05) global n mutex.acquire() temp=n time.sleep(0.1) n=temp-1 mutex.release() if __name__ == ‘__main__‘: mutex=Lock() l=[] start=time.time() for i in range(100): t=Thread(target=work) l.append(t) t.start() for t in l: t.join() print(‘run time:%s value:%s‘ %(time.time()-start,n))
八:線程死鎖與遞規鎖
死鎖:則是幾個人在搶幾把鎖,但是一個人搶一把鎖,在沒有解這把鎖,則是去搶另一把,則永遠無法搶到,也沒法解除當前的鎖,由為死鎖
from threading import Thread,Lock,RLock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘\033[45m%s 搶到A鎖\033[0m‘ %self.name) mutexB.acquire() print(‘\033[44m%s 搶到B鎖\033[0m‘ %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘\033[44m%s 搶到B鎖\033[0m‘ %self.name) time.sleep(1) mutexA.acquire() print(‘\033[45m%s 搶到A鎖\033[0m‘ %self.name) mutexA.release() mutexB.release()
遞歸鎖:
則需要threading導入RLock,用這個每一個人拿到的都是這把鎖,解除這把鎖之後才能拿到下把鎖,這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:
#遞歸鎖 from threading import Thread,Lock,RLock import time mutex=RLock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutex.acquire() print(‘\033[45m%s 搶到A鎖\033[0m‘ %self.name) mutex.acquire() print(‘\033[44m%s 搶到B鎖\033[0m‘ %self.name) mutex.release() mutex.release() def f2(self): mutex.acquire() print(‘\033[44m%s 搶到B鎖\033[0m‘ %self.name) time.sleep(1) mutex.acquire() print(‘\033[45m%s 搶到A鎖\033[0m‘ %self.name) mutex.release() mutex.release()
九:信號量
同進程的一樣
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()
from threading import Thread,current_thread,Semaphore import time,random sm=Semaphore(5) def work(): sm.acquire() print(‘%s 上廁所‘ %current_thread().getName()) time.sleep(random.randint(1,3)) sm.release() if __name__ == ‘__main__‘: for i in range(20): t=Thread(target=work) t.start()
十:Event
同進程的一樣
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear():恢復event的狀態值為False。
from threading import Thread,current_thread,Event import time event=Event() def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise ConnectionError(‘鏈接失敗‘) print(‘%s 等待第%s次鏈接mysql‘ %(current_thread().getName(),count)) event.wait(0.5) count+=1 print(‘%s 鏈接ok‘ % current_thread().getName()) def check_mysql(): print(‘%s 正在檢查mysql狀態‘ %current_thread().getName()) time.sleep(1) event.set() if __name__ == ‘__main__‘: t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) check=Thread(target=check_mysql) t1.start() t2.start() check.start()
十一:定時器
定義:指定n秒後執行某操作
from threading import Timer def hello(n): print("hello, world",n) t = Timer(3, hello,args=(11,))#3秒後執行 t.start() # after 1 seconds, "hello, world" will be printed
十二:線程queue
定義:線程的隊列,使用import queue,用法與進程Queue一樣
import queue q=queue.Queue(3) #隊列:先進先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q=queue.LifoQueue(3) #堆棧:後進先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q=queue.PriorityQueue(3) #數字越小優先級越高 q.put((10,‘data1‘)) q.put((11,‘data2‘)) q.put((9,‘data3‘)) print(q.get()) print(q.get()) print(q.get())
十三、線程池
定義:則是同時開啟多少線程,如果並發則用的線程名則還是已開啟的
#線程池 import requests #pip3 install requests import os,time,threading from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor def get_page(url): print(‘<%s> get :%s‘ %(threading.current_thread().getName(),url)) respone = requests.get(url) if respone.status_code == 200: return {‘url‘:url,‘text‘:respone.text} def parse_page(obj): dic=obj.result() print(‘<%s> parse :%s‘ %(threading.current_thread().getName(),dic[‘url‘])) time.sleep(0.5) res=‘url:%s size:%s\n‘ %(dic[‘url‘],len(dic[‘text‘])) #模擬解析網頁內容 with open(‘db.txt‘,‘a‘) as f: f.write(res) if __name__ == ‘__main__‘: # p=Pool(4) p=ThreadPoolExecutor(3) #同時開始3個線程 urls = [ ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ‘http://www.baidu.com‘, ] for url in urls: # p.apply_async(get_page,args=(url,),callback=parse_page) p.submit(get_page,url).add_done_callback(parse_page) p.shutdown() print(‘主進程pid:‘,os.getpid())
Python 線程