1. 程式人生 > 實用技巧 >多程序與多執行緒,python程式例項

多程序與多執行緒,python程式例項

轉載:https://www.cnblogs.com/yuanchenqi/articles/6755717.html

一 程序與執行緒的概念

1.1 程序

考慮一個場景:瀏覽器,網易雲音樂以及notepad++ 三個軟體只能順序執行是怎樣一種場景呢?另外,假如有兩個程式A和B,程式A在執行到一半的過程中,需要讀取大量的資料輸入(I/O操作),而此時CPU只能靜靜地等待任務A讀取完資料才能繼續執行,這樣就白白浪費了CPU資源。你是不是已經想到在程式A讀取資料的過程中,讓程式B去執行,當程式A讀取完資料之後,讓程式B暫停。聰明,這當然沒問題,但這裡有一個關鍵詞:切換。

既然是切換,那麼這就涉及到了狀態的儲存,狀態的恢復,加上程式A與程式B所需要的系統資源(記憶體,硬碟,鍵盤等等)是不一樣的。自然而然的就需要有一個東西去記錄程式A和程式B分別需要什麼資源,怎樣去識別程式A和程式B等等(比如讀書)。

程序定義:

程序就是一個程式在一個數據集上的一次動態執行過程。程序一般由程式、資料集、程序控制塊三部分組成。我們編寫的程式用來描述程序要完成哪些功能以及如何完成;資料集則是程式在執行過程中所需要使用的資源;程序控制塊用來記錄程序的外部特徵,描述程序的執行變化過程,系統可以利用它來控制和管理程序,它是系統感知程序存在的唯一標誌。

舉一例說明程序:
想象一位有一手好廚藝的電腦科學家正在為他的女兒烘製生日蛋糕。他有做生日蛋糕的食譜,廚房裡有所需的原料:麵粉、雞蛋、糖、香草汁等。在這個比喻中,做蛋糕的食譜就是程式(即用適當形式描述的演算法)電腦科學家就是處理器(cpu),而做蛋糕的各種原料就是輸入資料。程序就是廚師閱讀食譜、取來各種原料以及烘製蛋糕等一系列動作的總和。現在假設電腦科學家的兒子哭著跑了進來,說他的頭被一隻蜜蜂蟄了。電腦科學家就記錄下他照著食譜做到哪兒了(儲存程序的當前狀態),然後拿出一本急救手冊,按照其中的指示處理蟄傷。這裡,我們看到處理機從一個程序(做蛋糕)切換到另一個高優先順序的程序(實施醫療救治),每個程序擁有各自的程式(食譜和急救手冊)。當蜜蜂蟄傷處理完之後,這位電腦科學家又回來做蛋糕,從他離開時的那一步繼續做下去。

1.2 執行緒

執行緒的出現是為了降低上下文切換的消耗,提高系統的併發性,並突破一個程序只能幹一樣事的缺陷,使到程序內併發成為可能。

假設,一個文字程式,需要接受鍵盤輸入,將內容顯示在螢幕上,還需要儲存資訊到硬碟中。若只有一個程序,勢必造成同一時間只能幹一樣事的尷尬(當儲存時,就不能通過鍵盤輸入內容)。若有多個程序,每個程序負責一個任務,程序A負責接收鍵盤輸入的任務,程序B負責將內容顯示在螢幕上的任務,程序C負責儲存內容到硬碟中的任務。這裡程序A,B,C間的協作涉及到了程序通訊問題,而且有共同都需要擁有的東西——-文字內容,不停的切換造成效能上的損失。若有一種機制,可以使任務A,B,C共享資源,這樣上下文切換所需要儲存和恢復的內容就少了,同時又可以減少通訊所帶來的效能損耗,那就好了。是的,這種機制就是執行緒。
執行緒也叫輕量級程序,它是一個基本的CPU執行單元,也是程式執行過程中的最小單元,由執行緒ID、程式計數器、暫存器集合和堆疊共同組成。執行緒的引入減小了程式併發執行時的開銷,提高了作業系統的併發效能。執行緒沒有自己的系統資源。

1.3 程序與執行緒的關係

程序是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。或者說程序是具有一定獨立功能的程式關於某個資料集合上的一次執行活動,程序是系統進行資源分配和排程的一個獨立單位。
執行緒則是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位。

程序和執行緒的關係:

(1)一個執行緒只能屬於一個程序,而一個程序可以有多個執行緒,但至少有一個執行緒。
(2)資源分配給程序,同一程序的所有執行緒共享該程序的所有資源。
(3)CPU分給執行緒,即真正在CPU上執行的是執行緒。

1.4 並行和併發

並行處理(Parallel Processing)是計算機系統中能同時執行兩個或更多個處理的一種計算方法。並行處理可同時工作於同一程式的不同方面。並行處理的主要目的是節省大型和複雜問題的解決時間。併發處理(concurrency Processing):指一個時間段中有幾個程式都處於已啟動執行到執行完畢之間,且這幾個程式都是在同一個處理機(CPU)上執行,但任一個時刻點上只有一個程式在處理機(CPU)上執行

併發的關鍵是你有處理多個任務的能力,不一定要同時。並行的關鍵是你有同時處理多個任務的能力。所以說,並行是併發的子集

1.5 同步與非同步

在計算機領域,同步就是指一個程序在執行某個請求的時候,若該請求需要一段時間才能返回資訊,那麼這個程序將會一直等待下去,直到收到返回資訊才繼續執行下去;非同步是指程序不需要一直等下去,而是繼續執行下面的操作,不管其他程序的狀態。當有訊息返回時系統會通知程序進行處理,這樣可以提高執行的效率。舉個例子,打電話時就是同步通訊,發短息時就是非同步通訊。

回到頂部

二 threading模組

2.1 執行緒物件的建立

2.1.1 Thread類直接建立

import threading
import time

def countNum(n): # 定義某個執行緒要執行的函式

    print("running on number:%s" %n)

    time.sleep(3)

if __name__ == '__main__':

    t1 = threading.Thread(target=countNum,args=(23,)) #生成一個執行緒例項
    t2 = threading.Thread(target=countNum,args=(34,))

    t1.start() #啟動執行緒
    t2.start()

    print("ending!")

2.1.2 Thread類繼承式建立

#繼承Thread式建立

import threading
import time

class MyThread(threading.Thread):

    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num=num

    def run(self):
        print("running on number:%s" %self.num)
        time.sleep(3)

t1=MyThread(56)
t2=MyThread(78)

t1.start()
t2.start()
print("ending")

2.2 Thread類的例項方法

2.2.1 join()和setDaemon()

# join():在子執行緒完成執行之前,這個子執行緒的父執行緒將一直被阻塞。

# setDaemon(True):
        '''
         將執行緒宣告為守護執行緒,必須在start() 方法呼叫之前設定,如果不設定為守護執行緒程式會被無限掛起。

         當我們在程式執行中,執行一個主執行緒,如果主執行緒又建立一個子執行緒,主執行緒和子執行緒 就分兵兩路,分別執行,那麼當主執行緒完成

         想退出時,會檢驗子執行緒是否完成。如果子執行緒未完成,則主執行緒會等待子執行緒完成後再退出。但是有時候我們需要的是隻要主執行緒

         完成了,不管子執行緒是否完成,都要和主執行緒一起退出,這時就可以 用setDaemon方法啦'''


import threading
from time import ctime,sleep
import time

def Music(name):

        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):

        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))


threads = []


t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    #t2.setDaemon(True)

    for t in threads:

        #t.setDaemon(True) #注意:一定在start之前設定
        t.start()

        #t.join()

    #t1.join()
    #t2.join()    #  考慮這三種join位置下的結果?

    print ("all over %s" %ctime())
daemon

2.2.2 其它方法

Thread例項物件的方法
# isAlive(): 返回執行緒是否活動的。 # getName(): 返回執行緒名。 # setName(): 設定執行緒名。 threading模組提供的一些方法: # threading.currentThread(): 返回當前的執行緒變數。 # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。 # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。

2.3 GIL(全域性直譯器鎖)

