1. 程式人生 > >併發程式設計多程序

併發程式設計多程序

1.守護程序

守護程序:表示一個程序b守護另一個程序a,當被守護的程序a結束後,那麼b也跟著結束了。

如何實現:在子程序開始之前,將daemon的值設定為True。

應用場景:子程序是幫助主程序完成任務的,如果主程序結束了,並且沒必要去使用子程序執行的結果,就可以將子程序設定為守護程序。例如:在執行qq的過程中,開啟一個下載任務的子程序,下載任務沒有完成,qq就終止掉,下載任務也跟著終止掉。

主程序建立守護程序:

  其一:守護程序會在主程序程式碼執行結束後就終止

  其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children

注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止。

import time
from multiprocessing import Process

def task():
    print("妃子的一生")
    time.sleep(5)
    print("妃子涼了")

if __name__ == '__main__':
    fz = Process(target=task)
    fz.daemon = True  # 將子程序作為主程序的守護程序  要注意  必須在開啟子程序之前 設定!
    fz.start()


    print
("皇帝登基了") time.sleep(2) print("當了十年皇帝..") print("皇帝駕崩")

2.互斥鎖

當多個程序共享一個數據時,共享資料就意味著資料的競爭,這樣就可能發生資料錯亂的情況。我們可以使用join來讓這些程序穿行,但是會導致無法併發,降低效率,並且程序執行的順序被固定死了。我們就可以使用互斥鎖,來解決這種問題。

互斥鎖簡單的來說就是互相排斥的鎖,將需要共享的資料進行加鎖,其他程序在訪問資料時,就必須等待當前程序使用完畢,才能進行訪問。其本質就是一個bool型別的資料,在執行程式碼前,會先判斷該值。

注:在使用互斥鎖時,必須保證互斥鎖是同一個。多次執行acquire會把資料鎖死。

使用:在main下面建立Lock的物件(lock = Lock()),將這個物件作為引數傳給子程序,在子程序中,在需要加鎖的程式碼上方加上lock.acquire() (上鎖),在程式碼下方加上lock.release()(解鎖)。

RLock表示可重用鎖,其特點是可以多次執行acquire。RLock在執行多次acqurie時和普通Lock沒有任何區別,如果在多程序中使用RLock,並且一個程序a執行多次acquire,其他程序b要想獲得這個鎖,需要程序a把鎖解開,就是說鎖了幾次,就要解開幾次。

from multiprocessing import Process,Lock       
import time,random                             
def task1(lock):                               
    lock.acquire()                             
    print('子程序1活著..')                          
    time.sleep(random.randint(1, 3))           
    print('子程序1正常死去。。。')                       
    lock.release()                             
                                               
                                               
def task2(lock):                               
    lock.acquire()                             
    print('子程序2活著..')                          
    time.sleep(random.randint(1, 3))           
    print('子程序2正常死去。。。')                       
    lock.release()                             
                                               
                                               
def task3(lock):                               
    lock.acquire()                             
    print('子程序3活著..')                          
    time.sleep(random.randint(1, 3))           
    print('子程序3正常死去。。。')                       
    lock.release()                             
                                               
                                               
if __name__ == '__main__':                     
    lock = Lock()                              
    p1 = Process(target=task1, args=(lock,))   
    p1.start()                                 
    p2 = Process(target=task2, args=(lock,))   
    p2.start()                                 
    p3 = Process(target=task3, args=(lock,))   
    p3.start()                                 
互斥鎖
# def task3(lock):
#     pass
# # 鎖的實現原理 虛擬碼
# # l = False
# # def task3(lock):
# #     global l
# #     if l == False:
# #         l = True
# #         print("3my name is:常威")
# #         time.sleep(random.randint(1, 2))
# #         print("3my age is:68")
# #         time.sleep(random.randint(1, 2))
# #         print("3my sex is:femal")
# #     l = False
#
# if __name__ == '__main__':
#     lock = Lock()
#
#     p1 = Process(target=task1,args=(lock,))
#     p1.start()
#     # p1.join()
#
#     p2 = Process(target=task2,args=(lock,))
#     p2.start()
#     # p2.join()
#
#     p3 = Process(target=task3,args=(lock,))
#     p3.start()
#     # p3.join()
互斥鎖實現原理
# lock = RLock()
# lock.acquire()
# lock.acquire()
#
# print("哈哈")
# lock.release()

import time
def task(i,lock):
    lock.acquire()
    lock.acquire()
    print(i)
    time.sleep(3)
    lock.release()
    lock.release()
#第一個過來 睡一秒  第二個過來了 睡一秒   第一個列印1  第二個列印2

if __name__ == '__main__':
    lock = RLock()
    p1 = Process(target=task,args=(1,lock))
    p1.start()

    p2 = Process(target=task, args=(2,lock))
    p2.start()
RLock
import json
from multiprocessing import Process,Lock
import time
import random

# 檢視剩餘票數
def check_ticket(usr):
    time.sleep(random.randint(1,3))
    with open("ticket.json","r",encoding="utf-8") as f:
        dic = json.load(f)
        print("%s檢視 剩餘票數:%s" % (usr,dic["count"]))

