1. 程式人生 > >python-->多執行緒

python-->多執行緒

執行緒簡介

執行緒是CPU分配資源的基本單位。但一個程式開始執行,這個程式就變成了一個程序,而一個程序相當於一個或者多個執行緒。當沒有多執行緒程式設計時,一個程序也是一個主執行緒,但有多執行緒程式設計時,一個程序包含多個執行緒和一個主執行緒。使用執行緒可以實現程式的併發。

執行緒特點

  • 每個獨立的執行緒有一個程式執行的入口、順序執行序列和程式的出口。但是執行緒不能夠獨立執行,必須依存在應用程式中,由應用程式提供多個執行緒執行控制。
  • 每個執行緒都有他自己的一組CPU暫存器,稱為執行緒的上下文,該上下文反映了執行緒上次執行該執行緒的CPU暫存器的狀態。
  • 指令指標和堆疊指標暫存器是執行緒上下文中兩個最重要的暫存器,執行緒總是在程序得到上下文中執行的,這些地址都用於標誌擁有執行緒的程序地址空間中的記憶體。
  • 執行緒可以被搶佔(中斷)。
  • 在其他執行緒正在執行時,執行緒可以暫時擱置(也稱為睡眠) -- 這就是執行緒的退讓。  

多執行緒優勢  

  •   使用執行緒可以把佔據長時間的程式中的任務放到後臺去處理。
  •   使用者介面可以更加吸引人,比如使用者點選了一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度
  •   程式的執行速度可能加快
  •   在一些等待的任務實現上如使用者輸入、檔案讀寫和網路收發資料等,執行緒就比較有用了。在這種情況下我們可以釋放一些珍貴的資源如記憶體佔用等

執行緒分類

  •     核心執行緒:由作業系統核心建立和撤銷。
  •     使用者執行緒:不需要核心支援而在使用者程式中實現的執行緒。

執行緒模組 

  •    _thread
  •    threading(推薦使用)

更多多執行緒理論參考:http://www.cnblogs.com/linhaifeng/articles/7430082.html

執行緒開啟

方式一:建立執行緒要執行的函式,把這個函式傳遞進Thread物件裡,讓它來執行

from threading import
Thread import time def test(param): time.sleep(2) print('Hello %s'%param) if __name__ == '__main__': t=Thread(target=test,args=('World',)) t.start() print('main threading')
View Code

方式二:繼承thread類,重寫run方法與Java的多執行緒非常類似

from threading import Thread
import time

class Test_Thread(Thread):
    def __init__(self,param):
        super().__init__()
        self.param=param
    def run(self):
        time.sleep(2)
        print('Hello %s' % self.param)


if __name__ == '__main__':
    t = Test_Thread('Python')
    t.start()
    print('main Threading')
View Code

執行緒方法

Thread例項物件的方法
isAlive(): 返回執行緒是否活動的。
getName(): 返回執行緒名。
setName(): 設定執行緒名。

threading模組提供的一些方法:
threading.currentThread(): 返回當前的執行緒變數。
threading.enerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enerate())有相同的結果。
    

join()方法

主執行緒x中,建立了子執行緒y,並且在主執行緒x中呼叫了y.join(),則主執行緒x會在呼叫的地方等待,直到子執行緒y完成操作後,才會接著往下執行。(對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢主執行緒等待子執行緒結束)

from threading import Thread
import time

def test(param):
    time.sleep(2)
    print('Hello %s' %param)

if  __name__ == '__main__':
    t=Thread(target=test,args=('Python',))
    t.start()
    t.join()
    print('main threading')
    print(t.is_alive())
View Code

setDaemon()方法

主執行緒x中建立了子執行緒y,並且在主執行緒x中呼叫了y.setDaemon(),則子執行緒y設定為守護執行緒,如果主執行緒x執行束,不論子執行緒y是否完成,一併和主執行緒x退出.
ps:必須在start() 方法呼叫之前設定,如果不設定為守護執行緒,程式會被無限掛起。(主執行緒不等待子執行緒結束即守護執行緒)

from threading import Thread
import time

def test(param):
    time.sleep(2)
    print('Hello %s' %param)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('World',))
    # 必須在t.start()之前設定
    t.setDaemon(True)
    t.start()
    print('main threding')
    print(t.is_alive())
View Code

GIL VS Lock

