1. 程式人生 > 其它 >今日學習內容總結3.4

今日學習內容總結3.4

今日學習內容總結

      在昨日的學習中,我們已經學習了通過程式碼實現程序的方式,那就是process模組。對該模組的語法,以及內建方法進行了一個學習。而今天的主要學習內容主要是瞭解執行緒。

訊息佇列

內建佇列

      在之前的學習中我們提到過佇列的特點:先進先出。而python中自帶的佇列模組有四種:

  1. FIFO: 先進先出佇列
  2. LifoQueue: 先進後出佇列
  3. PriorityQueue: 優先佇列
  4. deque: 雙端佇列

      使用方式:

  from queue import Queue
   
  # maxsize設定佇列中,資料上限,小於或等於0則不限制,容器中大於這個數則阻塞,直到佇列中的資料被消掉
  q = Queue(maxsize=0)

      成員函式:

  1. Queue.qsize() 返回佇列的大致大小。
  2. Queue.empty() 如果佇列為空,返回 True 否則返回 False
  3. Queue.full() 如果佇列是滿的返回 True ,否則返回 False 
  4. Queue.put(item, block=True, timeout=None)
      4.1 常用時忽略預設引數,即使用 Queue.put(item)。
      4.2 將 item 放入佇列,如果可選引數 block 是 true 並且 timeout 是 None (預設),則在必要時阻塞至有空閒插槽可用。
      4.3 如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間沒有可用的空閒插槽,將引發 Full 異常。
      4.4 反之 (block 是 false),如果空閒插槽立即可用,則把 item 放入佇列,否則引發 Full 異常 ( 在這種情況下,timeout 將被忽略)。
  5. Queue.get(block=True, timeout=None)
      5.1 常用時忽略預設引數,即使用 Queue.get()。
      5.2 從佇列中移除並返回一個專案。如果可選引數 block 是 true 並且 timeout 是 None (預設值),則在必要時阻塞至專案可得到。
      5.3 如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間內專案不能得到,將引發 Empty 異常。反之 (block 是 false),如果一個專案立即可得到,則返回一個專案,否則引發 Empty 異常 (這種情況下,timeout 將被忽略)。 

      使用案例:

from multiprocessing import Queue


q = Queue(5)  # 自定義佇列的長度
# 朝佇列中存放資料
  q.put(111)
  q.put(222)
  q.put(333)
  print(q.full())  # False  判斷佇列是否滿了
  q.put(444)
  q.put(555)
  print(q.full())  # True
  q.put(666)  # 超出最大長度 原地阻塞等待佇列中出現空位
  print(q.get())
  print(q.get())
  print(q.empty())  # False  判斷佇列是否空了
  print(q.get())
  print(q.get())
  print(q.get())
  print(q.empty())  # True
  print(q.get())  # 佇列中沒有值 繼續獲取則阻塞等待佇列中給值
  print(q.get_nowait())  # 佇列中如果沒有值 直接報錯

      full()、empty()、get_nowait()這些方法不能再併發的場景下精確使用,之所以介紹佇列是因為它可以支援程序間資料通訊。

IPC機制

      IPC機制就是程序間的資料互動,可以是主程序與子程序資料互動,也可以是子程序間的資料互動。其實就是在不同記憶體空間中的程序資料互動。例項:

  from multiprocessing import Process, Queue

  def producer(q):
      # print('子程序producer從佇列中取值>>>:', q.get())
      q.put('子程序producer往佇列中新增值')

  def consumer(q):
      print('子程序consumer從佇列中取值>>>:', q.get())


  if __name__ == '__main__':
      q = Queue()
      p = Process(target=producer, args=(q, ))
      p1 = Process(target=consumer, args=(q,))
      p.start()
      p1.start()
      q.put(123)  # 主程序往佇列中存放資料123
      print('主程序')

  # 列印結果
  '''
    # 當往主執行緒佇列中存放資料時
    主程序
    子程序consumer從佇列中取值>>>: 123

    # 當沒有往主執行緒佇列中存放資料時(就是將q.put(123)註釋掉)
    主程序
    子程序consumer從佇列中取值>>>: 子程序producer往佇列中新增值
  '''

執行緒