def buy_ticket(usr):
    with open("ticket.json","r",encoding="utf-8") as f:
        dic = json.load(f)
        if dic["count"] > 0:
            time.sleep(random.randint(1,3))
            dic["count"] -= 1
            with open("ticket.json", "w", encoding="utf-8") as f2:
                json.dump(dic,f2)
                print("%s 購票成功!" % usr)


def task(usr,lock):

    check_ticket(usr)

    lock.acquire()
    buy_ticket(usr)
    lock.release()

if __name__ == '__main__':
    lock = Lock()

    for i in range(10):
        p = Process(target=task,args=("使用者%s" % i,lock))
        p.start()
        #p.join() # 只有第一個整個必須完畢 別人才能買 這是不公平的
互斥鎖模擬搶票

死鎖:指的是鎖無法開啟,導致程式卡死,一把鎖是不會鎖死的,只有存在多個鎖的狀態下,才會鎖死,就是一個子程序a搶了一把鎖,另一個子程序同時搶了另一把鎖,就出現死鎖狀況。正常在程式開發時,是不要建立多把鎖的。

from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
    l1.acquire()
    print("盤子被%s搶走了" % i)
    time.sleep(1)


    l2.acquire()
    print("筷子被%s搶走了" % i)
    print("吃飯..")
    l1.release()
    l2.release()


    pass

def task2(l1,l2,i):

    l2.acquire()
    print("筷子被%s搶走了" % i)

    l1.acquire()
    print("盤子被%s搶走了" % i)

    print("吃飯..")
    l1.release()
    l2.release()


if __name__ == '__main__':
    l1 = Lock()
    l2 = Lock()
    Process(target=task1,args=(l1,l2,1)).start()
    Process(target=task2,args=(l1,l2,2)).start()
死鎖

3.IPC

IPC指的是程序間通訊。由於程序之間記憶體是相互獨立的,所以我們需要一種方案能夠使得程序之間可以相互傳遞資料,有三種方式:

1.使用共享檔案,多個程序同時讀寫同一個檔案,其特點:I/O速度慢,傳輸資料大小不受限制。

2.管道:他是基於記憶體的,速度比較快,但是它是單向的,用起來比較麻煩。

3.申請共享記憶體空間,多個程序可以共享這個記憶體區域,其特點:速度快,傳輸資料量不能太大。

from multiprocessing import Manager,Process,Lock
def work(d):
    # with lock:
        d['count']-=1

if __name__ == '__main__':

    with Manager() as m:
        dic=m.dict({'count':100}) #建立一個共享的字典
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,))
            p_l.append(p)
            p.start()

        for p in p_l:
            p.join()
        print(dic)
IPC管道

4.佇列

佇列不只用於程序間通訊,也是一種常見的資料容器,先進先出。還可以保證資料不會錯亂,即使在多程序下,由於其put和get預設都是阻塞的。

from multiprocessing import Queue

# q = Queue(1)  # 建立一個佇列 最多可以存一個數據
#
# q.put("張三")
# print(q.get())
#
# q.put("李四") # put預設會阻塞 當容器中已經裝滿了
#
# print(q.get())
# print(q.get()) # get預設會阻塞 當容器中已經沒有資料了
#
# print("over")


q = Queue(1)  # 建立一個佇列 最多可以存一個數據
#
q.put("張三")
# q.put("李四",False) # 第二個引數 設定為False表示不會阻塞 無論容器是滿了 都會強行塞 如果滿了就拋異常

print(q.get())
print(q.get(timeout=3)) # timeout 僅用於阻塞時

# q.put("李四") # put預設會阻塞 當容器中已經裝滿了
#
# print(q.get())
# print(q.get()) # get預設會阻塞 當容器中已經沒有資料了
#
# print("over")
佇列

5.生產者消費者模型

   生產者消費者模型

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

    為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

    什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

基於佇列實現生產者消費者模型。

   生產者消費者模型

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

    為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

    什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

基於佇列實現生產者消費者模型

   生產者消費者模型

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

    為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

    什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

基於佇列實現生產者消費者模型。生產者負責產生資料,消費者負責處理資料。

import random
from multiprocessing import Process,Queue
import time
# 爬資料
def get_data(q):

    for num in range(5):
        print("正在爬取第%s個數據" % num)
        time.sleep(random.randint(1,2))
        print("第%s個數據 爬取完成" % num)
        # 把資料裝到佇列中
        q.put("第%s個數據" % num)


def parse_data(q):
    for num in range(5):
        # 取出資料
        data = q.get()
        print("正在解析%s" % data)
        time.sleep(random.randint(1, 2))
        print("%s 解析完成" % data)

if __name__ == '__main__':
    # 共享資料容器
    q = Queue(5)
    #生產者程序
    produce =  Process(target=get_data,args=(q,))
    produce.start()
    #消費者程序
    customer = Process(target=parse_data,args=(q,))
    customer.start()
生產者消費者模型