1. 程式人生 > >python-->多線程

python-->多線程

最重要的 正常 指針 程序執行效率 ner 信號量 ram 並發 單位

線程簡介

線程是CPU分配資源的基本單位。但一個程序開始運行,這個程序就變成了一個進程,而一個進程相當於一個或者多個線程。當沒有多線程編程時,一個進程也是一個主線程,但有多線程編程時,一個進程包含多個線程和一個主線程。使用線程可以實現程序的並發。

線程特點

  • 每個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。但是線程不能夠獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
  • 每個線程都有他自己的一組CPU寄存器,稱為線程的上下文,該上下文反映了線程上次運行該線程的CPU寄存器的狀態。
  • 指令指針和堆棧指針寄存器是線程上下文中兩個最重要的寄存器,線程總是在進程得到上下文中運行的,這些地址都用於標誌擁有線程的進程地址空間中的內存。
  • 線程可以被搶占(中斷)。
  • 在其他線程正在運行時,線程可以暫時擱置(也稱為睡眠) -- 這就是線程的退讓。

多線程優勢

  • 使用線程可以把占據長時間的程序中的任務放到後臺去處理。
  • 用戶界面可以更加吸引人,比如用戶點擊了一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度
  • 程序的運行速度可能加快
  • 在一些等待的任務實現上如用戶輸入、文件讀寫和網絡收發數據等,線程就比較有用了。在這種情況下我們可以釋放一些珍貴的資源如內存占用等

線程分類

  • 內核線程:由操作系統內核創建和撤銷。
  • 用戶線程:不需要內核支持而在用戶程序中實現的線程。

線程模塊

  • _thread
  • threading(推薦使用)

更多多線程理論參考:http://www.cnblogs.com/linhaifeng/articles/7430082.html

線程開啟

方式一:創建線程要執行的函數,把這個函數傳遞進Thread對象裏,讓它來執行

技術分享圖片
from threading import Thread
import time

def test(param):
    time.sleep(2)
    print(Hello %s%param)

if __name__ == __main__:
    t=Thread(target=test,args=(World,))
    t.start()
    
print(main threading)
View Code

方式二:繼承thread類,重寫run方法與Java的多線程非常類似

技術分享圖片
from threading import Thread
import time

class Test_Thread(Thread):
    def __init__(self,param):
        super().__init__()
        self.param=param
    def run(self):
        time.sleep(2)
        print(Hello %s % self.param)


if __name__ == __main__:
    t = Test_Thread(Python)
    t.start()
    print(main Threading)
View Code

線程方法

Thread實例對象的方法
isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。

threading模塊提供的一些方法:
threading.currentThread(): 返回當前的線程變量。
threading.enerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enerate())有相同的結果。
    

join()方法

主線程x中,創建了子線程y,並且在主線程x中調用了y.join(),則主線程x會在調用的地方等待,直到子線程y完成操作後,才會接著往下執行。(對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢主線程等待子線程結束)

技術分享圖片
from threading import Thread
import time

def test(param):
    time.sleep(2)
    print(Hello %s %param)

if  __name__ == __main__:
    t=Thread(target=test,args=(Python,))
    t.start()
    t.join()
    print(main threading)
    print(t.is_alive())
View Code

setDaemon()方法

主線程x中創建了子線程y,並且在主線程x中調用了y.setDaemon(),則子線程y設置為守護線程,如果主線程x執行束,不論子線程y是否完成,一並和主線程x退出.
ps:必須在start() 方法調用之前設置,如果不設置為守護線程,程序會被無限掛起。(主線程不等待子線程結束即守護線程)

技術分享圖片
from threading import Thread
import time

def test(param):
    time.sleep(2)
    print(Hello %s %param)

if __name__ == __main__:
    t=Thread(target=sayhi,args=(World,))
    # 必須在t.start()之前設置
    t.setDaemon(True)
    t.start()
    print(main threding)
    print(t.is_alive())
View Code

GIL VS Lock

Python已經存在一個GIL但是來保證同一時間只能有一個線程來執行了,但是為什麽這裏還需要lock? 首先需要明確的是鎖的目的是為了保護共享的數據,保證同一時間只能有一個線程來修改共享的數據而且保護不同的數據就應該加不同的鎖。GIL與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock。

