1. 程式人生 > >python學習之併發程式設計

python學習之併發程式設計

目錄

  • 一、併發程式設計之多程序
    • 1.multiprocessing模組介紹
    • 2.Process類的介紹
    • 3.Process類的使用
      • 3.1 建立開啟子程序的兩種方式
      • 3.2 獲取程序pid
      • 3.3驗證程序之間的空間隔離
      • 3.4 程序物件的join方法
      • 3.5程序物件的其他屬性(瞭解)
      • 3.6 殭屍程序和孤兒程序(瞭解)
    • 4.守護程序
    • 5.互斥鎖(程序同步控制)
    • 6.程序之間的通訊
      • 1.基於檔案通訊
      • 2.基於佇列通訊
      • 3.基於管道
    • 7.生產者和消費者
      • 1.為什麼要使用生產者和消費者模式
      • 2.什麼是生產者消費者模式
  • 二、併發程式設計之多執行緒
    • 1. threading模組介紹
    • 2.開啟執行緒的兩種方式
    • 3.執行緒vs程序的程式碼對比
      • 1.多執行緒與多程序開啟速度區別
      • 2.對比pid
      • 3.同一程序內執行緒共享內部資料
    • 4.執行緒的其他方法
    • 5.join 執行緒中的用法
    • 6.守護執行緒
    • 7.互斥鎖
    • 8.死鎖現象和遞迴鎖
    • 9.訊號量
    • 10.Python GIL
    • 11.GIL與lock鎖的區別
    • 12.驗證計算密集型IO密集型的效率
    • 13.多執行緒實現socket通訊
    • 14 . 程序池,執行緒池
    • 15.阻塞,非阻塞,同步,非同步
    • 16.同步呼叫,非同步呼叫
    • 17.非同步呼叫+回撥函式
    • 18.執行緒佇列queque
    • 19.事件event
    • 20.協程
      • 1.Greenlet
      • 2.Gevent

一、併發程式設計之多程序

1.multiprocessing模組介紹

python中的多執行緒無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()檢視),在python中大部分情況需要使用多程序。Python提供了multiprocessing。
multiprocessing模組用來開啟子程序,並在子程序中執行我們定製的任務(比如函式),該模組與多執行緒模組threading的程式設計介面類似。

  multiprocessing模組的功能眾多:支援子程序、通訊和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等元件。

​ 需要再次強調的一點是:與執行緒不同,程序沒有任何共享狀態,程序修改的資料,改動僅限於該程序內。

2.Process類的介紹

建立程序的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類例項化得到的物件,表示一個子程序中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定引數
2. args指定的為傳給target函式的位置引數,是一個元組形式,必須有逗號

引數介紹

group引數未使用,值始終為None
target表示呼叫物件,即子程序要執行的任務
args表示呼叫物件的位置引數元組,args=(1,2,'egon',)
kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18}
name為子程序的名稱

方法介紹

p.start():啟動程序,並呼叫該子程序中的p.run() 
p.run():程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法  

p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然執行,返回True

p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序

屬性介紹

p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定

p.name:程序的名稱

p.pid:程序的pid

p.exitcode:程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可)

p.authkey:程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功(瞭解即可)

3.Process類的使用

注意:在windows中Process()必須放到# if name == 'main':下

由於Windows沒有fork,多處理模組啟動一個新的Python程序並匯入呼叫模組。 
如果在匯入時呼叫Process(),那麼這將啟動無限繼承的新程序(或直到機器耗盡資源)。 
這是隱藏對Process()內部呼叫的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在匯入時被呼叫。

3.1 建立開啟子程序的兩種方式

方式一

from   multiprocessing import Process
import time

def task(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f"{name} is gone")

if __name__ == '__main__':
#在windos中,開啟程序必須在__name__ == '__main__'下面
    p = Process(target=task,args=("zbb",)) #建立一個程序物件
    p.start()
#只是向作業系統發出一個開闢子程序的訊號,然後執行下一行
# 這個訊號作業系統接收到之後,會從記憶體中開闢一個子程序空間,
# 然後在將主程序所有資料copy載入到子程序,然後在呼叫cpu去執行.
# 開闢子程序開銷是很大的.

    print("==主開始")
    time.sleep(3)
    print("主結束")

# ==主開始
# zbb is running
# zbb is gone
# 主結束

方式二(瞭解不常用)

from multiprocessing  import Process
import time

class MyProcess(Process):

    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self): #必須為run 否則會執行父類的,但是父類的為None
        print(f"{self.name} is running")
        time.sleep(2)
        print(f"{self.name} is gone ")

if __name__ == '__main__':
    p = MyProcess("zbb")
    p.start()
    print("==主")    
# ==主
# zbb is running
# zbb is gone 

3.2 獲取程序pid

pid是程序在記憶體中唯一的標識

列如,linux中 kill pid

程式碼獲取

from multiprocessing import Process
import os

def task(name):
    print(f'子程序:{os.getpid()}')
    print(f'主程序:{os.getppid()}')



