1. 程式人生 > >python 線程 threading

python 線程 threading

狀況 eat style 多次 組成 ont listen 正常的 any

一、什麽是線程

線程,有時被稱為輕量進程(Lightweight Process,LWP),是程序執行流的最小單元。一個標準的線程由線程ID,當前指令指針(PC),寄存器集合和堆棧組成。另外,線程是進程中的一個實體,是被系統獨立調度和分派的基本單位,線程自己不擁有系統資源,只擁有一點兒在運行中必不可少的資源,但它可與同屬一個進程的其它線程共享進程所擁有的全部資源。一個線程可以創建和撤消另一個線程,同一進程中的多個線程之間可以並發執行。由於線程之間的相互制約,致使線程在運行中呈現出間斷性。線程也有就緒、阻塞和運行三種基本狀態。就緒狀態是指線程具備運行的所有條件,邏輯上可以運行,在等待處理機;運行狀態是指線程占有處理機正在運行;阻塞狀態是指線程在等待一個事件(如某個信號量),邏輯上不可執行。每一個程序都至少有一個線程,若程序只有一個線程,那就是程序本身。

二、進程與線程

線程與進程的區別可以歸納為以下4點:   1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。   2)通信:進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。   3)調度和切換:線程上下文切換比進程上下文切換要快得多。   4)在多線程操作系統中,進程不是一個可執行的實體。 (進程與線程的簡單解釋:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html) 技術分享圖片

三、線程的特點

多線程的操作系統中,通常是在一個進程中包括多個線程,每個線程都是作為利用CPU的基本單位,是花費最小開銷的實體。線程具有以下屬性。   1)輕型實體   線程中的實體基本上不擁有系統資源,只是有一點必不可少的、能保證獨立運行的資源。   線程的實體包括程序、數據和TCB。線程是動態概念,它的動態特性由線程控制塊TCB(Thread Control Block)描述。   2)獨立調度和分派的基本單位。   在多線程OS中,線程是能獨立運行的基本單位,因而也是獨立調度和分派的基本單位。由於線程很“輕”,故線程的切換非常迅速且開銷小(在同一進程中的)。   3)共享進程資源。   線程在同一進程中的各個線程,都可以共享該進程所擁有的資源,這首先表現在:所有線程都具有相同的進程id,這意味著,線程可以訪問該進程的每一個內存資源;此外,還可以訪問進程所擁有的已打開文件、定時器、信號量機構等。由於同一個進程內的線程共享內存和文件,所以線程之間互相通信不必調用內核。   4)可並發執行。   在一個進程中的多個線程之間,可以並發執行,甚至允許在一個進程中所有線程都能並發執行;同樣,不同進程中的線程也能並發執行,充分利用和發揮了處理機與外圍設備並行工作的能力。

四、內存中的線程

多個線程共享同一個進程的地址空間中的資源,是對一臺計算機上多個進程的模擬,有時也稱線程為輕量級的進程。而對一臺計算機上多個進程,則共享物理內存、磁盤、打印機等其他物理資源。多線程的運行也多進程的運行類似,是cpu在多個線程之間的快速切換。不同的進程之間是充滿敵意的,彼此是搶占、競爭cpu的關系,如果迅雷會和QQ搶資源。而同一個進程是由一個程序員的程序創建,所以同一進程內的線程是合作關系,一個線程可以訪問另外一個線程的內存地址,大家都是共享的,一個線程幹死了另外一個線程的內存,那純屬程序員腦子有問題。類似於進程,每個線程也有自己的堆棧,不同於進程,線程庫無法利用時鐘中斷強制線程讓出CPU,可以調用thread_yield運行線程自動放棄cpu,讓另外一個線程運行。   線程通常是有益的,但是帶來了不小程序設計難度,線程的問題是:   1. 父進程有多個線程,那麽開啟的子線程是否需要同樣多的線程   2. 在同一個進程中,如果一個線程關閉了文件,而另外一個線程正準備往該文件內寫內容呢?   因此,在多線程的代碼中,需要更多的心思來設計程序的邏輯、保護程序的數據。 技術分享圖片

五、全局解釋器鎖GIL

Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。   對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。   在多線程環境中,Python 虛擬機按以下方式執行:   a、設置 GIL;   b、切換到一個線程去運行;   c、運行指定數量的字節碼指令或者線程主動讓出控制(可以調用 time.sleep(0));   d、把線程設置為睡眠狀態;   e、解鎖 GIL;   d、再次重復以上所有步驟。   在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由於在這期間沒有Python的字節碼被運行,所以不會做線程切換)編寫擴展的程序員可以主動解鎖GIL。 點: 鎖的是線程,不是數據 同一時刻只能有一個線程訪問cpu 高CPU:計算類---高CPU利用率 (適合多進程) 高IO :爬蟲,聊天類 send recv,處理web請求,讀寫數據庫 (適合多線程)