線程同步

如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。使用 Thread 對象的 Lock 和 Rlock 可以實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對於那些需要每次只允許一個線程操作的數據,可以將其操作放到 acquire 和 release 方法之間即可

同步鎖

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖

import threading

R=threading.Lock()

R.acquire()
‘‘‘
對公共數據的操作
‘‘‘
R.release()
技術分享圖片
#並發執行變成串行,犧牲了執行效率保證了數據安全
from threading import Thread,Lock
import os,time

def work():
    global nbers
    lock.acquire()
    temp=nbers
    time.sleep(0.1)
    nbers=temp-1
    lock.release()
if __name__ == __main__:
    lock=Lock()
    nbers=66
    list=[]
    for i in range(66):
        t=Thread(target=work)
        list.append(t)
        t.start()
    for t in list:
        t.join()
    print(nbers)
View Code
過程分析:
第一步:66個線程去搶GIL鎖,即搶執行權限
第二部:肯定有一個線程先搶到GIL(暫且稱為線程一),然後開始執行,一旦執行就會拿到lock.acquire()
第三步:極有可能線程一還未運行完畢,就有另外一個線程二搶到GIL,然後開始運行,但線程二發現互斥鎖lock還未被線程一釋放,於是阻塞,被迫交出執行權限,即釋放GIL
第四步:直到線程一重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的線程再重復以上的過程

互斥鎖與join()方法的區別