生產者消費者模型

      在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

      什麼是生產者消費者模式呢:

  1. 產生資料的模組稱為生產者。
  2. 處理資料的模組稱為消費者。
  3. 在生產者與消費者之間的緩衝區稱之為倉庫。
  4. 生產者負責往倉庫運輸商品,而消費者負責從倉庫裡取出商品,這就構成了生產者消費者模式。

      生產者消費者模型

      生產者消費者模式的優點:

      1.解耦

      假設生產者和消費者分別是兩個執行緒。如果讓生產者直接呼叫消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。如果未來消費者的程式碼發生變化,可能會影響到生產者的程式碼。而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了。

      生活案例:我們去郵局投遞信件,如果不使用郵箱(也就是緩衝區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須得認識誰是郵遞員,才能把信給他。這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者程式碼)。而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩衝區之間的弱耦合)。

      2.併發

      由於生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區通訊的,生產者只需要往緩衝區裡丟資料,就可以繼續生產下一個資料,而消費者只需要從緩衝區拿資料即可,這樣就不會因為彼此的處理速度而發生阻塞。

      繼續上面的例子,如果我們不使用郵箱,就得在郵局等郵遞員,直到他回來,把信件交給他,這期間我們啥事兒都不能幹(也就是生產者阻塞)。或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。

      3.支援忙閒不均

      當生產者製造資料快的時候,消費者來不及處理,未處理的資料可以暫時存在緩衝區中,慢慢處理掉。而不至於因為消費者的效能造成資料丟失或影響生產者生產。

      比如在寄信的例子中:假設郵遞員一次只能帶走1000封信,萬一碰上情人節(或是聖誕節)送賀卡,需要寄出去的信超過了1000封,這時候郵箱這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時再拿走。

      程式碼例項:

  from multiprocessing import Process, Queue, JoinableQueue
  import time
  import random


  def producer(name, food, q):
      for i in range(5):
          data = f'{name}生產了{food}{i}'
          print(data)
          time.sleep(random.randint(1, 3))  # 模擬產生過程
          q.put(data)


  def consumer(name, q):
      while True:
          food = q.get()
          # if food == None:
          #     print('完蛋了 沒得吃了 要餓死人了')
          #     break
          time.sleep(random.random())
          print(f'{name}吃了{food}')
          q.task_done()  # 每次去完資料必須給佇列一個反饋


  if __name__ == '__main__':
      # q = Queue()
      q = JoinableQueue()
      p1 = Process(target=producer, args=('大廚jason', '韭菜炒蛋', q))
      p2 = Process(target=producer, args=('老闆kevin', '祕製小漢堡', q))
      c1 = Process(target=consumer, args=('濤濤', q))
      c2 = Process(target=consumer, args=('龍龍', q))
      c1.daemon = True
      c2.daemon = True
      p1.start()
      p2.start()
      c1.start()
      c2.start()
      # 生產者生產完所有資料之後 往佇列中新增結束的訊號
      p1.join()
      p2.join()
      # q.put(None)  # 結束訊號的個數要跟消費者個數一致才可以
      # q.put(None)
      """佇列中其實已經自己加了鎖 所以多程序取值也不會衝突 並且取走了就沒了"""
      q.join()  # 等待佇列中資料全部被取出(一定要讓生產者全部結束才能判斷正確)
      """執行完上述的join方法表示消費者也已經消費完資料了"""

