1. 程式人生 > >python 併發之程序

python 併發之程序

一.什麼是程序

 程序(Process)是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。在早期面向程序設計的計算機結構中,程序是程式的基本執行實體;在當代面向執行緒設計的計算機結構中,程序是執行緒的容器。程式是指令、資料及其組織形式的描述,程序是程式的實體。我們自己在python檔案中寫了一些程式碼,這叫做程式,執行這個python檔案的時候,這叫做程序。

 狹義定義:程序是正在執行的程式的例項(an instance of a computer program that is being executed)。  廣義定義:程序是一個具有一定獨立功能的程式關於某個資料集合的一次執行活動。它是作業系統動態執行的基本單元,在傳統的作業系統中,程序既是基本的分配單元,也是基本的執行單元。
 舉例: 比如py1檔案中有個變數a=1,py2檔案中有個變數a=2,他們兩個會衝突嗎?不會的,是不是,因為兩個檔案執行起來後是兩個程序,作業系統讓他們在記憶體上隔離開,對吧。
 程序(Process)是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。在早期面向程序設計的計算機結構中,程序是程式的基本執行實體;在當代面向執行緒設計的計算機結構中,程序是執行緒的容器。程式是指令、資料及其組織形式的描述,程序是程式的實體。我們自己在python檔案中寫了一些程式碼,這叫做程式,執行這個python檔案的時候,這叫做程序。

  狹義定義:程序是正在執行的程式的例項(an instance of a computer program that 
is being executed)。   廣義定義:程序是一個具有一定獨立功能的程式關於某個資料集合的一次執行活動。它是作業系統動態執行的基本單元,在傳統的作業系統中,程序既是基本的分配單元,也是基本的執行單元。   舉例: 比如py1檔案中有個變數a=1,py2檔案中有個變數a=2,他們兩個會衝突嗎?不會的,是不是,因為兩個檔案執行起來後是兩個程序,作業系統讓他們在記憶體上隔離開,對吧。
程序的概念

 

動態性:程序的實質是程式在多道程式系統中的一次執行過程,程序是動態產生,動態消亡的。
併發性:任何程序都可以同其他程序一起併發執行
獨立性:程序是一個能獨立執行的基本單位,同時也是系統分配資源和排程的獨立單位;
非同步性:由於程序間的相互制約,使程序具有執行的間斷性,即程序按各自獨立的、不可預知的速度向前推進
結構特徵:程序由程式、資料和程序控制塊三部分組成。
多個不同的程序可以包含相同的程式:一個程式在不同的資料集裡就構成不同的程序,能得到不同的結果;但是執行過程中,程式不能發生改變。
程序的特徵

 

程式是指令和資料的有序集合,其本身沒有任何執行的含義,是一個靜態的概念。
而程序是程式在處理機上的一次執行過程,它是一個動態的概念。
程式可以作為一種軟體資料長期存在,而程序是有一定生命期的。
程式是永久的,程序是暫時的。
舉例:就像qq一樣,qq是我們安裝在自己電腦上的客戶端程式,其實就是一堆的程式碼檔案,我們不執行qq,那麼他就是一堆程式碼程式,當我們執行qq的時候,這些程式碼執行起來,就成為一個程序了。
程序與程式的區別

 

注意:同一個程式執行兩次,就會在作業系統中出現兩個程序,所以我們可以同時執行一個軟體,分別做不同的事情也不會混亂。比如開啟暴風影音,雖然都是同一個軟體,但是一個可以播放米老鼠,一個可以播放唐老鴨。

    二.併發和併發

通過程序之間的排程,也就是程序之間的切換,我們使用者感知到的好像是兩個視訊檔案同時在播放,或者音樂和遊戲同時在進行,那就讓我們來看一下什麼叫做併發和並行

無論是並行還是併發,在使用者看來都是'同時'執行的,不管是程序還是執行緒,都只是一個任務而已,真是幹活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務

  併發:是偽並行,即看起來是同時執行。單個cpu+多道技術就可以實現併發,(並行也屬於併發)

你是一個cpu,你同時談了三個女朋友,每一個都可以是一個戀愛任務,你被這三個任務共享要玩出併發戀愛的效果,
應該是你先跟女友1去看電影,看了一會說:不好,我要拉肚子,然後跑去跟第二個女友吃飯,吃了一會說:那啥,我去趟洗手間,然後跑去跟女友3開了個房,然後在你的基友眼裡,你就在和三個女友同時在一起玩。
單核cpu多程序併發舉例

 

  並行:並行:同時執行,只有具備多個cpu才能實現並行

將多個cpu必須成高速公路上的多個車道,程序就好比每個車道上行駛的車輛,並行就是說,大家在自己的車道上行駛,會不影響,同時在開車。這就是並行
多個cpu多個程序舉例

 