Python已經存在一個GIL但是來保證同一時間只能有一個執行緒來執行了,但是為什麼這裡還需要lock? 首先需要明確的是鎖的目的是為了保護共享的資料,保證同一時間只能有一個執行緒來修改共享的資料而且保護不同的資料就應該加不同的鎖。GIL與Lock是兩把鎖,保護的資料不一樣,前者是直譯器級別的(當然保護的就是直譯器級別的資料,比如垃圾回收的資料),後者是保護使用者自己開發的應用程式的資料,很明顯GIL不負責這件事,只能使用者自定義加鎖處理,即Lock。

執行緒同步

如果多個執行緒共同對某個資料修改,則可能出現不可預料的結果,為了保證資料的正確性,需要對多個執行緒進行同步。使用 Thread 物件的 Lock 和 Rlock 可以實現簡單的執行緒同步,這兩個物件都有 acquire 方法和 release 方法,對於那些需要每次只允許一個執行緒操作的資料,可以將其操作放到 acquire 和 release 方法之間即可

同步鎖

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源建立一個Lock物件,當你需要訪問該資源時,呼叫acquire方法來獲取鎖物件(如果其它執行緒已經獲得了該鎖,則當前執行緒需等待其被釋放),待資源訪問完後,再呼叫release方法釋放鎖

import threading

R=threading.Lock()

R.acquire()
'''
對公共資料的操作
'''
R.release()
#併發執行變成序列,犧牲了執行效率保證了資料安全
from threading import Thread,Lock
import os,time

def work():
    global nbers
    lock.acquire()
    temp=nbers
    time.sleep(0.1)
    nbers=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    nbers=66
    list=[]
    for i in range(66):
        t=Thread(target=work)
        list.append(t)
        t.start()
    for t in list:
        t.join()
    print(nbers)
View Code
過程分析:
第一步:66個執行緒去搶GIL鎖,即搶執行許可權
第二部:肯定有一個執行緒先搶到GIL(暫且稱為執行緒一),然後開始執行,一旦執行就會拿到lock.acquire()
第三步:極有可能執行緒一還未執行完畢,就有另外一個執行緒二搶到GIL,然後開始執行,但執行緒二發現互斥鎖lock還未被執行緒一釋放,於是阻塞,被迫交出執行許可權,即釋放GIL
第四步:直到執行緒一重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的執行緒再重複以上的過程

互斥鎖與join()方法的區別

加鎖會讓執行變成序列,那麼如果在start之後立即使用join,就不需要加鎖而且也是序列的效果。
但是在start之後立刻使用join,肯定會將100個任務的執行變成序列,毫無疑問,最終n的結果也肯定是0,資料是安全的。
但問題是start之後立即join:任務內的所有程式碼都是序列執行的,而加鎖只是加鎖的部分即修改共享資料的部分是序列的,這就是兩者的區別。
如果單從保證資料安全方面,二者都可以實現,但很明顯是加鎖的程式執行效率顯然比使用join()方法更高
*不加鎖:併發執行,速度快,但是資料是不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

----------------------------------------------------
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99



*不加鎖:未加鎖部分併發執行,加鎖部分序列執行,速度慢,但是資料安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的程式碼併發執行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的程式碼序列執行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

----------------------------------------------------------
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0



*使用join()方法,資料安全,但是序列執行
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

-----------------------------------------
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麼的恐怖
View Code

死鎖

是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,如果沒有外力進行干預則程式就無法繼續執行,此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序就稱為死鎖程序

from threading import Thread,Lock
import time
X=Lock()
Y=Lock()

class Lock_Thread(Thread):
    def run(self):
        self.actionX()
        self.actionY()
    def actionX(self):
        X.acquire()
        print('\033[41m%s 拿到X鎖\033[0m' %self.name)

        Y.acquire()
        print('\033[42m%s 拿到Y鎖\033[0m' %self.name)
        Y.release()

        X.release()

    def actionY(self):
        Y.acquire()
        print('\033[43m%s 拿到Y鎖\033[0m' %self.name)
        time.sleep(2)

        X.acquire()
        print('\033[44m%s 拿到X鎖\033[0m' %self.name)
        X.release()

        Y.release()

if __name__ == '__main__':
    for i in range(10):
        t=Lock_Thread()
        t.start()
----------------------------------------------------------
Thread-1 拿到X鎖
Thread-1 拿到Y鎖
Thread-1 拿到Y鎖
Thread-2 拿到X鎖
然後就卡住,死鎖了    
View Code

遞迴鎖 