加鎖會讓運行變成串行,那麽如果在start之後立即使用join,就不需要加鎖而且也是串行的效果。
但是在start之後立刻使用join,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,數據是安全的。
但問題是start之後立即join:任務內的所有代碼都是串行執行的,而加鎖只是加鎖的部分即修改共享數據的部分是串行的,這就是兩者的區別。
如果單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的程序執行效率顯然比使用join()方法更高
技術分享圖片
*不加鎖:並發執行,速度快,但是數據是不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print(%s is running %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == __main__:
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print(主:%s n:%s %(stop_time-start_time,n))

----------------------------------------------------
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99



*不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,但是數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼並發運行
    time.sleep(3)
    print(%s start to run %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == __main__:
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print(主:%s n:%s %(stop_time-start_time,n))

----------------------------------------------------------
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0



*使用join()方法,數據安全,但是串行執行
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print(%s start to run %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == __main__:
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print(主:%s n:%s %(stop_time-start_time,n))

-----------------------------------------
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麽的恐怖
View Code

死鎖

是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,如果沒有外力進行幹預則程序就無法繼續執行,此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程就稱為死鎖進程

技術分享圖片
from threading import Thread,Lock
import time
X=Lock()
Y=Lock()

class Lock_Thread(Thread):
    def run(self):
        self.actionX()
        self.actionY()
    def actionX(self):
        X.acquire()
        print(\033[41m%s 拿到X鎖\033[0m %self.name)

        Y.acquire()
        print(\033[42m%s 拿到Y鎖\033[0m %self.name)
        Y.release()

        X.release()

    def actionY(self):
        Y.acquire()
        print(\033[43m%s 拿到Y鎖\033[0m %self.name)
        time.sleep(2)

        X.acquire()
        print(\033[44m%s 拿到X鎖\033[0m %self.name)
        X.release()

        Y.release()

if __name__ == __main__:
    for i in range(10):
        t=Lock_Thread()
        t.start()
----------------------------------------------------------
Thread-1 拿到X鎖
Thread-1 拿到Y鎖
Thread-1 拿到Y鎖
Thread-2 拿到X鎖
然後就卡住,死鎖了    
View Code

遞歸鎖

即死鎖解決方法,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,
其他的線程才能獲得資源。如果使用RLock代替Lock,則不會發生死鎖.
即X=Y=threading.RLock()如果一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,
即counter遞減到0為止

技術分享圖片
import  threading
import time


class Lock_Thread(threading.Thread):

    def actionX(self):

        r_lcok.acquire() #count=1
        print(self.name,"gotX",time.ctime())
        time.sleep(2)
        r_lcok.acquire() #count=2

        print(self.name, "gotY", time.ctime())
        time.sleep(1)

        r_lcok.release() #count=1
        r_lcok.release() #count=0


    def actionY(self):

        r_lcok.acquire()
        print(self.name, "gotY", time.ctime())
        time.sleep(2)

        r_lcok.acquire()
        print(self.name, "gotX", time.ctime())
        time.sleep(1)

        r_lcok.release()
        r_lcok.release()


    def run(self):

        self.actionX()
        self.actionY()


if __name__ == __main__:


    r_lcok=threading.RLock()
    L=[]

    for i in range(5):
        t=Lock_Thread()
        t.start()
        L.append(t)


    for i in L:
        i.join()

    print("ending....")
View Code

信號量Semahpore

Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()
ps:信號量與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
互斥鎖與信號量推薦博客:http://url.cn/5DMsS9r

技術分享圖片
*方法一
import threading
import time

semaphore = threading.Semaphore(6)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() +  get semaphore)
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t = threading.Thread(target=func)
  t.start()
  
---------------------------------------------

*方法二
from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print(%s get sm %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == __main__:
    sm=Semaphore(6)
    for i in range(26):
        t=Thread(target=func)
        t.start()
View Code

Event(標誌位)

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其它線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就 會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行

Event方法

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False

技術分享圖片

技術分享圖片
舉例:多個工作線程嘗試鏈接MySQL,如果想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,若連接不成功,都會去嘗試重新連接。那麽就可以采用threading.Event機制來協調各個工作線程的連接操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError(鏈接超時)
        print(<%s>第%s次嘗試鏈接 % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print(<%s>鏈接成功 %threading.current_thread().getName())


def check_mysql():
    print(\033[45m[%s]正在檢查mysql\033[0m % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == __main__:
    event=Event()
    connX=Thread(target=conn_mysql)
    connY=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    connX.start()
    connY.start()
    check.start()
View Code

線程隊列Queue

Python 的 Queue 模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。
而且這些隊列都實現了鎖原語,能夠在多線程中直接使用,可以使用隊列來實現線程間的同步。

Queue 方法

Queue.qsize() 返回隊列的大小
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取隊列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)
Queue.task_done() 在完成一項工作之後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join() 實際上意味著等到隊列為空,再執行別的操作
技術分享圖片
先進先出
import queue

q=queue.Queue()
q.put(first)
q.put(second)
q.put(third)

print(q.get())
print(q.get())
print(q.get())
--------------------
first
second
third


後進先出
import queue

q=queue.LifoQueue()
q.put(first)
q.put(second)
q.put(third)

print(q.get())
print(q.get())
print(q.get())
-------------------
third
second
first

優先級的隊列
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,a))
q.put((10,b))
q.put((30,c))

print(q.get())
print(q.get())
print(q.get())
-------------------
(10, b)
(20, a)
(30, c)
View Code

生產者與消費者模型

技術分享圖片
import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(5)
    q.put(count)
    print(Producer %s has produced %s baozi.. %(name, count))
    count +=1
    #q.task_done()
    q.join()
    print("ok......")

def Consumer(name):
  count = 0
  while count <10:
        time.sleep(random.randrange(4))
    # if not q.empty():
    #     print("waiting.....")
        #q.join()
        data = q.get()
        print("eating....")
        time.sleep(4)

        q.task_done()
        #print(data)
        print(\033[32;1mConsumer %s has eat %s baozi...\033[0m %(name, data))
    # else:
    #     print("-----no baozi anymore----")
        count +=1

p = threading.Thread(target=Producer, args=(jack,))
cX = threading.Thread(target=Consumer, args=(tom,))
cY = threading.Thread(target=Consumer, args=(rus,))
cZ = threading.Thread(target=Consumer, args=(kia,))

p.start()
cX.start()
cY.start()
cZ.start()
View Code

定時器Timer

技術分享圖片
from threading import Timer
 
def times():
    print("Hello World")
 
t = Timer(1, times)
# After 1 seconds, "Hello World" will be printed
t.start()
View Code

python-->多線程