1. 程式人生 > >day40 Pyhton 併發程式設計03

day40 Pyhton 併發程式設計03

一.內容回顧

  程序是計算機中最小的資源分配單位

  程序與程序之間資料隔離,執行過程非同步

  為什麼會出現程序的概念?

    為了合理利用cpu,提高使用者體驗

    多個程序是可以同時利用多個cpu的,可以實現並行的效果

  殭屍程序

  程序 狀態碼 z/z 殭屍程序   linux

  在主程序中控制子程序的方法

    子程序物件 = Process (target,args)  在建立的這一刻根本就沒有通知作業系統

    子程序物件. start()  通知作業系統,開啟子程序,非同步非阻塞

    子程序物件.terminate() 通知作業系統,結束子程序,非同步非阻塞

    子程序物件.is_alive()  檢視子程序是否還活著

    子程序物件.join()  阻塞,直到子程序結束

    子程序物件.join(timeout = 10)  阻塞最多10s,期間子程序如果結束就結束阻塞,如果沒結束10s之後也結束阻塞

# 守護程序
    # 守護程序是一個子程序
    # 守護程序會在主程序程式碼結束之後才結束
    # 為什麼會這樣?
        # 由於主程序必須要回收所有的子程序的資源
        # 所以主程序必須在子程序結束之後才能結束
        # 而守護程序就是為了守護主程序存在的
# 不能守護到主程序結束,就只能退而求其次,守護到程式碼結束了 # 守護到主程序的程式碼結束,意味著如果有其他子程序沒有結束,守護程序無法繼續守護 # 解決方案 : 在主程序中加入對其他子程序的join操作,來保證守護程序可以守護所有主程序和子程序的執行 # 如何設定守護程序 # 子程序物件.daemon = True 這句話寫在start之前
#
    # 為什麼要用鎖?
    # 由於多個程序的併發,導致很多資料的操作都在同時進行
    # 所以就有可能產生多個程序同時操作 : 檔案\資料庫 中的資料
    #
導致資料不安全 # 所以給某一段修改資料的程式加上鎖,就可以控制這段程式碼永遠不會被多個程序同時執行 # 保證了資料的安全 # Lock 鎖(互斥鎖) # 鎖實際上是把你的某一段程式變成同步的了,降低了程式執行的速度,為了保證資料的安全性 # 沒有資料安全的效率都是耍流氓

 訊號量

# 對於鎖 保證一段程式碼同一時刻只能有一個程序執行
# 對於訊號量 保證一段程式碼同一時刻只能有n個程序執行
# 流量控制
# 10個程序
from multiprocessing import Semaphore

sem = Semaphore(4)
sem.acquire()
print('拿走一把鑰匙1')
sem.acquire()
print('拿走一把鑰匙2')
sem.acquire()
print('拿走一把鑰匙3')
sem.acquire()
print('拿走一把鑰匙4')
sem.release()#釋放訊號
sem.acquire()
print('拿走一把鑰匙5')
訊號量Semaphore是同時允許一定數量的執行緒更改資料
訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。
 
 
 
import time
import random
from multiprocessing import Process,Semaphore
def ktv(name,sem):
    sem.acquire()
    print("%s走進了ktv"%name)
    time.sleep(random.randint(5,10))
    print("%s走出了ktv" % name)
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(100):
        p = Process(target=ktv,args = ('name%s'%i,sem))
        p.start()
name1走進了ktv
name0走進了ktv
name3走進了ktv
name4走進了ktv
name1走出了ktv
name2走進了ktv
name3走出了ktv
事件Event 事件類
e = Event()
e為事件物件,事件本身就帶著標識:False
wait 阻塞
它的阻塞條件是 物件標識為False
結束阻塞條件是 物件標識為True