三.同步\非同步\阻塞\非阻塞 1.程序狀態

在瞭解其他概念之前,我們首先要了解程序的幾個狀態。在程式執行的過程中,由於被作業系統的排程演算法控制,程式會進入幾個狀態:就緒,執行和阻塞。

  (1)就緒(Ready)狀態

    當程序已分配到除CPU以外的所有必要的資源,只要獲得處理機便可立即執行,這時的程序狀態稱為就緒狀態。

  (2)執行/執行(Running)狀態當程序已獲得處理機,其程式正在處理機上執行,此時的程序狀態稱為執行狀態。

  (3)阻塞(Blocked)狀態正在執行的程序,由於等待某個事件發生而無法執行時,便放棄處理機而處於阻塞狀態。引起程序阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能滿足、等待信件(訊號)等。

    事件請求:input、sleep、檔案輸入輸出、recv、accept等

    事件發生:sleep、input等完成了

    時間片到了之後有回到就緒狀態,這三個狀態不斷的在轉換。

 

2.同步非同步

所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。其實就是一個程式結束才執行另外一個程式,序列的,不一定兩個程式就有依賴關係。阻塞和非阻塞這兩個概念與程式(執行緒)等待訊息通知(無所謂同步或者非同步)時的狀態有關。也就是說阻塞與非阻塞主要是程式(執行緒)等待訊息通知時的狀態角度來說的

    所謂非同步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了至於被依賴的任務最終是否真正完成,依賴它的任務無法確定所以它是不可靠的任務序列

比如我們去樓下的老家肉餅吃飯,飯點好了,取餐的時候發生了一些同步非同步的事情。
同步:我們都站在隊裡等著取餐,前面有個人點了一份肉餅,後廚做了很久,但是由於同步機制,我們還是要站在隊裡等著前面那個人的肉餅做好取走,我們才往前走一步。
非同步:我們點完餐之後,點餐員給了我們一個取餐號碼,跟你說,你不用在這裡排隊等著,去找個地方坐著玩手機去吧,等飯做好了,我叫你。這種機制(等待別人通知)就是非同步等待訊息通知。在非同步訊息處理中,等待訊息通知者(在這個例子中等著取餐的你)往往註冊一個回撥機制,在所等待的事件被觸發時由觸發機制(點餐員)通過某種機制(喊號,‘250號你的包子好了‘)找到等待該事件的人。
同步非同步舉例

3.阻塞與非阻塞

阻塞和非阻塞這兩個概念與程式(執行緒)等待訊息通知(無所謂同步或者非同步)時的狀態有關。也就是說阻塞與非阻塞主要是程式(執行緒)等待訊息通知時的狀態角度來說的

 

繼續上面的那個例子,不論是排隊還是使用號碼等待通知,如果在這個等待的過程中,等待者除了等待訊息通知之外不能做其它的事情,那麼該機制就是阻塞的,表現在程式中,也就是該程式一直阻塞在該函式呼叫處不能繼續往下執行。
相反,有的人喜歡在等待取餐的時候一邊打遊戲一邊等待,這樣的狀態就是非阻塞的,因為他(等待者)沒有阻塞在這個訊息通知上,而是一邊做自己的事情一邊等待。阻塞的方法:input、time.sleep,socket中的recv、accept等等。
阻塞與非阻塞舉例

 

 

 4.同步/非同步 與 阻塞和非阻塞

  1. 同步阻塞形式

    效率最低。拿上面的例子來說,就是你專心排隊,什麼別的事都不做。

  1. 非同步阻塞形式

    如果在排隊取餐的人採用的是非同步的方式去等待訊息被觸發(通知),也就是領了一張小紙條,假如在這段時間裡他不能做其它的事情,就在那坐著等著,不能玩遊戲等,那麼很顯然,這個人被阻塞在了這個等待的操作上面;

    非同步操作是可以被阻塞住的,只不過它不是在處理訊息時阻塞,而是在等待訊息通知時被阻塞。

  1. 同步非阻塞形式

    實際上是效率低下的。

    想象一下你一邊打著電話一邊還需要擡頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程式的兩個操作的話,這個程式需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的。

  1. 非同步非阻塞形式

    效率更高,

    因為打電話是你(等待者)的事情,而通知你則是櫃檯(訊息觸發機制)的事情,程式沒有在兩種不同的操作中來回切換

    比如說,這個人突然發覺自己煙癮犯了,需要出去抽根菸,於是他告訴點餐員說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操作上面,自然這個就是非同步+非阻塞的方式了。

  很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現出來,同樣的,很多人也會把非同步和非阻塞混淆,因為非同步操作一般都不會在真正的IO操作處被阻塞

 