if __name__ == '__main__':
    p = Process(target=task,args=('zbb',))  # 建立一個程序物件
    p.start()
    print(f'====主{os.getpid()}')
    
# ====主13548
# 子程序:1832
# 主程序:13548

win命令列獲取pid

linux中獲取

3.3驗證程序之間的空間隔離

子程序和主程序在不同的空間

from multiprocessing import Process
import time
name = '追夢NAN'

def task():
    global name
    name = 'zbb'
    print(f'子程序{name}')


if __name__ == '__main__':
    p = Process(target=task)  # 建立一個程序物件
    p.start()
    # print('==主開始')
    time.sleep(3)
    print(f'主:{name}')

# 子程序zbb
# 主:追夢NAN

3.4 程序物件的join方法

join讓主程序等待子程序結束之後,在執行主程序.

from multiprocessing import Process
import time

def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')



if __name__ == '__main__':

    p = Process(target=task,args=('zbb',))  # 建立一個程序物件
    p.start()
    p.join() #
    print('==主開始')

多個子程序使用join

驗證一

from multiprocessing import Process
import time


def task(name,sec):
    print(f'{name}is running')
    time.sleep(sec)
    print(f'{name} is gone')


if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task,args=('1',1))
    p2 = Process(target=task,args=('2',2))
    p3 = Process(target=task,args=('3',3))
    p1.start()
    p2.start()
    p3.start()
    p1.join()  # join只針對主程序,如果join下面多次join 他是不阻塞的.
    p2.join()
    p3.join()
    print(f'==主{time.time()-start_time}')
# 1is running
# 2is running
# 3is running
# 1 is gone
# 2 is gone
# 3 is gone
# ==主3.186117172241211

驗證2

# 多個子程序使用join
from multiprocessing import Process
import time


def task(name,sec):
    print(f'{name}is running')
    time.sleep(sec)
    print(f'{name} is gone')


if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task,args=('1',3))
    p2 = Process(target=task,args=('2',2))
    p3 = Process(target=task,args=('3',1))
    p1.start()
    p2.start()
    p3.start()
    p1.join() #p1就是阻塞 走完周後才走主
    print(f'==主1-{time.time() - start_time}')
    p2.join()
    print(f'==主2-{time.time() - start_time}')
    p3.join()
    print(f'==主3-{time.time()-start_time}')
# 1is running
# 2is running
# 3is running
# 3 is gone
# 2 is gone
# 1 is gone
# ==主1-3.152977705001831
# ==主2-3.152977705001831
# ==主3-3.152977705001831

優化上面程式碼

from multiprocessing import Process
import time


def task(name,sec):
    print(f'{name}is running')
    time.sleep(sec)
    print(f'{name} is gone')


if __name__ == '__main__':
    start_time = time.time()

    l1 = []
    for i in range(1,4):
        p=Process(target=task,args=("zbb",1))
        l1.append(p)
        p.start()
    for i in l1:
        i.join()
    print(f'==主{time.time() - start_time}')
    print(l1)
# zbbis running
# zbbis running
# zbbis running
# zbb is gone
# zbb is gone
# zbb is gone
# ==主1.1665570735931396
# [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>]

join就是阻塞,主程序有join,主程序下面的程式碼一律不執行,直到程序執行完畢之後,在執行.

3.5程序物件的其他屬性(瞭解)

from multiprocessing import Process
import time

def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')



if __name__ == '__main__':
    p = Process(target=task,args=('cc',),name='aaa')  # 建立一個程序物件
    print(p.name) #給子程序起名字
    p.start()
    # time.sleep(1)
    p.terminate()  #殺死子程序
    time.sleep(0.5) #先睡一會要不然判斷還是活的
    print(p.is_alive())#判斷子程序是否存活 
    # p.name = 'sb' #改子程序的名字

    print('==主開始')

3.6 殭屍程序和孤兒程序(瞭解)

基於unix環境(linux,macOS)

正常:主程序需要等待子程序結束之後,主程序才結束

主程序時刻監測子程序的執行狀態,當子程序結束之後,一段時間之內,將子程序進行回收.

為什麼主程序不在子程序結束後馬上對其回收呢?

1. 主程序與子程序是非同步關係.主程序無法馬上捕獲子程序什麼時候結束.
2. 如果子程序結束之後馬上再記憶體中釋放資源,主程序就沒有辦法監測子程序的狀態了.

unix針對於上面的問題,提供了一個機制.

所有的子程序結束之後,立馬會釋放掉檔案的操作連結,記憶體的大部分資料,但是會保留一些內容: 程序號,結束時間,執行狀態,等待主程序監測,回收.

所有的子程序結束之後,在被主程序回收之前,都會進入殭屍程序狀態.

一:殭屍程序(有害)
  殭屍程序:一個程序使用fork建立子程序,如果子程序退出,而父程序並沒有呼叫wait或waitpid獲取子程序的狀態資訊,那麼子程序的程序描述符仍然儲存在系統中。這種程序稱之為僵死程序。