'''
定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''

Python中的執行緒是作業系統的原生執行緒,Python虛擬機器使用一個全域性直譯器鎖(Global Interpreter Lock)來互斥執行緒對Python虛擬機器的使用。為了支援多執行緒機制,一個基本的要求就是需要實現不同執行緒對共享資源訪問的互斥,所以引入了GIL。
GIL:在一個執行緒擁有了直譯器的訪問權之後,其他的所有執行緒都必須等待它釋放直譯器的訪問權,即使這些執行緒的下一條指令並不會互相影響。
在呼叫任何Python C API之前,要先獲得GIL
GIL缺點:多處理器退化為單處理器;優點:避免大量的加鎖解鎖操作

2.3.1 GIL的早期設計

Python支援多執行緒,而解決多執行緒之間資料完整性和狀態同步的最簡單方法自然就是加鎖。 於是有了GIL這把超級大鎖,而當越來越多的程式碼庫開發者接受了這種設定後,他們開始大量依賴這種特性(即預設python內部物件是thread-safe的,無需在實現時考慮額外的記憶體鎖和同步操作)。慢慢的這種實現方式被發現是蛋疼且低效的。但當大家試圖去拆分和去除GIL的時候,發現大量庫程式碼開發者已經重度依賴GIL而非常難以去除了。有多難?做個類比,像MySQL這樣的“小專案”為了把Buffer Pool Mutex這把大鎖拆分成各個小鎖也花了從5.5到5.6再到5.7多個大版為期近5年的時間,並且仍在繼續。MySQL這個背後有公司支援且有固定開發團隊的產品走的如此艱難,那又更何況Python這樣核心開發和程式碼貢獻者高度社群化的團隊呢?

2.3.2 GIL的影響

無論你啟多少個執行緒,你有多少個cpu, Python在執行一個程序的時候會淡定的在同一時刻只允許一個執行緒執行。
所以,python是無法利用多核CPU實現多執行緒的。
這樣,python對於計算密集型的任務開多執行緒的效率甚至不如序列(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

計算密集型:

#coding:utf8
from threading import Thread
import time

def counter():
    i = 0
    for _ in range(50000000):
        i = i + 1

    return True


def main():

    l=[]
    start_time = time.time()

    for i in range(2):

        t = Thread(target=counter)
        t.start()
        l.append(t)
        t.join()

    # for t in l:
    #     t.join()

    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
    main()


'''
py2.7:
     序列:25.4523348808s
     併發:31.4084379673s
py3.5:
     序列:8.62115597724914s
     併發:8.99609899520874s

'''

2.3.3 解決方案

用multiprocessing替代Thread multiprocessing庫的出現很大程度上是為了彌補thread庫因為GIL而低效的缺陷。它完整的複製了一套thread所提供的介面方便遷移。唯一的不同就是它使用了多程序而不是多執行緒。每個程序有自己的獨立的GIL,因此也不會出現程序之間的GIL爭搶。

View Code

當然multiprocessing也不是萬能良藥。它的引入會增加程式實現時執行緒間資料通訊和同步的困難。就拿計數器來舉例子,如果我們要多個執行緒累加同一個變數,對於thread來說,申明一個global變數,用thread.Lock的context包裹住三行就搞定了。而multiprocessing由於程序之間無法看到對方的資料,只能通過在主執行緒申明一個Queue,put再get或者用share memory的方法。這個額外的實現成本使得本來就非常痛苦的多執行緒程式編碼,變得更加痛苦了。

總結:因為GIL的存在,只有IO Bound場景下得多執行緒會得到較好的效能 - 如果對平行計算效能較高的程式可以考慮把核心部分也成C模組,或者索性用其他語言實現 - GIL在較長一段時間內將會繼續存在,但是會不斷對其進行改進。

所以對於GIL,既然不能反抗,那就學會去享受它吧!

2.4 同步鎖 (Lock)

import time
import threading

def addNum():
    global num #在每個執行緒中都獲取這個全域性變數
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 對此公共變數進行-1操作

num = 100  #設定一個共享變數

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有執行緒執行完畢
    t.join()

print('Result: ', num)

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源建立一個Lock物件,當你需要訪問該資源時,呼叫acquire方法來獲取鎖物件(如果其它執行緒已經獲得了該鎖,則當前執行緒需等待其被釋放),待資源訪問完後,再呼叫release方法釋放鎖:

import threading

R=threading.Lock()

R.acquire()
'''
對公共資料的操作
'''
R.release()

擴充套件思考

View Code

2.5 死鎖與遞迴鎖

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序。

import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 如果鎖被佔用,則阻塞在這裡,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()
        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

1 mutex=threading.RLock()

2.6 Event物件

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就 會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞執行緒;

event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;

event.clear():恢復event的狀態值為False。

可以考慮一種應用場景(僅僅作為說明),例如,我們有多個執行緒從Redis佇列中讀取資料來處理,這些執行緒都要嘗試去連線Redis的服務,一般情況下,如果Redis連線不成功,在各個執行緒的程式碼中,都會去嘗試重新連線。如果我們想要在啟動時確保Redis服務正常,才讓那些工作執行緒去連線Redis伺服器,那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作:主執行緒中會去嘗試連線Redis服務,如果正常的話,觸發事件,各工作執行緒會嘗試連線Redis服務。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()

threading.Event的wait方法還接受一個超時引數,預設情況下如果事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時引數之後,如果阻塞時間超過這個引數設定的值之後,wait方法會返回。對應於上面的應用場景,如果Redis伺服器一致沒有啟動,我們希望子執行緒能夠列印一些日誌來不斷地提醒我們當前沒有一個可以連線的Redis服務,我們就可以通過設定這個超時引數來達成這樣的目的:

def worker(event):
    while not event.is_set():
        logging.debug('Waiting for redis ready...')
        event.wait(2)
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

這樣,我們就可以在等待Redis服務啟動的同時,看到工作執行緒里正在等待的情況。

2.7 Semaphore(訊號量)

Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。

例項:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連線數為5):

import threading
import time

semaphore = threading.Semaphore(5)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()

應用:連線池

思考:與Rlock的區別?

2.8 佇列(queue)

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

2.8.1 get與put方法

'''

