Python線程之threading
Threading模塊提供線程相關的操作,Threading模塊包含Thread,Lock,RLock,Event,Queue等組件;multiprocess模塊完全模仿了threading模塊的接口,二者在使用時,時極其相似的。
1、Thread
創建線程的兩種方式:
示例1:
import time from threading import Thread def func(i): time.sleep(1) print("hello : %s"%i) thread_l = [] # 開啟多線程 for i in range(10): t = Thread(target=func,args=(i,)) #實例化線程對象 t.start() # 激活線程 thread_l.append(t) # 異步開啟阻塞 for j in thread_l: j.join() # 阻塞主線程,子線程執行完畢之後向下執行主線程代碼 print("主線程")
結果:
hello : 2
hello : 0
hello : 1
hello : 3
hello : 5
hello : 4
hello : 7hello : 6hello : 9
hello : 8
主線程
示例2:使用類繼承的方式創建線程
import time from threading import Thread class MyThread(Thread): # 繼承Thread類 count = 0 # 子線程間會共享此靜態屬性 def __init__(self,arg1,arg2): # 通過init方法傳遞參數 super().__init__() self.arg1 = arg1 self.arg2 = arg2 def run(self): # 必須實現run方法 MyThread.count += 1 time.sleep(1) print("%s,hello!%s"%(self.arg1,self.arg2)) thread_l = [] for i in range(10): t = MyThread(‘eric‘,‘jonny‘) t.start() thread_l.append(t) for j in thread_l: j.join() print("conut: %s"%MyThread.count) 結果: 1,hello!jonny 0,hello!jonny 5,hello!jonny4,hello!jonny 3,hello!jonny 2,hello!jonny 6,hello!jonny9,hello!jonny 7,hello!jonny 8,hello!jonny conut: 10
Thread的主要方法:
t.start() :激活線程
t.join():阻塞(等待子線程執行完畢,在向下執行),在每次激活線程後阻塞會使線程變為同步,所以要在線程激活完畢之後阻塞。
t.name :設置或獲取線程名
t.getName():獲取線程名
t.setName(NAME):設置線程名
t.is_alive() :判斷線程是否激活
t.setDaemon() :設置守護線程,在激活線程之前設置,默認值為False
t.isDaemon() : 判斷是否為守護線程
2、Lock與RLock
同一個進程內的線程是數據共享的,線程的GIL(全局解釋性)鎖是鎖的線程調用CPU的時間,在第一個線程調用CPU操作共享數據的時候,時間輪轉至第二個線程,第二個線程也要操作共享數據,這樣就導致了數據的不一致,這是因為GIL不鎖數據,這種情況下,線程鎖的出現就能解決這個這個問題。
示例1:GIL鎖發揮作用
from threading import Thread
def func():
global n
n -= 1
n = 1000
thread_l = []
for i in range(100):
t = Thread(target=func)
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print(n)
結果是:900
示例2:時間片輪轉,GIL鎖失效
import time
from threading import Thread
def func():
global n
# n -= 1
temp = n # 從進程中獲取n
time.sleep(0.01) # 每個線程調用CPU的時間是很短的,制造時間片輪轉的效果
n = temp -1 # 得到結果,再將修改過的數據返回給進程
n = 1000
thread_l = []
for i in range(100):
t = Thread(target=func)
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print(n)
結果是:998
示例3:互斥鎖Lock,對數據加鎖
import time
from threading import Thread
from threading import Lock
def func():
global n
# n -= 1
lock.acquire() # 上鎖
temp = n # 從進程中獲取n
time.sleep(0.01) # 每個線程調用CPU的時間是很短的,制造時間片輪轉的效果
n = temp -1 # 得到結果,再將修改過的數據返回給進程
lock.release() # 釋放
n = 1000
lock = Lock() # 實例化鎖對象
thread_l = []
for i in range(100):
t = Thread(target=func)
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print(n)
結果是:900
互斥鎖Lock後使數據一致,具有安全性,但是也帶來了新的問題,因為鎖的緣故,每一個線程都是串行的拿到鎖,在釋放;整個程序運行變成串行,效率降低。
示例4:遞歸鎖Rlock
Lock在同一線程中只能被acquire一次,下一次的acquire必須等待release之後才可以;而RLock允許在同一線程中被多次acquire,但是acquire和release必須是成對存在。
from threading import Lock
from threading import RLock
lock = Lock() # 實例化出互斥鎖對象
lock.acquire()
lock.acquire() # 在第二次acquire時,程序阻塞等待release之後拿到鎖
print("死鎖")
lock.release()
lock.release()
lock1 = RLock() # 實例化出遞歸鎖對象
lock1.acquire()
lock1.acquire() # 可被多次acquire
print("running")
lock1.release()
lock1.release() # acquire與release成對出現
在多線程並發的情況下,同一個線程中,如果出現多次acquire,就可能發生死鎖現象,使用RLock就不會出現死鎖問題
3、Semaphore
線程的信號量與進程的信號量使用基本一致;信號量可以允許指定數量的線程操作上鎖的數據,即一把鎖有多個鑰匙。對與有信號量限制的程序來說,信號量有幾個任務就開啟幾個線程,在加鎖階段會限制程序運行數量,並不影響其它代碼的並發。
示例:
import random
import time
from threading import Semaphore
from threading import Thread
def sing(i,sem):
sem.acquire() # 加鎖
print(‘%s enter the ktv‘%i)
time.sleep(random.randint(1,10))
print(‘%s leave the ktv‘%i)
sem.release() # 釋放
sem = Semaphore(4)
for i in range(20):
t = Thread(target=sing, args=(i, sem))
t.start()
4、Event
事件:線程之間狀態標記通信,使用方法與進程的基本一致
主要方法:
e = Event() # 實例化一個事件對象
e.set() # 標記變為非阻塞
e.wait() # 默認標記為阻塞,在等待的過程中,遇到非阻塞信號就繼續執行
e.clear() # 標記變為阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞
示例:連接數據庫
‘‘‘
連接數據庫
每0.5秒連一次,連接三次
用事件來標誌數據庫的連接情況
如果連接成功,顯示成功
否則報錯
‘‘‘
import time
import random
from threading import Thread
from threading import Event
# 模擬檢查連接,檢查連接成功使事件標誌位為非阻塞
def check_conn():
time.sleep(random.randint(1,3))
e.set()
# 在還沒檢查成功前等待,接到非阻塞信號則連接數據庫
def conn_mysql():
count = 1
while not e.is_set():
if count > 3:
raise TimeoutError
print("嘗試第 %s 次連接" % count)
count += 1
e.wait(0.5)
print("連接成功")
e = Event() # 實例化事件對象
Thread(target=check_conn).start()
Thread(target=conn_mysql).start()
5、Timer
定時器:定時開啟一個線程,執行一個任務
示例:
from threading import Timer
def func():
print("hello")
‘‘‘
必須有兩個參數
第一個是時間,單位為秒
第二個是要執行的函數
‘‘‘
Timer(1,func).start()
6、Condition
條件變量:條件包含遞歸鎖RLock和事件Event中的wait()方法的功能。
五個方法:
acquire(): 遞歸鎖
release(): 釋放鎖
wait(timeout): 等待通知,或者等到設定的超時時間;才會被喚醒繼續運行。wait()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError異常。
notify(n=1): 通知其他線程,傳入的參數必須時int類型的,那些掛起的線程接到這個通知之後會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。notify()不會主動釋放Lock。
notifyAll(): 如果wait狀態線程比較多,notifyAll的作用就是通知所有線程
示例:
from threading import Condition
from threading import Thread
def run(n):
con.acquire()
con.wait()
print("run the thread: %s"%n)
con.release()
if __name__ == ‘__main__‘:
con = Condition()
for i in range(10):
t = Thread(target=run,args=(i,))
t.start()
while True:
msg = input(">>> ")
if msg == ‘q‘:
break
con.acquire() # 遞歸鎖
if msg == ‘all‘:
con.notify_all() # 放行所有線程
else:
con.notify(int(msg)) # 傳遞信號,放行線程,參數是int類型的
con.release() # 釋放鎖
7、Queue模塊
queue模塊就是線程的隊列,它是數據安全的。
主要方法:
q.put(1) # 將傳入的數據放入隊列
q.get() # 根據對象所屬類的不同,取出隊列中的數據
q.join() # 等隊列為空時,在執行別的操作
q.qsize() # 返回隊列的大小,不一定準確
q.empty() # 隊列為空時,返回True,否則返回False,不一定準確
q.full() # 隊列滿時,返回True,否則返回False,不一定準確
Queue類的使用:先進先出
import queue
q = queue.Queue() # 實例化一個隊列對象,可給出隊列長度,先進先出
q.put(1) # 將傳入的數據放入隊列
q.put(2)
print(q.get()) # 先進先出,取出隊列的第一個值
LifoQueue類的主要方法:後進先出
import queue
lfq = queue.LifoQueue() # 實例化一個對象,可給出長度,後進先出
lfq.put(1)
lfq.put(2)
print(lfq.get()) #後進先出,取出2
PriorityQueue類的主要方法:優先級
import queue
pq = queue.PriorityQueue() # 實例化一個隊列對象,優先級隊列,優先級值越小越優先
pq.put((10,‘a‘))
pq.put((5,‘b‘))
pq.put((1,‘c‘))
pq.put((15,‘d‘))
for i in range(4):
print(pq.get())
結果:
(1, ‘c‘)
(5, ‘b‘)
(10, ‘a‘)
(15, ‘d‘)
8、concurrent模塊之futures
concurrent是用來操作池的模塊,這個模塊可創建進程池和線程池,其使用方法完全一致,統一了入口和方法,使用池更便捷,且python內置,導入便可使用。
主要方法:
submit(FUNC,ARGS):創建線程對象和激活線程,FUNC是要執行的任務,ARGS是參數
shutdown():shutdown方法封裝了close和join方法,調用該方法時,不能在忘池中添加任務,且要等待池中任務執行結束
result():result方法取線程執行的函數返回值
map(FUNC,iter):map方法異步執行,需傳入要執行的任務FUNC,以及一個可叠代對象iter,map方法無返回值
add_done_callback(call):回調函數
示例1:
import time
import random
from concurrent import futures
def func(n):
time.sleep(random.randint(1,3))
print(n)
return n*"*"
thread_pool = futures.ThreadPoolExecutor(20) # 實例化一個線程池對象,一般開啟CPU核數*5
f_l = []
for i in range(10):
t = thread_pool.submit(func,i) # submit方法合並了創建線程對象和激活線程的功能
f_l.append(t)
thread_pool.shutdown() # shutdown方法封裝了close和join方法,調用該方法時,不能在忘池中添加任務,且要等待池中任務執行結束
for j in f_l:
print(j.result()) # result方法取線程執行的函數返回值
示例2:回調
from concurrent import futures
def func(n):
print(n)
return n*"*"
def call(args):
print(args.result())
thread_pool = futures.ThreadPoolExecutor(20)
thread_pool.submit(func,1).add_done_callback(call)
Python線程之threading