如果父程序不對殭屍程序進行回收(wait/waitpid),產生大量的殭屍程序,這樣就會佔用記憶體,佔用程序pid號.

殭屍程序如何解決???

父程序產生了大量子程序,但是不回收,這樣就會形成大量的殭屍程序,解決方式就是直接殺死父程序,將所有的殭屍程序變成孤兒程序程序,由init進行回收
二:孤兒程序(無害)

  孤兒程序:一個父程序退出,而它的一個或多個子程序還在執行,那麼那些子程序將成為孤兒程序。孤兒程序將被init程序(程序號為1)所收養,並由init程序對它們完成狀態收集工作。

4.守護程序

子程序守護著主程序,只要主程序結束,子程序跟著就結束

from multiprocessing import Process
import time

def task(name):
    print(f'{name} is running')
    time.sleep(2)
    print(f'{name} is gone')



if __name__ == '__main__':
    # 在windows環境下, 開啟程序必須在 __name__ == '__main__' 下面
    p = Process(target=task,args=('常鑫',))  # 建立一個程序物件
    p.daemon = True  # 將p子程序設定成守護程序,只要主程序結束,守護程序馬上結束.
    p.start()
    time.sleep(1)
    print('===主')

5.互斥鎖(程序同步控制)

多個使用者搶佔一個資源時,第一個使用者先搶到了,加上鎖,用完之後才給第二個使用者使用

問題

三個同事 同時用一個印表機列印內容.

三個程序模擬三個同事, 輸出平臺模擬印表機.

#版本一:
#併發是以效率優先的,但是目前我們的需求: 順序優先.
#多個程序共強一個資源時, 要保證順序優先: 序列,一個一個來.

from multiprocessing import Process
import time
import random
import os

def task1(p):
    print(f'{p}開始列印了')
    time.sleep(random.randint(1,3))
    print(f'{p}列印結束了')

def task2(p):
    print(f'{p}開始列印了')
    time.sleep(random.randint(1,3))
    print(f'{p}列印結束了')


if __name__ == '__main__':

    p1 = Process(target=task1,args=('p1',))
    p2 = Process(target=task2,args=('p2',))


    p2.start()
    p2.join()
    p1.start()
    p1.join()

#我們利用join 解決序列的問題,保證了順序優先,但是這個誰先誰後是固定的.
#這樣不合理. 你在爭搶同一個資源的時候,應該是先到先得,保證公平.
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import os

def task1(p,lock):
    '''
    一把鎖不能連續鎖兩次
    '''
    lock.acquire()
    print(f'{p}開始列印了')
    time.sleep(random.randint(1,3))
    print(f'{p}列印結束了')
    lock.release()

def task2(p,lock):
    lock.acquire()
    print(f'{p}開始列印了')
    time.sleep(random.randint(1,3))
    print(f'{p}列印結束了')
    lock.release()

if __name__ == '__main__':

    mutex = Lock()
    p1 = Process(target=task1,args=('p1',mutex))
    p2 = Process(target=task2,args=('p2',mutex))

    p2.start()
    p1.start()

lock與join的區別.

共同點: 都可以把併發變成序列, 保證了順序.

不同點: join人為設定順序,lock讓其爭搶順序,保證了公平性.

6.程序之間的通訊

1.基於檔案通訊

# 搶票系統.
# 先可以查票.查詢餘票數.  併發
# 進行購買,向服務端傳送請求,服務端接收請求,在後端將票數-1,返回到前端. 序列.
# 當多個程序共強一個數據時,如果要保證資料的安全,必須要序列.
# 要想讓購買環節進行序列,我們必須要加鎖處理.


from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import os
import random


def search():
    time.sleep(random.randint(1,3))  # 模擬網路延遲(查詢環節)
    with open('ticket.json',encoding='utf-8') as f1:
        dic = json.load(f1)
        print(f'{os.getpid()} 查看了票數,剩餘{dic["count"]}')


def paid():
    with open('ticket.json', encoding='utf-8') as f1:

        dic = json.load(f1)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1,3))  # 模擬網路延遲(購買環節)
        with open('ticket.json', encoding='utf-8',mode='w') as f1:
            json.dump(dic,f1)
        print(f'{os.getpid()} 購買成功')


def task(lock):
    search()
    lock.acquire()
    paid()
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(6):
        p = Process(target=task,args=(mutex,))
        p.start()


# 當很多程序共強一個資源(資料)時, 你要保證順序(資料的安全),一定要序列.
# 互斥鎖: 可以公平性的保證順序以及資料的安全.

# 基於檔案的程序之間的通訊:
    # 效率低.
    # 自己加鎖麻煩而且很容易出現死鎖.

2.基於佇列通訊

程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的

佇列: 把佇列理解成一個容器,這個容器可以承載一些資料,

佇列的特性: 先進先出永遠保持這個資料.

.

建立佇列的類(底層就是以管道和鎖定的方式實現):

1 Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。 

引數介紹:

1 maxsize是佇列中允許最大項數,省略則無大小限制。    

主要方法:

q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常
q.get方法可以從佇列讀取並且刪除一個元素。
同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。
如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():呼叫此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中又加入了專案。
q.full():呼叫此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中的專案被取走。
9 q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

其他方法(瞭解):

q.cancel_join_thread():不會在程序退出時自動連線後臺執行緒。可以防止join_thread()方法阻塞
q.close():關閉佇列,防止佇列中加入更多資料。呼叫此方法,後臺執行緒將繼續寫入那些已經入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將呼叫此方法。關閉佇列不會在佇列使用者中產生任何型別的資料結束訊號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。
q.join_thread():連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法之後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread方法可以禁止這種行為
from multiprocessing import Queue
q = Queue(3)  # maxsize

q.put(1)
q.put('alex')
q.put([1,2,3])
# q.put(5555,block=False) # 當佇列滿了時,在程序put資料就會阻塞.
#
print(q.get())
print(q.get())
print(q.get())
print(q.get(timeout=3))  # 阻塞3秒,3秒之後還阻塞直接報錯.
# print(q.get(block=False)) # 當資料取完時,在程序get資料也會出現阻塞,直到某一個程序put資料.

# block=False 只要遇到阻塞就會報錯.

3.基於管道

管道是有問題的,管道會造成資料的不安全,官方給予的解釋是管道有可能會造成資料損壞。

7.生產者和消費者

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

1.為什麼要使用生產者和消費者模式

執行緒世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

2.什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

起到緩衝的作用,平衡生產力與消費力,解耦.

from multiprocessing import Process
from multiprocessing import Queue
import time
import random

def producer(q,name):
    for i in range(1,6):
        time.sleep(random.randint(1,2))
        res = f'{i}號包子'
        q.put(res)
        print(f'生產者{name} 生產了{res}')


def consumer(q,name):

    while 1:
        try:
            food = q.get(timeout=3)
            time.sleep(random.randint(1, 3))
            print(f'\033[31;0m消費者{name} 吃了{food}\033[0m')
        except Exception:
            return


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer,args=(q,'路人甲'))
    p2 = Process(target=consumer,args=(q,'路人乙'))

    p1.start()
    p2.start()

二、併發程式設計之多執行緒

1. threading模組介紹

multiprocess模組的完全模仿了threading模組的介面,二者在使用層面,有很大的相似性,因而不再詳細介紹

官方解釋https://docs.python.org/3/library/threading.html?highlight=threading#

2.開啟執行緒的兩種方式

from threading import Thread
import time

def task(name):
    print(f"{name} is runing!")
    time.sleep(2)
    print(f"{name} is  stop")

t1 =Thread(target=task,args=("mysql",))
t1.start()
print('===主執行緒')  # 執行緒是沒有主次之分的(為了好記)

第二種瞭解即可(不常用)

from threading import Thread 
import time

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'{self.name} is running!')
        time.sleep(1)
        print(f'{self.name} is stop')


t1 = MyThread("mysql")
t1.start()
print('主')

3.執行緒vs程序的程式碼對比

1.多執行緒與多程序開啟速度區別

多程序

先執行主程序

from multiprocessing import Process


def work():
    print('hello')

if __name__ == '__main__':
    #在主程序下開啟執行緒
    t=Process(target=work)
    t.start()
    print('主執行緒/主程序') 

多執行緒

先執行子執行緒

from threading import Thread
import time

def task(name):
    print(f'{name} is running')
    time.sleep(1)
    print(f'{name} is gone')

if __name__ == '__main__':

    t1 = Thread(target=task,args=('A',))
    t1.start()
    print('===主執行緒')

2.對比pid

程序

from multiprocessing import Process
import time
import os
def task(name):
    print(f'子程序: {os.getpid()}')
    print(f'主程序: {os.getppid()}')


if __name__ == '__main__':

    p1 = Process(target=task,args=('A',))  # 建立一個程序物件
    p2 = Process(target=task,args=('B',))  # 建立一個程序物件
    p1.start()
    p2.start()
    print(f'==主{os.getpid()}')
# ==主17444
# 子程序: 8636
# 主程序: 17444
# 子程序: 14200
# 主程序: 17444

執行緒

資源共享

from threading import Thread
import os

def task():
    print(os.getpid())

if __name__ == '__main__':

    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t1.start()
    t2.start()
    print(f'===主執行緒{os.getpid()}')
# 18712
# 18712
# ===主執行緒18712  

3.同一程序內執行緒共享內部資料

from threading import Thread
import os

x = 3
def task():
    global x
    x = 100

if __name__ == '__main__':

    t1 = Thread(target=task)
    t1.start()
    # t1.join()
    print(f'===主執行緒{x}')

# 同一程序內的資源資料對於這個程序的多個執行緒來說是共享的.

4.執行緒的其他方法

Thread例項物件的方法

  # isAlive(): 返回執行緒是否活動的。
  # getName(): 返回執行緒名。
  # setName(): 設定執行緒名。

threading模組提供的一些方法:
  # threading.currentThread(): 返回當前的執行緒變數。
  # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