執行緒理論

      在傳統作業系統中,每個程序有一個地址空間,而且預設就有一個控制執行緒。那麼執行緒是什麼呢?

      執行緒顧名思義,就是一條流水線工作的過程,一條流水線必須屬於一個車間,一個車間的工作過程是一個程序。車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一個流水線。流水線的工作需要電源,電源就相當於cpu。所以,程序只是用來把資源集中到一起(程序只是一個資源單位,或者說資源集合),而執行緒才是cpu上的執行單位。

      多執行緒(即多個控制執行緒)的概念是,在一個程序中存在多個控制執行緒,多個控制執行緒共享該程序的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。

      程序有很多優點,它提供了多道程式設計,可以提高計算機CPU的利用率。既然程序這麼優秀,為什麼還要執行緒呢?其實,仔細觀察就會發現程序還是有很多缺陷的。主要體現在以下幾個方面:

  1. 程序只能在一個時間做一個任務,如果想同時做兩個任務或多個任務,就必須開啟多個程序去完成多個任務。
  2. 程序在執行的過程中如果阻塞,例如等待輸入,整個程序就會掛起,即使程序中有些工作不依賴於輸入的資料,也將無法執行。
  3. 每個程序都有自己的獨立空間,所以多程序的建立,銷燬相比於多執行緒更加耗時,也更加佔用系統資源。

      程序與執行緒的區別:

  1. 地址空間:每個程序都有自己獨立的記憶體空間,也就是說一個程序內的資料在另一個程序是不可見的。但同一程序中的各執行緒間資料是共享的。
  2. 通訊:由於每個程序有自己獨立的記憶體空間,所以程序間通訊需要IPC,而程序內的資料對於多個執行緒來說是共有的,每個執行緒都可以訪問。
  3. 排程和切換:執行緒上下文切換比程序上下文切換要快得多。
  4. 在多執行緒作業系統中,程序不是一個可執行的實體,它主要的功能是向作業系統申請一塊記憶體空間,然後在記憶體空間中開執行緒來執行任務,相當於一個容器,容器中的執行緒才是真正的執行體。一個程序可以包含多個執行緒,而一個執行緒是不能包含程序的。因為程序是系統分配資源的最小單位,所以執行緒不能向作業系統申請自己的空間,但一個執行緒內可以包含多個執行緒。

      執行緒的特點:

  1. 在多執行緒的作業系統中,通常是在一個程序中包括多個執行緒,每個執行緒都是作為利用CPU的基本單位,是花費最小開銷的實體。執行緒具有以下屬性。
  2. 輕型實體:執行緒實體基本上不擁有系統資源,只是有一點必不可少的、能保證獨立執行的資源。執行緒的實體包括程式、資料和TCB。執行緒是動態概念,它的動態特性由執行緒控制塊TCB(Thread Control Block)描述。
  3. 獨立排程和分派的基本單位:在多執行緒OS中,執行緒是能獨立執行的基本單位,因而也是獨立排程和分派的基本單位。由於執行緒很“輕”,故執行緒的切換非常迅速且開銷小(在同一程序中的)。
  4. 共享程序資源:在同一程序中的各個執行緒,都可以共享該程序所擁有的資源,這首先表現在所有執行緒都具有相同的程序id,這意味著,執行緒可以訪問該程序的每一個記憶體資源,此外,還可以訪問程序所擁有的已開啟檔案、定時器、訊號量機構等。由於同一個程序內的執行緒共享記憶體和檔案,所以執行緒之間互相通訊不必呼叫核心。
  5. 可併發執行:在一個程序中的多個執行緒之間,可以併發執行,甚至允許在一個程序中所有執行緒都能併發執行;同樣,不同程序中的執行緒也能併發執行,充分利用和發揮了處理機與外圍裝置並行工作的能力。

開設執行緒的兩種方式

      程序與執行緒的程式碼實操幾乎是一樣的。

  from threading import Thread
  import time

  def task(name):
      print(f'{name} is running')
      time.sleep(3)
      print(f'{name} is over')

  # 建立執行緒無需在__main__下面編寫 但是為了統一 還是習慣在子程式碼中寫
  t = Thread(target=task, args=('jason', ))
  t.start()  # 建立執行緒的開銷極小 幾乎是一瞬間就可以建立
  print('主執行緒')


  class MyThread(Thread):
      def __init__(self, username):
          super().__init__()
          self.username = username
      def run(self):
          print(f'{self.username} jason is running')
          time.sleep(3)
          print(f'{self.username} is over')

  t = MyThread('jasonNB')
  t.start()
  print('主執行緒')

  # 列印結果
  '''
  jason is running
  主執行緒
  jasonNB jason is running
  主執行緒
  jason is over
  jasonNB is over

  Process finished with exit code 0

  '''

      threading.Thread類引數與內建方法:

  # 語法
  class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  
  # 引數介紹
  group:目前此引數為None,在實現ThreadGroup類時為將來的擴充套件保留。
  target:target接收的是一個函式的地址,由run()方法呼叫執行函式中的內容。預設為無,表示未呼叫任何內容。
  name :執行緒名,可自行定義。
  args:target接收的是函式名,此函式的位置引數以元組的形式存放在args中,用於執行函式時呼叫。
  kwargs :target接收的是函式名,此函式的關鍵字引數以字典的形式存放在kwargs中,用於執行函式時呼叫。
  aemon:如果為True表示該執行緒為守護執行緒。
  # 內建方法
  1. start():開啟執行緒,一個Thread物件只能呼叫一次start()方法,如果在同一執行緒物件上多次呼叫此方法,則會引發RuntimeError。
  2. run():執行start()方法會呼叫run(),該方將建立Thread物件時傳遞給target的函式名,和傳遞給args、kwargs的引數組合成一個完整的函式,並執行該函式。run()方法一般在自定義Thead類時會用到。
  3. join(timeout=None):join會阻塞、等待執行緒,timeout單位為秒,因為join()總是返回none,所以在設定timeout呼叫join(timeout)之後,需要使用isalive()判斷執行緒是否執行完成,如果isalive為True表示執行緒在規定時間內沒有執行完,執行緒超時。如果join(timeout=None)則會等待執行緒執行完畢後才會執行join()後面的程式碼,一般用於等待執行緒結束。
  4. name:獲取執行緒名。
  5. getName():獲取執行緒名。
  6. setName(name):設定執行緒名。
  7. ident:“執行緒識別符號”,如果執行緒尚未啟動,則為None。如果執行緒啟動是一個非零整數。
  8. is_alive():判斷執行緒的存活狀態,在run()方法開始之前,直到run()方法終止之後。如果執行緒存活返回True,否則返回False。
  9. daemon:如果thread.daemon=True表示該執行緒為守護執行緒,必須在呼叫Start()之前設定此項,否則將引發RuntimeError。預設為False
  10. isDaemon():判斷一個執行緒是否是守護執行緒。
  11. setDaemon(daemonic):設定執行緒為守護執行緒。