建立一個“佇列”物件

import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數
maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。

將一個值放入佇列中
q.put(10)
呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;
第二個block為可選引數,預設為
1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,
put方法將引發Full異常。

將一個值從佇列中取出
q.get()
呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且
block為True,get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。

'''

2.8.2 join與task_done方法

'''
join() 阻塞程序,直到所有任務完成,需要配合另一個方法task_done。

    def join(self):
     with self.all_tasks_done:
      while self.unfinished_tasks:
       self.all_tasks_done.wait()

task_done() 表示某個任務完成。每一條get語句後需要一條task_done。


import queue
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()

print("ending!")
'''

2.8.3 其他常用方法

'''

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回佇列的大小 q.empty() 如果佇列為空,返回True,反之False q.full() 如果佇列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取佇列,timeout等待時間 q.get_nowait() 相當q.get(False)非阻塞
q.put(item) 寫入佇列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 q.join() 實際上意味著等到佇列為空,再執行別的操作 '''

2.8.4 其他模式

'''

Python Queue模組有三種佇列及建構函式: 

1、Python Queue模組的FIFO佇列先進先出。  class queue.Queue(maxsize) 
2、LIFO類似於堆,即先進後出。           class queue.LifoQueue(maxsize) 
3、還有一種是優先順序佇列級別越低越先出來。 class queue.PriorityQueue(maxsize) 


import queue

#先進後出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#優先順序
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