from threading import currentThread
from threading import enumerate
from threading import activeCount
import os
import time

x = 3
def task():
    # print(currentThread())
    time.sleep(1)
    print('666')
print(123)
if __name__ == '__main__':

    t1 = Thread(target=task,name='執行緒1')
    t2 = Thread(target=task,name='執行緒2')
    # name 設定執行緒名
    t1.start()
    t2.start()
    # time.sleep(2)
    print(t1.isAlive())  # 判斷執行緒是否活著
    print(t1.getName())  # 獲取執行緒名
    t1.setName('子執行緒-1')
    print(t1.name)  # 獲取執行緒名  ***

    # threading方法
    print(currentThread())  # 獲取當前執行緒的物件
    print(enumerate())  # 返回一個列表,包含所有的執行緒物件
    print(activeCount())  # ***返回正在執行的執行緒數量,
    print(f'===主執行緒{os.getpid()}')

5.join 執行緒中的用法

join: 阻塞 告知主執行緒要等待我子執行緒執行完畢之後再執行主執行緒

from threading import Thread
import time

def task(name):
    print(f'{name} is running')
    time.sleep(1)
    print(f'{name} is gone')

if __name__ == '__main__':
    start_time = time.time()
    t1 = Thread(target=task,args=('A',))
    t2 = Thread(target=task,args=('B',))

    t1.start()
    t1.join()
    t2.start()
    t2.join()
    print(f'===主執行緒{time.time() - start_time}')

6.守護執行緒

需要強調的是:執行完畢並非終止執行

#1.對主程序來說,執行完畢指的是主程序程式碼執行完畢

#2.對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢

如果守護執行緒的生命週期小於其他執行緒,則啊肯定先結束,否則等待其他非守護執行緒和主執行緒結束

詳細解釋:

#1 主程序在其程式碼結束後就已經算執行完畢了(守護程序在此時就被回收),然後主程序會一直等非守護的子程序都執行完畢後回收子程序的資源(否則會產生殭屍程序),才會結束,

#2 主執行緒在其他非守護執行緒執行完畢後才算執行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著程序的結束,程序整體的資源都將被回收,而程序必須保證非守護執行緒都執行完畢後才能結束。
from threading import Thread
import time


def sayhi(name):
    print('你滾!')
    time.sleep(2)
    print('%s say hello' %name) #主執行緒結束了不執行

if __name__ == '__main__':
    t = Thread(target=sayhi,args=('A',))
    # t.setDaemon(True) #必須在t.start()之前設定
    t.daemon = True
    t.start()  # 執行緒的開啟速度比程序快很多

    print('主執行緒')
# 你滾!
# 主執行緒

主執行緒什麼時候結束???

守護執行緒 等待非守護子執行緒以及主執行緒結束之後,結束.

from threading import Thread
import time

def foo():
    print(123)  # 1
    time.sleep(3)  
    print("end123")   #等帶時間太長了了,子程序已經執行完了,守護程序掛掉不執行

def bar():
    print(456)  # 2
    time.sleep(1)
    print("end456")  # 4


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")  # 3

# 123
# 456
# main-------
# end456

7.互斥鎖

多執行緒的同步鎖與多程序的同步鎖是一個道理,就是多個執行緒搶佔同一個資料(資源)時,我們要保證資料的安全,合理的順序。

多個任務公搶一個數據,保證資料的安全的目的,要讓其序列

from threading import Thread
from threading import Lock
import time
import random
x = 100

def task(lock):

    lock.acquire()
    # time.sleep(random.randint(1,2))
    global x
    temp = x
    time.sleep(0.01)
    temp = temp - 1
    x = temp
    lock.release()


if __name__ == '__main__':
    mutex = Lock()
    l1 = []
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        l1.append(t)
        t.start()

    time.sleep(3)
    print(f'主執行緒{x}')

8.死鎖現象和遞迴鎖

程序也有死鎖與遞迴鎖,程序的死鎖和遞迴鎖與執行緒的死鎖遞迴鎖同理

from threading import Thread
from threading import Lock
import time

lock_A = Lock()
lock_B = Lock()


class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()


    def f1(self):

        lock_A.acquire()
        print(f'{self.name}拿到了A鎖')

        lock_B.acquire()
        print(f'{self.name}拿到了B鎖')

        lock_B.release()

        lock_A.release()

    def f2(self):

        lock_B.acquire()
        print(f'{self.name}拿到了B鎖')

        time.sleep(0.1)
        lock_A.acquire()
        print(f'{self.name}拿到了A鎖')

        lock_A.release()

        lock_B.release()



if __name__ == '__main__':

    for i in range(3):
        t = MyThread()
        t.start()

遞迴鎖有一個計數的功能, 原數字為0,上一次鎖,計數+1,釋放一次鎖,計數-1

只要遞迴鎖上面的數字不為零,其他執行緒就不能搶鎖.