四.程序的建立

 1.multiprocessing模組第一種建立程序方式

import time
from multiprocessing import Process

def f1():
    time.sleep(3)
    print('xxx')

def f2():
    time.sleep(3)
    print('sss')


# window系統下必須寫main,因為windows系統建立子程序的方式決定的,開啟一個子程序,這個子程序會copy一份主程序的所有程式碼,並且機制類似於import引入,這樣就容易導致引入程式碼的時候,被引入的程式碼中的可執行程式被執行,導致遞迴開始程序,會報錯


if __name__ == '__main__':
    p1 = Process(target=f1)
    p2 = Process(target=f2)
    p1.start()
    p2.start()

 

  2.for迴圈建立程序

import time
from multiprocessing import Process

def f1(i):
    time.sleep(3)
    print(i)
if __name__ == '__main__':

    for i in range(20):
        p1 = Process(target=f1, args=(i,))
        p1.start()

 

  3.multiprocessing模組第二種建立程序方式

from multiprocessing import Process

class MyProcess(Process):

    def __init__(self,n):
        super().__init__()  # 別忘了執行父類的init
        self.n = n

    def run(self):
        print('寶寶and%s不可告人的事情'%self.n)

if __name__ == '__main__':

    p1 = MyProcess('高望')
    p1.start()

 

  4.join方法
 1 import time
 2 from multiprocessing import Process
 3 
 4 def f1():
 5     time.sleep(2)
 6     print('xxx')
 7 
 8 def f2():
 9     time.sleep(2)
10     print('sss')
11 
12 if __name__ == '__main__':
13 
14 
15     p1 = Process(target=f1,)
16 
17     p2 = Process(target=f2,)
18 
19     p1.start()
20     p2.start()
21     p1.join()  # 等待子程序p1執行完畢
22     p2.join()  # 等待子程序p2執行完畢
23     print('我要等待子程序')
24     print('我是主程序!!!')

 

 

五.程序鎖

通過剛剛的學習,我們千方百計實現了程式的非同步,讓多個任務可以同時在幾個程序中併發處理,他們之間的執行沒有順序,一旦開啟也不受我們控制。儘管併發程式設計讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題:程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

多程序搶佔輸出資源,導致列印混亂的示例

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(5):
        p=Process(target=work,args=(i,))
        p.start()

# 看結果:通過結果可以看出兩個問題:問題一:每個程序中work函式的第一個列印就不是按照我們for迴圈的0-4的順序來列印的
#問題二:我們發現,每個work程序中有兩個列印,但是我們看到所有程序中第一個列印的順序為0-2-1-4-3,但是第二個列印沒有按照這個順序,變成了2-1-0-3-4,說明我們一個程序中的程式的執行順序都混亂了。
#問題的解決方法,第二個問題加鎖來解決,第一個問題是沒有辦法解決的,因為程序開到了核心,有作業系統來決定程序的排程,我們自己控制不了
# 0: 9560 is running
# 2: 13824 is running
# 1: 7476 is running
# 4: 11296 is running
# 3: 14364 is running

# 2:13824 is done
# 1:7476 is done
# 0:9560 is done
# 3:14364 is done
# 4:11296 is done

多程序搶佔輸出資源,導致列印混亂的示例
View Code

加鎖版

 1 import time,json
 2 from multiprocessing import Process,Lock
 3 
 4 def show_t(i):
 5     f = open('ticket', mode='r',encoding='utf-8')
 6     dic = json.load(f)
 7     print('%s查詢剩餘票數為%s'%(i,dic['count']))
 8 def get_t(i,l1):
 9     l1.acquire()
10     f = open('ticket', mode='r', encoding='utf-8')
11     dic = json.load(f)
12     if dic['count'] > 0:
13         dic['count'] -= 1
14         print('%s搶票成功'%i)
15         time.sleep(0.2)
16         f = open('ticket',mode= 'w',encoding='utf-8')
17         json.dump(dic,f)
18     else:
19         print('沒票了')
20     l1.release()
21 if __name__ == '__main__':
22     l1 = Lock()
23     for i in range(10):
24         p1 = Process(target=show_t,args=(i,))
25         p1.start()
26 
27     for i in range(10):
28         p2 = Process(target=get_t,args=(i,l1))
29         p2.start()

程序鎖總結:

#加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實現程序間通訊,但問題是:
1.效率低(共享資料基於檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理

#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。
佇列和管道都是將資料存放於記憶體中
佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。

IPC通訊機制(瞭解):IPC是intent-Process Communication的縮寫,含義為程序間通訊或者跨程序通訊,是指兩個程序之間進行資料交換的過程。IPC不是某個系統所獨有的,任何一個作業系統都需要有相應的IPC機制,
比如Windows上可以通過剪貼簿、管道和郵槽等來進行程序間通訊,而Linux上可以通過命名共享內容、訊號量等來進行程序間通訊。Android它也有自己的程序間通訊方式,Android建構在Linux基礎上,繼承了一
部分Linux的通訊方式。

 

六.守護程序

之前我們講的子程序是不會隨著主程序的結束而結束,子程序全部執行完之後,程式才結束,那麼如果有一天我們的需求是我的主程序結束了,由我主程序建立的那些子程序必須跟著結束,怎麼辦?守護程序就來了!

    主程序建立守護程序

      其一:守護程序會在主程序程式碼執行結束後就終止

      其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children

    注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止

 1 import time
 2 from multiprocessing import Process
 3 
 4 def f1():
 5     time.sleep(3)
 6     print('xxxx')
 7 
 8 def f2():
 9     time.sleep(5)
10     print('我是普通子程序')
11 
12 if __name__ == '__main__':
13     p1 =Process(target=f1,)
14     p1.daemon = True  # 將該程序設定為守護程序,必須寫在start之前,意思如果我的主程序程式碼執行結束了,你這個子程序不管執行到什麼地方,都直接結束
15     p1.start()
16     # 開啟一個普通的子程序來驗證一下守護程序的結束只和主程序的程式碼執行結束有關係,而整個程式的結束需要主程序和普通的子程序的程式碼都執行結束才結束
17     p2 = Process(target=f2,)
18     p2.start()
19     # 等待2號普通程序的結束,才繼續執行下面主程序的程式碼
20     # p2.join()
21     # 守護程序會跟著父程序的程式碼執行結束,就結束
22     print('我是主程序')

七.佇列

1.基礎佇列

 1 from multiprocessing import Process,Queue
 2 q = Queue(3)  # 建立一個佇列隊形,佇列長度為3,先進先出
 3 q.put(1)
 4 print('>>>>',q.qsize()) # 返回當前佇列的內容長度
 5 print(q.full())
 6 q.put(2)
 7 print('>>>>',q.qsize())
 8 q.put(3)
 9 print(q.full())  # q.full()瞭解,因為這個東西不可靠,滿了返回一個True,不滿返回一個False
10 # print('sss')
11 # q.put(4)  # 放入資料的時候,如果佇列滿了,程式會在你put操作的地方阻塞
12 try:
13     q.put_nowait(4)  # 不阻塞程式,但是會報錯queue.Full,可以通過捕獲異常來進行其他的操作
14 except:
15     print('佇列滿了,玩別的去吧')
16 
17 print('xxx')
18 
19 print(q.get())
20 print(q.get())
21 print(q.get())
22 print('是不是空了呀:',q.empty())  # q.empty()瞭解,因為這個東西不可靠,空了返回一個True,不空返回False
23 q.put(4)
24 print('是不是空了呀:',q.empty())   # True或者False,因為q在put資料的時候,有一個細微的延遲迴一個False
25 print(q.get())  # 如果佇列空了,程式會在這個地方卡主,也就是阻塞程式
26 try:
27     q.put_nowait()  # queue.Empty
28 except:
29     print('佇列空了,搞別的事情')
30 
31 print('佇列拿完了')

2.佇列之佇列簡單通訊

from multiprocessing import Process,Queue

def f1(q):
    q.put('約嗎?')

if __name__ == '__main__':
    q = Queue(3)
    p =Process(target=f1,args=(q,))
    p.start()

    son_p_msg = q.get()

    print('來自子程序的訊息:',son_p_msg)

3.佇列之生產者消費者模型

 1 import time
 2 from multiprocessing import Process,JoinableQueue
 3 
 4 # 生產者
 5 def producer(q):
 6     for i in range(10):
 7         time.sleep(0.7)
 8         s = '大包子%s號'%i
 9         print(s+'新鮮出爐')
10         q.put(s)
11     q.join() # 就等著task_done()訊號的數量.和我put進去的數量想同時,才繼續執行
12     print('所有任務都被處理了,繼續前行吧少年們')
13 
14 
15 def consumer(q):
16     while 1:
17         time.sleep(0.2)
18         baozi =q.get()
19 
20         print(baozi+'被吃了')
21         q.task_done()  # 給佇列傳送一個取出的這個任務已經處理完畢的訊號
22 
23 
24 if __name__ == '__main__':
25     q = JoinableQueue(10)
26     pro_p = Process(target=producer,args=(q,))
27     con_p = Process(target=consumer,args=(q,))
28     pro_p.start()
29     con_p.daemon = True  # 守護程序
30     con_p.start()
31 
32     pro_p.join()
33     print('主程序結束')