物件的標識相關的
set 將物件的標識設定為True
clear 將物件的標識設定為False
is_set 檢視物件的標識是否為True
import time
import random
from multiprocessing import Event,Process def traffic_light(e): print('\033[1;31m紅燈亮\033[0m') while True: time.sleep(2) if e.is_set(): # 如果當前是綠燈 print('\033[1;31m紅燈亮\033[0m') # 先列印紅燈亮 e.clear() # 再把燈改成紅色 else : # 當前是紅燈 print('\033[1;32m綠燈亮\033[0m') # 先列印綠燈亮 e.set() # 再把燈變綠色 def car(e,carname): if not e.is_set(): print('%s正在等待通過'%carname) e.wait() print('%s正在通過'%carname) if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args = (e,)) p.start() for i in range(100): time.sleep(random.randrange(0,3)) p = Process(target=car, args=(e,'car%s'%i)) p.start()
標識 控制wait是否阻塞的關鍵 
如何修改這個標識 : clear set
如何檢視這個標識 : is_set
程序之間的資料通訊 IPC
  管道 Pipe
  佇列 Queue
from multiprocessing import Queue,Process

def consumer(q):
    print(
       '子程序 :', q.get()
    )


if __name__ == '__main__':
    q = Queue()
    p = Process(target=consumer,args=(q,))
    p.start()
    q.put('hello,world')
 
 
 
# 生產者消費者模型
import time
from multiprocessing import Queue,Process

def producer(name,food,num,q):
    '''生產者'''
    for i in range(num):
        time.sleep(0.3)
        foodi = food + str(i)
        print('%s生產了%s'%(name,foodi))
        q.put(foodi)

def consumer(name,q):
    while True:
        food = q.get()   # 等待接收資料
        if food == None:break
        print('%s吃了%s'%(name,food))
        time.sleep(1)