#遞迴鎖可以解決死鎖現象,業務需要多個鎖時,先要考慮遞迴鎖.
class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):

        lock_A.acquire()
        print(f'{self.name}拿到了A鎖')

        lock_B.acquire()
        print(f'{self.name}拿到了B鎖')

        lock_B.release()

        lock_A.release()

    def f2(self):

        lock_B.acquire()
        print(f'{self.name}拿到了B鎖')

        lock_A.acquire()
        print(f'{self.name}拿到了A鎖')

        lock_A.release()

        lock_B.release()

if __name__ == '__main__':

    for i in range(3):
        t = MyThread()
        t.start()

9.訊號量

也是一種鎖, 控制併發數量

同進程的一樣

Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。

例項:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連線數為5):

from threading import Thread, Semaphore, current_thread
import time
import random
sem = Semaphore(5) #廁所只有五個五位置 出一個進入一個

def task():
    sem.acquire()

    print(f'{current_thread().name} 廁所')
    time.sleep(random.randint(1,3))

    sem.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task,)
        t.start()

10.Python GIL

GIL全域性直譯器鎖

好多自稱大神的說,GIL鎖就是python的致命缺陷,Python不能多核,併發不行等等 .....

這篇文章透徹的剖析了GIL對python多執行緒的影響,

強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf

為什麼加鎖?

  1. 當時都是單核時代,而且cpu價格非常貴.

  2. 如果不加全域性直譯器鎖, 開發Cpython直譯器的程式設計師就會在原始碼內部各種主動加鎖,解鎖,非常麻煩,各種死鎖現象等等.他為了省事兒,直接進入直譯器時給執行緒加一個鎖.

    優點: 保證了Cpython直譯器的資料資源的安全.

    缺點: 單個程序的多執行緒不能利用多核.

Jpython沒有GIL鎖.

pypy也沒有GIL鎖.

現在多核時代, 我將Cpython的GIL鎖去掉行麼?

因為Cpython直譯器所有的業務邏輯都是圍繞著單個執行緒實現的,去掉這個GIL鎖,幾乎不可能.

單個程序的多執行緒可以併發,但是不能利用多核,不能並行.

多個程序可以併發,並行.

IO密集型

計算密集型

11.GIL與lock鎖的區別

相同點: 都是同種鎖,互斥鎖.

不同點:

​ GIL鎖全域性直譯器鎖,保護直譯器內部的資源資料的安全.

​ GIL鎖 上鎖,釋放無需手動操作.

​ 自己程式碼中定義的互斥鎖保護程序中的資源資料的安全.

​ 自己定義的互斥鎖必須自己手動上鎖,釋放鎖.

詳解聯絡

因為Python直譯器幫你自動定期進行記憶體回收,你可以理解為python直譯器裡有一個獨立的執行緒,每過一段時間它起wake up做一次全域性輪詢看看哪些記憶體資料是可以被清空的,此時你自己的程式 裡的執行緒和 py直譯器自己的執行緒是併發執行的,假設你的執行緒刪除了一個變數,py直譯器的垃圾回收執行緒在清空這個變數的過程中的clearing時刻,可能一個其它執行緒正好又重新給這個還沒來及得清空的記憶體空間賦值了,結果就有可能新賦值的資料被刪除了,為了解決類似的問題,python直譯器簡單粗暴的加了鎖,即當一個執行緒執行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。

12.驗證計算密集型IO密集型的效率

io密集型:

from threading import Thread
from multiprocessing import Process
import time
import random

def task():
    count = 0
    time.sleep(random.randint(1,3))
    count += 1

if __name__ == '__main__':

#多程序的併發,並行
    start_time = time.time()
    l1 = []
    for i in range(50):
        p = Process(target=task,)
        l1.append(p)
        p.start()

    for p in l1:
        p.join()

    print(f'執行效率:{time.time()- start_time}')  # 4.41826057434082

#多執行緒的併發
    start_time = time.time()
    l1 = []
    for i in range(50):
        p = Thread(target=task,)
        l1.append(p)
        p.start()

    for p in l1:
        p.join()

    print(f'執行效率:{time.time()- start_time}')  # 3.0294392108917236

# 對於IO密集型: 單個程序的多執行緒的併發效率高.

計算密集型:

from threading import Thread
from multiprocessing import Process
import time


def task():
    count = 0
    for i in range(10000000):
        count += 1


if __name__ == '__main__':

    #多程序的併發,並行
    start_time = time.time()
    l1 = []
    for i in range(4):
        p = Process(target=task,)
        l1.append(p)
        p.start()

    for p in l1:
        p.join()

    print(f'執行效率:{time.time()- start_time}')  # 1.1186981201171875

    #多執行緒的併發
    start_time = time.time()
    l1 = []
    for i in range(4):
        p = Thread(target=task,)
        l1.append(p)
        p.start()

    for p in l1:
        p.join()

    print(f'執行效率:{time.time()- start_time}')  # 2.729006767272949

# 總結: 計算密集型: 多程序的併發並行效率高.

13.多執行緒實現socket通訊

無論是多執行緒還是多程序,如果按照,來一個客戶端請求,我就開一個執行緒,來一個請求開一個執行緒,