六、python線程模塊

Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程序員創建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創建一個可以用於多個線程之間共享數據的隊列數據結構。   避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊裏的屬性有可能會與threading出現沖突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結束時,所有的線程都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出後進程才退出。   thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的服務器,如果沒有客戶提出請求它就在那等著,如果設定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。

七、threading模塊

  1. 創建線程
    創建線程,方法1
    from threading import Thread
    import time
    
    def func(n):
        time.sleep(1)
        print(n)
    
    for i in range(10):
        Thread(target=func, args=(i,)).start()
    
    創建線程,方法2
    from threading import Thread
    import time
    
    class MyThread(Thread):
        def __init__(self,arg):
            super().__init__()
            self.arg = arg
    
        def run(self):
            time.sleep(1)
            print(self.arg)
    
    for i in range(10):
        MyThread(i).start()

  2. 例子(多線程實現TCP socket連接)
    #服務端:
    from threading import Thread
    import socket
    
    sk = socket.socket()
    s = (127.0.0.1, 8081)
    sk.bind(s)
    sk.listen()
    
    def func(conn):
        while 1:
            se = input(>>>)
            if se == q:
                break
            conn.send(se.encode(utf-8))
            ret = conn.recv(1024).decode(utf-8)
            print(ret)
        conn.close()
    
    while 1:
        conn,addr = sk.accept()
        Thread(target=func, args=(conn,)).start()
    
    #客戶端
    import socket
    
    sk = socket.socket()
    sk.connect((127.0.0.1,8081))
    
    ret = sk.recv(1024).decode(utf-8)
    print(ret)
    
    c = input().encode(utf-8)
    sk.send(c)
    
    sk.close()

  3. 其他方法
    Thread實例對象的方法
      # isAlive(): 返回線程是否活動的。
      # getName(): 返回線程名。
      # setName(): 設置線程名。
    
    threading模塊提供的一些方法:
      # threading.currentThread(): 返回當前的線程變量。
      # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
      # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
  4. 守護線程
    守護線程
    #1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),然後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(否則會產生僵屍進程),才會結束,
    #2 主線程在其他非守護線程運行完畢後才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味著進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
    
    from threading import Thread
    import time
    
    def sayhi(name):
        time.sleep(2)
        print(%s say hello %name)
    
    if __name__ == __main__:
        t=Thread(target=sayhi,args=(hsr,))
        #t.setDaemon(True) #必須在t.start()之前設置
        t.start()
        print(主線程)
        print(t.is_alive())
    
    註釋t.setDaemon(True) 即不設置為守護線程,結果:
    主線程
    True
    hsr say hello
    
    設置為守護線程,結果:
    主線程
    True

  5. 線程鎖
    #當多個線程同時去拿同一個數據時,造成數據的不安全
    import time
    from threading import Thread,Lock
    def eat():
        global n
        temp = n
        time.sleep(1)
        n = temp - 1
    n = 10
    t_lst = []
    for i in range(10):
        t = Thread(target=eat)
        t.start()
        t_lst.append(t)
    [t.join() for t in t_lst]
    print(n)
    
    #通過加鎖保證安全性
    import time
    from threading import Thread,Lock
    def eat(l):
        global n
        l.acquire()
        temp = n
        time.sleep(1)
        n = temp - 1
        l.release()
    n = 10
    t_lst = []
    l = Lock()
    for i in range(10):
        t = Thread(target=eat,args=(l,))
        t.start()
        t_lst.append(t)
    [t.join() for t in t_lst]
    print(n)

  6. 死鎖
    指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程
    
    例子:
    import time
    from threading import Thread,Lock
    
    noodle_lock = Lock()
    fork_lock = Lock()
    
    def e1(name):
        noodle_lock.acquire()
        print(%s,拿到面條%name)
        time.sleep(1)
        fork_lock.acquire()
        print(%s,拿到叉子%name)
        print(nice%name)
        fork_lock.release()
        noodle_lock.release()
    
    def e2(name):
        fork_lock.acquire()
        print(%s,拿到叉子%name)
        time.sleep(1)
        noodle_lock.acquire()
        print(%s,拿到面條 % name)
        print(nice%name)
        noodle_lock.release()
        fork_lock.release()
    
    Thread(target=e1,args=(hsr,)).start()
    Thread(target=e1,args=(張三,)).start()
    Thread(target=e2,args=(李四,)).start()
    Thread(target=e2,args=(王五,)).start()

  7. 遞歸鎖
    Lock ->互斥鎖
    RLock ->遞歸鎖
    
    這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
    
    import time
    from threading import Thread,RLock
    
    lock = RLock()
    
    def e1(name):
        lock.acquire()
        print(%s,拿到面條%name)
        time.sleep(1)
        lock.acquire()
        print(%s,拿到叉子%name)
        print(%snice%name)
        lock.release()
        lock.release()
    
    def e2(name):
        lock.acquire()
        print(%s,拿到叉子%name)
        time.sleep(1)
        lock.acquire()
        print(%s,拿到面條 % name)
        print(%snice%name)
        lock.release()
        lock.release()
    
    Thread(target=e1,args=(hsr,)).start()
    Thread(target=e1,args=(張三,)).start()
    Thread(target=e2,args=(李四,)).start()
    Thread(target=e2,args=(王五,)).start()

  8. 信號量
    from threading import Semaphore,Thread
    import time
    def func(a,b,s):
        time.sleep(1)
        s.acquire()
        print(a+b)  #同一時間只能由4個線程執行這個代碼
        s.release()
    sem = Semaphore(4)
    for i in range(10):
        t = Thread(target=func,args=(i,i+5,sem))
        t.start()

  9. 事件
    以模擬檢測網絡狀況,連接數據庫
    from threading import Event,Thread
    import time,random
    
    def check_net(e):
        time.sleep(random.randint(0,5))
        e.set()
    
    def connect_db(e):
        count = 0
        while True:
            if count >2:
                raise TimeoutError
            e.wait(1)   #狀態為False,只等2s
            if e.is_set() == True:
                print(----connect----)
                break
            else:
                count += 1
                print(error)
    
    e = Event()
    
    t1 = Thread(target=connect_db,args=(e,))
    t2 = Thread(target=check_net,args=(e,))
    t1.start()
    t2.start()

  10. 條件
    #一個條件被創建時,默認False狀態
    #False會讓wait處於等待狀態
    #notify相當於制造幾把鑰匙
    #使用鑰匙不會歸還
    from threading import Condition,Thread
    import time,random
    
    def func(c,i):
        #wait要先acquire
        c.acquire()
        c.wait()
        print(在第%s個循環裏%i)
        c.release()
    
    c = Condition()
    
    for i in range(10):
        t = Thread(target=func, args=(c,i))
        t.start()
    
    while 1:
        num = int(input(>>>))
        #執行notify要先acquire
        c.acquire()
        c.notify(num)
        c.release()

  11. 定時器
    from threading import Timer
    
    def func():
        print(時間同步)
    Timer(2,func).start()    #兩秒後打印“時間同步”

  12. 線程隊列
    #3種隊列
    #queue普通隊列
    #queue.LifoQueue先進後出隊列(棧)
    #queue.PriorityQueue優先級隊列
    
    queue.PriorityQueue()    #優先級隊列
    q.put((20,1))
    q.put((10,2))
    q.put((30,4))
    
    q.get()#10
    q.get()#1
    q.get()#4
    #優先級一樣按ascii碼排

  13. 線程池
    concurrent.futures
    
    
    #1 介紹
    concurrent.futures模塊提供了高度封裝的異步調用接口
    ThreadPoolExecutor:線程池,提供異步調用
    ProcessPoolExecutor: 進程池,提供異步調用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    異步提交任務
    
    #map(func, *iterables, timeout=None, chunksize=1)
    取代for循環submit的操作
    
    #shutdown(wait=True)
    相當於進程池的pool.close()+pool.join()操作
    wait=True,等待池內所有任務執行完畢回收完資源後才繼續
    wait=False,立即返回,並不會等待池內的任務執行完畢
    但不管wait參數為何值,整個程序都會等到所有任務執行完畢
    submit和map必須在shutdown之前
    
    #result(timeout=None)
    取得結果
    
    #add_done_callback(fn)
    回調函數
    
    # done()
    判斷某一個線程是否完成
    
    # cancle()
    取消某個任務
    
    
    
    3.例子:
    from concurrent.futures import ThreadPoolExecutor
    import time
    def func(n):
        time.sleep(2)
        print(n)
        return n*n
    tpool = ThreadPoolExecutor(max_workers=20)  #不要超過cpu個數*5
    t_lst = []
    for i in range(50):
        t = tpool.submit(func,i)    #提交任務
        t_lst.append(t)
    tpool.shutdown()    #close + join
    print(主線程)
    for t in t_lst:print(t.result())    #獲取返回值
    
    
    
    4.回調函數
    from concurrent.futures import ThreadPoolExecutor
    import time
    def func(n):
        time.sleep(2)
        print(n)
        return n*n
    def call_back(m):
        print(result:%s%m.result())
    tpool = ThreadPoolExecutor(max_workers=5)  #不要超過cpu個數*5
    for i in range(20):
        t = tpool.submit(func,i).add_done_callback(call_back)    #提交任務

python 線程 threading