if __name__ == '__main__':
    q = Queue(maxsize=10)
    p1 = Process(target=producer,args = ('寶元','泔水',20,q))
    p2 = Process(target=producer,args = ('戰山','魚刺',10,q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    p1.start()   # 開始生產
   p2.start()   # 開始生產
   c1.start()
    c2.start()
    p1.join()    # 生產者結束生產了
   p2.join()    # 生產者結束生產了
   q.put(None)  # put None 操作永遠放在所有的生產者結束生產之後
   q.put(None)  # 有幾個消費者 就put多少個None
佇列為空不準確
q.qsize() 佇列的大小 #
q.full() 是否滿了 滿返回True
q.empty() 是否空了 空返回True
import  time
from multiprocessing import JoinableQueue,Process

def consumer(name,q):
    while True:
        food = q.get()
        time.sleep(1)
        print('%s消費了%s'%(name,food))
        q.task_done()

def producer(name,food,num,q):
    '''生產者'''
    for i in range(num):
        time.sleep(0.3)
        foodi = food + str(i)
        print('%s生產了%s'%(name,foodi))
        q.put(foodi)
    q.join()   # 消費者消費完畢之後會結束阻塞
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('寶元', '泔水', 20, q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    c1.start()
    c2.start()
    p1.join()
消費者每消費一個數據會給佇列傳送一條資訊 
當每一個數據都被消費掉之後 joinablequeue的join阻塞行為就會結束
以上就是為什麼我們要在生產完所有資料的時候發起一個q.join()
隨著生產者子程序的執行完畢,說明消費者的資料都消費完畢了
這個時候主程序中的p1.join結束
主程序的程式碼結束
守護程序也結束了

程序之間的資料共享
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂#{'count': 5}
        d['count']-=1
    # lock.acquire()#與上面意思一樣
    # d['count'] -= 1
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    m = Manager()
    dic=m.dict({'count':100})
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=(dic,lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(dic)
# Manager是一個類 內部有一些資料型別能夠實現程序之間的資料共享
# dict list這樣的資料 內部的數字進行自加 自減 是會引起資料不安全的,這種情況下 需要我們手動加鎖完成
# 因此 我們一般情況下 不適用這種方式來進行程序之間的通訊
# 我們寧可使用Queue佇列或者其他訊息中介軟體 來實現訊息的傳遞 保證資料的安全

程序池

multiprocess.Pool模組
為什麼要有程序池?程序池的概念

在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序

程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。

import os
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    return i**2

# 你的池中打算放多少個程序,個數cpu的個數 * 1|2
if __name__ == '__main__':
    p = Pool(5)
    ret = p.map(func,range(100))  # 自動帶join
    print(ret)
有了程序池,不僅可以只開有限的程序來完成無限的任務 
還可以獲取程式執行的返回值
如果沒有池幫助你實現功能,那麼你自己能不能實現???
通過佇列
# 同步方式向程序池提交任務
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(0.5)
    return i**2

# 你的池中打算放多少個程序,個數cpu的個數 * 1|2
if __name__ == '__main__':
    p = Pool(5)
    for i in range(100):
        ret = p.apply(func,args=(i,))  # 自動帶join  序列 同步  apply就是同步提交任務
        print(ret)
 
 
 
# 非同步方式向程序池提交任務
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(0.1)
    print(i)
    return i**2

# 你的池中打算放多少個程序,個數cpu的個數 * 1|2
if __name__ == '__main__':
    p = Pool(5)
    for i in range(100):
        ret = p.apply_async(func,args=(i,))  # 自動帶join  非同步的  apply_async非同步提交任務
        print(ret)
    p.close()   # 關閉程序池的任務提交 從此之後不能再向p這個池提交新的任務
    p.join()    # 阻塞 一直到所有的任務都執行完
 
 
 
# 非同步方式向程序池提交任務並且獲取返回值
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(1)
    return i**2

# 你的池中打算放多少個程序,個數cpu的個數 * 1|2
if __name__ == '__main__':
    p = Pool(5)
    l = []
    for i in range(100):
        ret = p.apply_async(func,args=(i,))  # 自動帶join  非同步的  apply_async非同步提交任務
        l.append(ret)
    for ret in l:
        print(ret.get())
 
 
 
# 為什麼要用程序池?
    # 任務很多 cpu個數*5個任務以上
    # 為了節省建立和銷燬程序的時間 和 作業系統的資源
# 一般程序池中程序的個數:
    # cpu的1-2倍
    # 如果是高計算,完全沒有io,那麼就用cpu的個數
    # 隨著IO操作越多,可能池中的程序個數也可以相應增加
# 向程序池中提交任務的三種方式
    # map    非同步提交任務 簡便演算法 接收的引數必須是 子程序要執行的func,可迭代的(可迭代中的每一項都會作為引數被傳遞給子程序)
        # 能夠傳遞的引數是有限的,所以比起apply_async限制性比較強
    # apply  同步提交任務(你刪了吧)
    # apply_async 非同步提交任務
        # 能夠傳遞比map更豐富的引數,但是比較麻煩
        # 首先 apply_async提交的任務和主程序完全非同步
        # 可以通過先close程序池,再join程序池的方式,強制主程序等待程序池中任務的完成
        # 也可以通過get獲取返回值的方式,來等待任務的返回值
            # 我們不能在apply_async提交任務之後直接get獲取返回值
            #    for i in range(100):
            #         ret = p.apply_async(func,args=(i,))  # 自動帶join  非同步的  apply_async非同步提交任務
            #         l.append(ret)
            #     for ret in l:
            #         print(ret.get())
 
 
 
# 回撥函式
import os
import time
import random
from multiprocessing import Pool  #
def func(i):     # [2,1,1,5,0,0.2]
    i -= 1
    time.sleep(random.uniform(0,2))
    return i**2

def back_func(args):
    print(args,os.getpid())

if __name__ == '__main__':
    print(os.getpid())
    p = Pool(5)
    l = []
    for i in range(100):
        ret = p.apply_async(func,args=(i,),callback=back_func)  # 5個任務
    p.close()
    p.join()
# callback回撥函式
# 主動執行func,然後在func執行完畢之後的返回值,直接傳遞給back_func作為引數,呼叫back_func
# 處理池中任務的返回值
# 回撥函式是由誰執行的? 主程序