應該是這樣: 你的計算機允許範圍內,開啟的執行緒程序數量越多越好.

服務端

import socket
from threading import Thread

def communicate(conn,addr):
    while 1:
        try:
            from_client_data = conn.recv(1024)
            print(f'來自客戶端{addr[1]}的訊息: {from_client_data.decode("utf-8")}')
            to_client_data = input('>>>').strip()
            conn.send(to_client_data.encode('utf-8'))
        except Exception:
            break
    conn.close()



def _accept():
    server = socket.socket()

    server.bind(('127.0.0.1', 8848))

    server.listen(5)

    while 1:
        conn, addr = server.accept()
        t = Thread(target=communicate,args=(conn,addr))
        t.start()

if __name__ == '__main__':
    _accept()

客戶端

import socket

client = socket.socket()

client.connect(('127.0.0.1',8848))

while 1:
    try:
        to_server_data = input('>>>').strip()
        client.send(to_server_data.encode('utf-8'))

        from_server_data = client.recv(1024)
        print(f'來自服務端的訊息: {from_server_data.decode("utf-8")}')

    except Exception:
        break
client.close()

14 . 程序池,執行緒池

為什麼要將程序池和執行緒池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了

執行緒池: 一個容器,這個容器限制住你開啟執行緒的數量,比如4個

,第一次肯定只能併發的處理4個任務,只要有任務完成,執行緒馬上就會接下一個任務.

以時間換空間.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random

def task(n):
    print(f'{os.getpid()} 接客')
    time.sleep(random.randint(1,3))


if __name__ == '__main__':

    # 開啟程序池  (並行(並行+併發))
    p = ProcessPoolExecutor()  # 預設不寫,程序池裡面的程序數與cpu個數相等

    for i in range(20): #釋出20個任務,cpu個數的程序處理
        p.submit(task,i)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random

def task(n):
    print(f'{os.getpid()} 接客')
    time.sleep(random.randint(1,3))


if __name__ == '__main__':

    # 開啟程序池  (並行(並行+併發))
    p = ProcessPoolExecutor()  # 預設不寫,程序池裡面的程序數與cpu個數相等

    for i in range(20): #釋出20個任務,cpu個數的程序處理
        p.submit(task,i)

15.阻塞,非阻塞,同步,非同步

執行的角度

阻塞:程式執行時,遇到IO、程式掛起CPU被切走

非阻塞:程式沒有遇到IO,程式遇到IO但是通過某種手段,讓cpu強行執行我的程式。

提交任務的角度

同步:提交一個任務,自任務開始執行直到此任務結束(可能有IO),返回一個返回值之後,我再提交下一個任務

非同步:一次性提交多個任務,然後我就直接執行下一行程式碼

16.同步呼叫,非同步呼叫

非同步呼叫

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f'{os.getpid()}開始任務')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}任務結束')
    return i
if __name__ == '__main__':

    # 非同步呼叫
    pool = ProcessPoolExecutor()
    for i in range(10):
        pool.submit(task,i)

    pool.shutdown(wait=True)
    # shutdown: 讓我的主程序等待程序池中所有的子程序都結束任務之後,在執行. 有點類似與join.
    # shutdown: 在上一個程序池沒有完成所有的任務之前,不允許新增新的任務.
    # 一個任務是通過一個函式實現的,任務完成了他的返回值就是函式的返回值.
    print('===主')

同步呼叫

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f'{os.getpid()}開始任務')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}任務結束')
    return i
if __name__ == '__main__':

    # 同步呼叫
    pool = ProcessPoolExecutor()
    for i in range(10):
        obj = pool.submit(task,i)
        # obj是一個動態物件,返回的當前的物件的狀態,有可能執行中,可能(就緒阻塞),還可能是結束了.
        # obj.result() 必須等到這個任務完成後,返回了結果之後,在執行下一個任務.
        print(f'任務結果:{obj.result()}')

    pool.shutdown(wait=True)
    print('===主') 

17.非同步呼叫+回撥函式

瀏覽器工作原理, 向服務端傳送一個請求,服務端驗證你的請求,如果正確,給你的瀏覽器返回一個檔案,
瀏覽器接收到檔案,將檔案裡面的程式碼渲染成你看到的漂亮美麗的模樣.

什麼叫爬蟲?

  1. 利用程式碼模擬一個瀏覽器,進行瀏覽器的工作流程得到一堆原始碼.
  2. 對原始碼進行資料清洗得到我想要資料.

pip install requests

import requests
ret = requests.get('http://www.baidu.com')
if ret.status_code == 200:
    print(ret.text)

基於 非同步呼叫回收所有任務的結果我要做到實時回收結果,
併發執行任務每個任務只是處理IO阻塞的,不能增加新得功能.
非同步呼叫 + 回撥函式

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests


def task(url):
    '''模擬的就是爬取多個原始碼 一定有IO操作'''
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text


def parse(obj):
    '''模擬對資料進行分析 一般沒有IO'''
    print(len(obj.result()))