'''

2.8.5 生產者消費者模型

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領取即可,這也是一個結耦的過程。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
回到頂部

三 multiprocessing模組

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

由於GIL的存在,python中的多執行緒其實並不是真正的多執行緒,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。

multiprocessing包是Python中的多程序管理包。與threading.Thread類似,它可以利用multiprocessing.Process物件來建立一個程序。該程序可以執行在Python程式內部編寫的函式。該Process物件與Thread物件的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些物件可以像多執行緒那樣,通過引數傳遞給各個程序),用以同步程序,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多程序的情境。

3.1 python的程序呼叫

# Process類呼叫

from multiprocessing import Process
import time
def f(name):

    print('hello', name,time.ctime())
    time.sleep(1)

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin:%s'%i,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')

# 繼承Process類呼叫
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name

    def run(self):

        print ('hello', self.name,time.ctime())
        time.sleep(1)


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')

3.2 process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
  target: 要執行的方法;
  name: 程序名;
  args/kwargs: 要傳入方法的引數。

例項方法:

  is_alive():返回程序是否在執行。

  join([timeout]):阻塞當前上下文環境的程序程,直到呼叫此方法的程序終止或到達指定的timeout(可選引數)。

  start():程序準備就緒,等待CPU排程

  run():strat()呼叫run方法,如果例項程序時未制定傳入target,這star執行t預設run()方法。

  terminate():不管任務是否完成,立即停止工作程序

屬性:

  daemon:和執行緒的setDeamon功能一樣

  name:程序名字。

  pid:程序號。

from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")

通過tasklist(Win)或者ps -elf |grep(linux)命令檢測每一個程序號(PID)對應的程序名

3.3 程序間通訊 

3.3.1 程序對列Queue

from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

3.3.2 管道(pipe)

ThePipe()function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    if __name__ == '__main__':

    parent_conn, child_conn = Pipe()
   
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("兒子你好!")
    p.join()

Pipe()返回的兩個連線物件代表管道的兩端。每個連線物件都有send()和recv()方法(等等)。請注意,如果兩個程序(或執行緒)嘗試同時讀取或寫入管道的同一端,管道中的資料可能會損壞。

3.3.3 manager

Queue和pipe只是實現了資料互動,並沒實現資料共享,即一個程序去更改另一個程序的資料

A manager object returned byManager()controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

from multiprocessing import Process, Manager

def f(d, l,n):

    d[n] = n
    d["name"] ="alvin"
    l.append(n)

    #print("l",l)

if __name__ == '__main__':

    with Manager() as manager:

        d = manager.dict()

        l = manager.list(range(5))
        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)

3.4 程序池

程序池內部維護一個程序序列,當使用時,則去程序池中獲取一個程序,如果程序池序列中沒有可供使用的進程序,那麼程式就會等待,直到程序池中有可用程序為止。

from multiprocessing import Pool
import time

def foo(args):
 time.sleep(1)
 print(args)

if __name__ == '__main__':
 p = Pool(5)
 for i in range(30):
     p.apply_async(func=foo, args= (i,))

 p.close()   # 等子程序執行完畢後關閉執行緒池
 # time.sleep(2)
 # p.terminate()  # 立刻關閉執行緒池
 p.join()

程序池內部維護一個程序序列,當使用時,去程序池中獲取一個程序,如果程序池序列中沒有可供使用的程序,那麼程式就會等待,直到程序池中有可用程序為止。

程序池中有以下幾個主要方法:

  1. apply:從程序池裡取一個程序並執行
  2. apply_async:apply的非同步版本
  3. terminate:立刻關閉執行緒池
  4. join:主程序等待所有子程序執行完畢,必須在close或terminate之後
  5. close:等待所有程序結束後,才關閉執行緒池
回到頂部

四 協程

協程,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒。

協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧。因此:

協程能保留上一次呼叫時的狀態(即所有區域性狀態的一個特定組合),每次過程重入時,就相當於進入上一次呼叫的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

4.1 yield與協程

import time

"""
傳統的生產者-消費者模型是一個執行緒寫訊息,一個執行緒取訊息,通過鎖機制控制佇列和等待,但一不小心就可能死鎖。
如果改用協程,生產者生產訊息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
"""
# 注意到consumer函式是一個generator(生成器):
# 任何包含yield關鍵字的函式都會自動成為生成器(generator)物件

def consumer():
    r = ''
    while True:
        # 3、consumer通過yield拿到訊息,處理,又通過yield把結果傳回;
        #    yield指令具有return關鍵字的作用。然後函式的堆疊會自動凍結(freeze)在這一行。
        #    當函式呼叫者的下一次利用next()或generator.send()或for-in來再次呼叫該函式時,
        #    就會從yield程式碼的下一行開始,繼續執行,再返回下一次迭代結果。通過這種方式,迭代器可以實現無限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    # 1、首先呼叫c.next()啟動生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 2、然後,一旦生產了東西,通過c.send(n)切換到consumer執行;
        cr = c.send(n)
        # 4、produce拿到consumer處理的結果,繼續生產下一條訊息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
    c.close()
if __name__=='__main__':
    # 6、整個流程無鎖,由一個執行緒執行,produce和consumer協作完成任務,所以稱為“協程”,而非執行緒的搶佔式多工。
    c = consumer()
    produce(c)
    
    
'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''

4.2 greenlet

greenlet機制的主要思想是:生成器函式或者協程函式中的yield語句掛起函式的執行,直到稍後使用next()或send()操作進行恢復為止。可以使用一個排程器迴圈在一組生成器函式之間協作多個任務。greentlet是python中實現我們所謂的"Coroutine(協程)"的一個基礎庫.

from greenlet import greenlet
 
def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()
 
def test2():
    print (56)
    gr1.switch()
    print (78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

4.2 基於greenlet的框架

4.2.1 gevent模組實現協程

Python通過yield提供了對協程的基本支援,但是不完全。而第三方的gevent為Python提供了比較完善的協程支援。

gevent是第三方庫,通過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程式處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在執行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:

import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)

當然,實際程式碼裡,我們不會用gevent.sleep()去切換協程,而是在執行到IO操作時,gevent自動切換,程式碼如下:

from gevent import monkey
monkey.patch_all() import gevent from urllib import request import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) start=time.time() gevent.joinall([ gevent.spawn(f, 'https://itk.org/'), gevent.spawn(f, 'https://www.github.com/'), gevent.spawn(f, 'https://zhihu.com/'), ]) # f('https://itk.org/') # f('https://www.github.com/') # f('https://zhihu.com/') print(time.time()-start)

擴充套件

View Code

eventlet實現協程(瞭解)

eventlet 是基於 greenlet 實現的面向網路應用的併發處理框架,提供“執行緒”池、佇列等與其他 Python 執行緒、程序模型非常相似的 api,並且提供了對 Python 發行版自帶庫及其他模組的超輕量併發適應性調整方法,比直接使用 greenlet 要方便得多。

其基本原理是調整 Python 的 socket 呼叫,當發生阻塞時則切換到其他 greenlet 執行,這樣來保證資源的有效利用。需要注意的是:
eventlet 提供的函式只能對 Python 程式碼中的 socket 呼叫進行處理,而不能對模組的 C 語言部分的 socket 呼叫進行修改。對後者這類模組,仍然需要把呼叫模組的程式碼封裝在 Python 標準執行緒呼叫中,之後利用 eventlet 提供的介面卡實現 eventlet 與標準執行緒之間的協作。
雖然 eventlet 把 api 封裝成了非常類似標準執行緒庫的形式,但兩者的實際併發執行流程仍然有明顯區別。在沒有出現 I/O 阻塞時,除非顯式宣告,否則當前正在執行的 eventlet 永遠不會把 cpu 交給其他的 eventlet,而標準執行緒則是無論是否出現阻塞,總是由所有執行緒一起爭奪執行資源。所有 eventlet 對 I/O 阻塞無關的大運算量耗時操作基本沒有什麼幫助。

總結

協程的好處:

無需執行緒上下文切換的開銷
無需原子操作鎖定及同步的開銷
方便切換控制流,簡化程式設計模型
高併發+高擴充套件性+低成本:一個CPU支援上萬的協程都不是問題。所以很適合用於高併發處理。
缺點:

無法利用多核資源:協程的本質是個單執行緒,它不能同時將 單個CPU 的多個核用上,協程需要和程序配合才能執行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式

回到頂部

五 IO模型

同步(synchronous) IO和非同步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什麼,到底有什麼區別?這個問題其實不同的人給出的答案都可能不同,比如wiki,就認為asynchronous IO和non-blocking IO是一個東西。這其實是因為不同的人的知識背景不同,並且在討論這個問題的時候上下文(context)也不相同。所以,為了更好的回答這個問題,先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。

Stevens在文章中一共比較了五種IO Model:

      • blocking IO
      • nonblocking IO
      • IO multiplexing
      • signal driven IO
      • asynchronous IO

由於signal driven IO在實際中並不常用,所以我這隻提及剩下的四種IO Model。
再說一下IO發生時涉及的物件和步驟。
對於一個network IO (這裡我們以read舉例),它會涉及到兩個系統物件,一個是呼叫這個IO的process (or thread),另一個就是系統核心(kernel)。當一個read操作發生時,它會經歷兩個階段:

  • 等待資料準備 (Waiting for the data to be ready)
  • 將資料從核心拷貝到程序中 (Copying the data from the kernel to the process)

記住這兩點很重要,因為這些IO Model的區別就是在兩個階段上各有不同的情況。

5.1 blocking IO (阻塞IO)

在linux中,預設情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:

當用戶程序呼叫了recvfrom這個系統呼叫,kernel就開始了IO的第一個階段:準備資料。對於network io來說,很多時候資料在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的資料到來。而在使用者程序這邊,整個程序會被阻塞。當kernel一直等到資料準備好了,它就會將資料從kernel中拷貝到使用者記憶體,然後kernel返回結果,使用者程序才解除block的狀態,重新執行起來。
所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

5.2 non-blocking IO(非阻塞IO)

linux下,可以通過設定socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

從圖中可以看出,當用戶程序發出read操作時,如果kernel中的資料還沒有準備好,那麼它並不會block使用者程序,而是立刻返回一個error。從使用者程序角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。使用者程序判斷結果是一個error時,它就知道資料還沒有準備好,於是它可以再次傳送read操作。一旦kernel中的資料準備好了,並且又再次收到了使用者程序的system call,那麼它馬上就將資料拷貝到了使用者記憶體,然後返回。所以,使用者程序其實是需要不斷的主動詢問kernel資料好了沒有。

注意:

在網路IO時候,非阻塞IO也會進行recvform系統呼叫,檢查資料是否準備好,與阻塞IO不一樣,”非阻塞將大的整片時間的阻塞分成N多的小的阻塞, 所以程序不斷地有機會 ‘被’ CPU光顧”。即每次recvform系統呼叫之間,cpu的許可權還在程序手中,這段時間是可以做其他事情的,

也就是說非阻塞的recvform系統呼叫呼叫之後,程序並沒有被阻塞,核心馬上返回給程序,如果資料還沒準備好,此時會返回一個error。程序在返回之後,可以乾點別的事情,然後再發起recvform系統呼叫。重複上面的過程,迴圈往復的進行recvform系統呼叫。這個過程通常被稱之為輪詢。輪詢檢查核心資料,直到資料準備好,再拷貝資料到程序,進行資料處理。需要注意,拷貝資料整個過程,程序仍然是屬於阻塞的狀態。

View Code

優點:能夠在等待任務完成的時間裡幹其他活了(包括提交其他任務,也就是 “後臺” 可以有多個任務在同時執行)。

缺點:任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體資料吞吐量的降低。

5.3 IO multiplexing(IO多路複用)

IO multiplexing這個詞可能有點陌生,但是如果我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式為event driven IO。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。它的流程如圖:

當用戶程序呼叫了select,那麼整個程序會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上,還更差一些。因為這裡需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。(多說一句。所以,如果處理的連線數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server效能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連線能處理得更快,而是在於能處理更多的連線。)
在IO multiplexing Model中,實際中,對於每一個socket,一般都設定成為non-blocking,但是,如上圖所示,整個使用者的process其實是一直被block的。只不過process是被select這個函式block,而不是被socket IO給block。

結論: select的優勢在於可以處理多個連線,不適用於單個連線

View Code

思考1:select監聽fd變化的過程

使用者程序建立socket物件,拷貝監聽的fd到核心空間,每一個fd會對應一張系統檔案表,核心空間的fd響應到資料後,就會發送訊號給使用者程序資料已到;使用者程序再發送系統呼叫,比如(accept)將核心空間的資料copy到使用者空間,同時作為接受資料端核心空間的資料清除,這樣重新監聽時fd再有新的資料又可以響應到了(傳送端因為基於TCP協議所以需要收到應答後才會清除)。

思考2: 上面的示例中,開啟三個客戶端,分別連續向server端傳送一個內容(中間server端不迴應),結果會怎樣,為什麼?

5.4 Asynchronous I/O(非同步IO)

linux下的asynchronous IO其實用得很少。先看一下它的流程:

使用者程序發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對使用者程序產生任何block。然後,kernel會等待資料準備完成,然後將資料拷貝到使用者記憶體,當這一切都完成之後,kernel會給使用者程序傳送一個signal,告訴它read操作完成了。

5.5 IO模型比較分析

到目前為止,已經將四個IO Model都介紹完了。現在回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這兩者的區別。呼叫blocking IO會一直block住對應的程序直到操作完成,而non-blocking IO在kernel還準備資料的情況下會立刻返回。

在說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。Stevens給出的定義(其實是POSIX的定義)是這樣子的:
A synchronous I/O operation causes the requesting process to be blocked until thatI/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
兩者的區別就在於synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。有人可能會說,non-blocking IO並沒有被block啊。這裡有個非常“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的資料沒有準備好,這時候不會block程序。但是,當kernel中資料準備好的時候,recvfrom會將資料從kernel拷貝到使用者記憶體中,這個時候程序是被block了,在這段時間內,程序是被block的。而asynchronous IO則不一樣,當程序發起IO 操作之後,就直接返回再也不理睬了,直到kernel傳送一個訊號,告訴程序說IO完成。在這整個過程中,程序完全沒有被block。

各個IO Model的比較如圖所示:

經過上面的介紹,會發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然程序大部分時間都不會被block,但是它仍然要求程序去主動的check,並且當資料準備完成以後,也需要程序主動的再次呼叫recvfrom來將資料拷貝到使用者記憶體。而asynchronous IO則完全不同。它就像是使用者程序將整個IO操作交給了他人(kernel)完成,然後他人做完後發訊號通知。在此期間,使用者程序不需要去檢查IO操作的狀態,也不需要主動的去拷貝資料。

5.6 selectors模組

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)