多程序和多執行緒的效率對比

  from threading import Thread
  from multiprocessing import Process
  import time

  def thread_work(name):
      print(f"{name}")
  def process_work(name):
      print(f"{name}")

  if __name__ == "__main__":
      # 程序執行效率
      pro = []
      start = time.time()
      for i in range(3):
          p = Process(target=process_work,args=(("程序-"+str(i)),))
          p.start()
          pro.append(p)
      for i in pro:
          i.join()
      end = time.time()
      print("程序運行了:%s" %(end - start))
      # 執行緒執行效率
      thread_l = []
      start = time.time()
      for i in range(3):
          t = Thread(target=process_work, args=(("執行緒-" + str(i)),))
          t.start()
          thread_l.append(t)
      for i in thread_l:
          i.join()
      end = time.time()
      print("程序運行了:%s" % (end - start))

  # 列印內容如下
  '''
  程序-0
  程序-1
  程序-2
  程序運行了:0.18501067161560059
  執行緒-0
  執行緒-1
  執行緒-2
  程序運行了:0.004000186920166016
  '''

執行緒的join方法

  from threading import Thread
  import time

  def task(name):
      print(f'{name} is running')
      time.sleep(3)
      print(f'{name} is over')


  t = Thread(target=task, args=('jason', ))
  t.start()
  t.join()  # 主執行緒程式碼等待子執行緒程式碼執行完畢之後再往下執行
  print('主執行緒')

  # 列印結果
  '''
  jason is running
  jason is over
  主執行緒
  '''

      主執行緒為什麼要等著子執行緒結束才會結束整個程序?因為主執行緒結束也就標誌著整個程序的結束,要確保子執行緒執行過程中所需的各項資源。

同一個程序內的多執行緒資料共享

  from threading import Thread

  money = 10000000000
  def task():
      global money
      money = 1

  t = Thread(target=task)
  t.start()
  t.join()
  print(money)

  # 列印結果
  '''
  1
  '''

      執行緒更改程序內資料,資料也會被更改

守護執行緒

      主執行緒會等待所有非守護執行緒執行完畢後,才結束主執行緒。主程序是程序內的程式碼結束後就結束主程序。對比守護程序,程式碼執行完畢後立即關閉守護程序,因為在主程序看來程式碼執行完畢,主程序結束了,所以守護程序在程式碼結束後就被結束了。

      守護執行緒會等待主執行緒的結束而結束。這是因為如果主執行緒結束意味著程式結束,主執行緒會一直等著所有非守護執行緒結束,回收資源然後退出程式,所以當所有非守護執行緒結束後,守護執行緒結束,然後主執行緒回收資源,結束程式。

      守護程序:

  from threading import Thread
  import time


  def task(name):
      print(f'{name} is running')
      time.sleep(3)
      print(f'{name} is over')

  t1 = Thread(target=task, args=('jason',))
  t2 = Thread(target=task, args=('kevin',))
  t1.daemon = True
  t1.start()
  t2.start()
  print('主執行緒')

  # 列印結果
  '''
  jason is running
  kevin is running
  主執行緒
  jason is over
  kevin is over

  Process finished with exit code 0
  '''

GIL全域性直譯器鎖

      GIL 是最流程的 CPython 直譯器(平常稱為 Python)中的一個技術術語,中文譯為全域性直譯器鎖,其本質上類似作業系統的 Mutex。GIL 的功能是:在 CPython 直譯器中執行的每一個 Python 執行緒,都會先鎖住自己,以阻止別的執行緒執行。

      python直譯器的類別有很多: Cpython Jpython Ppython 。而GIL只存在於CPython直譯器中,不是python的特徵。GIL是一把互斥鎖用於阻止同一個程序下的多個執行緒同時執行。原因是因為CPython直譯器中的垃圾回收機制不是執行緒安全的。

      GIL是加在CPython直譯器上面的互斥鎖,同一個程序下的多個執行緒要想執行必須先搶GIL鎖,所以同一個程序下多個執行緒肯定不能同時執行,即無法利用多核優勢。所有的解釋型語言都無法做到同一個程序下多個執行緒利用多核優勢。