if __name__ == '__main__':

    # 開啟執行緒池,併發並行的執行
    url_list = [
        'http://www.baidu.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.taobao.com',
        'https://www.cnblogs.com/jin-xin/articles/7459977.html',
        'https://www.luffycity.com/',
        'https://www.cnblogs.com/jin-xin/articles/9811379.html',
        'https://www.cnblogs.com/jin-xin/articles/11245654.html',
        'https://www.sina.com.cn/',

    ]
    pool = ThreadPoolExecutor(4)

    for url in url_list:
        obj = pool.submit(task, url)
        obj.add_done_callback(parse) #回撥函式

    '''
    執行緒池設定4個執行緒, 非同步發起10個任務,每個任務是通過網頁獲取原始碼, 併發執行,
    當一個任務完成之後,將parse這個分析程式碼的任務交由剩餘的空閒的執行緒去執行,你這個執行緒繼續去處理其他任務.
    如果程序池+回撥: 回撥函式由主程序去執行.
    如果執行緒池+回撥: 回到函式由空閒的執行緒去執行.
    '''

非同步 回撥是一回事兒?
非同步站在釋出任務的角度,
站在接收結果的角度: 回撥函式 按順序接收每個任務的結果,進行下一步處理.

非同步 + 回撥:
非同步處理的IO型別.
回撥處理非IO

18.執行緒佇列queque

1.先進先出

import  queue
#先進先出
q  = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False))#遇到阻塞直接報錯
q.get(timeout=3) #阻塞2秒沒有值直接報錯

2.後進先出,堆疊

q =queue.LifoQueue(4)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(q.get())
print(q.get())
print(q.get())
print(q.get())

3.優先順序佇列

q = queue.PriorityQueue(4)
q.put((5,"z"))
q.put((0,"b"))
q.put((-1,"2"))
q.put((-1,"3"))
#同一級別按照accik碼 一般不會存在同一級別的
print(q.get())
print(q.get())
print(q.get())
print(q.get())

19.事件event

開啟兩個執行緒,一個執行緒執行到中間的某個階段,觸發另個執行緒執行.兩個執行緒增加了耦合性.

from threading import  Thread,current_thread,Event
import time

event = Event()
def check():
    print(f"{current_thread().name}檢測伺服器是否開啟")
    time.sleep(3)
    # print(event.is_set())#判斷set是否存在
    event.set()
    # print(event.is_set())#再set之後顯示T
    print('伺服器已經開啟')

def connect():
    print(f'{current_thread().name}等待連線..')
    event.wait() #阻塞 知道event.set()執行後
    # event.wait(1)#只阻塞1秒,一秒之後執行
    print(f"{current_thread().name}連線成功")

t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start()

20.協程

協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。

單個cpu: 10個任務,讓你給我併發的執行這個10個任務:

1. 方式一:開啟多程序併發執行, 作業系統切換+保持狀態.
2. 方式二:開啟多執行緒併發執行,作業系統切換+保持狀態.
3. 方式三:開啟協程併發的執行, 自己的程式 把控著cpu 在3個任務之間來回切換+保持狀態.

對3詳細解釋: 協程他切換速度非常快,矇蔽作業系統的眼睛,讓作業系統認為cpu一直在執行你這一個執行緒(協程.)

需要強調的是:

#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行)
#2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

  對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換

  優點如下:

#1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級
#2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu

  缺點如下:

#1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程

  總結協程特點:

  1. 必須在只有一個單執行緒裡實現併發
  2. 修改共享資料不需加鎖
  3. 使用者程式裡自己儲存多個控制流的上下文棧

1.Greenlet

如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換

#安裝
pip3 install greenlet
# 切換 +保持狀態(遇到IO不會主動切換)
#真正的協程模組就是使用greenlet完成的切換
from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)  #2
    g2.switch('taibai')   #3
    print('%s eat 2' %name) #6
    g2.switch() #7
def play(name):
    print('%s play 1' %name) #4
    g1.switch()      #5
    print('%s play 2' %name) #8

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('taibai')#可以在第一次switch時傳入引數,以後都不需要  1

工作中:

​ 一般在工作中我們都是程序+執行緒+協程的方式來實現併發,以達到最好的併發效果,如果是4核的cpu,一般起5個程序,每個程序中20個執行緒(5倍cpu數量),每個執行緒可以起500個協程,大規模爬取頁面的時候,等待網路延遲的時間的時候,我們就可以用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是一般一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個

  單執行緒裡的這20個任務的程式碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模組。

2.Gevent

#安裝
pip3 install gevent

  Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。

用法

g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的,spawn是非同步提交任務

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束  有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裡面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了


#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

最終版本

import gevent
from gevent import monkey
monkey.patch_all()  # 打補丁: 將下面的所有的任務的阻塞都打上標記
def eat(name):
    print('%s eat 1' %name)
    time.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    time.sleep(1)
    print('%s play 2' %name)


g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,name='egon')

# g1.join()
# g2.join()
gevent.joinall([g1,g2])