即死鎖解決方法,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,
其他的執行緒才能獲得資源。如果使用RLock代替Lock,則不會發生死鎖.
即X=Y=threading.RLock()如果一個執行緒拿到鎖,counter加1,該執行緒內又碰到加鎖的情況,則counter繼續加1,這期間所有其他執行緒都只能等待,等待該執行緒釋放所有鎖,
即counter遞減到0為止

import  threading
import time


class Lock_Thread(threading.Thread):

    def actionX(self):

        r_lcok.acquire() #count=1
        print(self.name,"gotX",time.ctime())
        time.sleep(2)
        r_lcok.acquire() #count=2

        print(self.name, "gotY", time.ctime())
        time.sleep(1)

        r_lcok.release() #count=1
        r_lcok.release() #count=0


    def actionY(self):

        r_lcok.acquire()
        print(self.name, "gotY", time.ctime())
        time.sleep(2)

        r_lcok.acquire()
        print(self.name, "gotX", time.ctime())
        time.sleep(1)

        r_lcok.release()
        r_lcok.release()


    def run(self):

        self.actionX()
        self.actionY()


if __name__ == '__main__':


    r_lcok=threading.RLock()
    L=[]

    for i in range(5):
        t=Lock_Thread()
        t.start()
        L.append(t)


    for i in L:
        i.join()

    print("ending....")
View Code

 訊號量Semahpore 

Semaphore管理一個內建的計數器,每當呼叫acquire()時內建計數器-1;呼叫release() 時內建計數器+1;計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()
ps:訊號量與程序池是完全不同的概念,程序池Pool(4),最大隻能產生4個程序,而且從頭到尾都只是這四個程序,不會產生新的,而訊號量是產生一堆執行緒/程序
互斥鎖與訊號量推薦部落格:http://url.cn/5DMsS9r

*方法一
import threading
import time

semaphore = threading.Semaphore(6)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t = threading.Thread(target=func)
  t.start()
  
---------------------------------------------

*方法二
from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(6)
    for i in range(26):
        t=Thread(target=func)
        t.start()
View Code

Event(標誌位)

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其它執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就 會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

Event方法

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞執行緒;
event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
event.clear():恢復event的狀態值為False

 

舉例:多個工作執行緒嘗試連結MySQL,如果想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,若連線不成功,都會去嘗試重新連線。那麼就可以採用threading.Event機制來協調各個工作執行緒的連線操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連結超時')
        print('<%s>第%s次嘗試連結' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連結成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    connX=Thread(target=conn_mysql)
    connY=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    connX.start()
    connY.start()
    check.start()
View Code

執行緒佇列Queue

Python 的 Queue 模組中提供了同步的、執行緒安全的佇列類,包括FIFO(先入先出)佇列Queue,LIFO(後入先出)佇列LifoQueue,和優先順序佇列 PriorityQueue。
而且這些佇列都實現了鎖原語,能夠在多執行緒中直接使用,可以使用佇列來實現執行緒間的同步。

Queue 方法

Queue.qsize() 返回佇列的大小
Queue.empty() 如果佇列為空,返回True,反之False
Queue.full() 如果佇列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取佇列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
Queue.put(item) 寫入佇列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)
Queue.task_done() 在完成一項工作之後,Queue.task_done()函式向任務已經完成的佇列傳送一個訊號
Queue.join() 實際上意味著等到佇列為空,再執行別的操作
先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
--------------------
first
second
third


後進先出
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
-------------------
third
second
first

優先順序的佇列
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
-------------------
(10, 'b')
(20, 'a')
(30, 'c')
View Code

生產者與消費者模型

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(5)
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    q.join()
    print("ok......")

def Consumer(name):
  count = 0
  while count <10:
        time.sleep(random.randrange(4))
    # if not q.empty():
    #     print("waiting.....")
        #q.join()
        data = q.get()
        print("eating....")
        time.sleep(4)

        q.task_done()
        #print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    # else:
    #     print("-----no baozi anymore----")
        count +=1

p = threading.Thread(target=Producer, args=('jack',))
cX = threading.Thread(target=Consumer, args=('tom',))
cY = threading.Thread(target=Consumer, args=('rus',))
cZ = threading.Thread(target=Consumer, args=('kia',))

p.start()
cX.start()
cY.start()
cZ.start()
View Code

定時器Timer

from threading import Timer
 
def times():
    print("Hello World")
 
t = Timer(1, times)
# After 1 seconds, "Hello World" will be printed
t.start()
View Code