1. 程式人生 > >python程序池專題總結 原始碼

python程序池專題總結 原始碼

  python中兩個常用來處理程序的模組分別是subprocess和multiprocessing,其中subprocess通常用於執行外部程式,比如一些第三方應用程式,而不是Python程式。如果需要實現呼叫外部程式的功能,python的psutil模組是更好的選擇,它不僅支援subprocess提供的功能,而且還能對當前主機或者啟動的外部程式進行監控,比如獲取網路、cpu、記憶體等資訊使用情況,在做一些自動化運維工作時支援的更加全面。multiprocessing是python的多程序模組,主要通過啟動python程序,呼叫target回撥函式來處理任務,與之對應的是python的多執行緒模組threading,它們擁有類似的介面,通過定義multiprocessing.Process、threading.Thread,指定target方法,呼叫start()執行程序或者執行緒。

  在python中由於全域性解釋鎖(GIL)的存在,使用多執行緒,並不能大大提高程式的執行效率【1】。因此,用python處理併發問題時,儘量使用多程序而非多執行緒。併發程式設計中,最簡單的模式是,主程序等待任務,當有新任務到來時,啟動一個新的程序來處理當前任務。這種每個任務一個程序的處理方式,每處理一個任務都會伴隨著一個程序的建立、執行、銷燬,如果程序的執行時間越短,建立和銷燬的時間所佔的比重就越大,顯然,我們應該儘量避免建立和銷燬程序本身的額外開銷,提高程序的執行效率。我們可以用程序池來減少程序的建立和開銷,提高程序物件的複用。

  實際上,python中已經實現了一個功能強大的程序池(multiprocessing.Pool),這裡我們來簡單剖析下python自帶的程序池是如何實現的。

  要建立程序池物件,需要呼叫Pool函式,函式的宣告如下:

Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
    Returns a process pool object
processes表示工作程序的個數,預設為None,表示worker程序數為cpu_count()
initializer表示工作程序start時呼叫的初始化函式,initargs表示initializer函式的引數,如果initializer不為None,在每個工作程序start之前會呼叫initializer(*initargs)
maxtaskperchild表示每個工作程序在退出/被其他新的程序替代前,需要完成的工作任務數,預設為None
,表示工作程序存活時間與pool相同,即不會自動退出/被替換。 函式返回一個程序池(Pool)物件

  Pool函式返回的程序池物件中有下面一些資料結構:

self._inqueue  接收任務佇列(SimpleQueue),用於主程序將任務傳送給worker程序
self._outqueue  傳送結果佇列(SimpleQueue),用於worker程序將結果傳送給主程序
self._taskqueue  同步的任務佇列,儲存執行緒池分配給主程序的任務
self._cache = {}  任務快取
self._processes  worker程序個數
self._pool = []  woker程序佇列

  程序池工作時,任務的接收、分配。結果的返回,均由程序池內部的各個執行緒合作完成,來看看程序池內部由那些執行緒:

  • _work_handler執行緒,負責保證程序池中的worker程序在有退出的情況下,創建出新的worker程序,並新增到程序佇列(pools)中,保持程序池中的worker程序數始終為processes個。_worker_handler執行緒回撥函式為Pool._handler_workers方法,在程序池state==RUN時,迴圈呼叫_maintain_pool方法,監控是否有程序退出,並建立新的程序,append到程序池pools中,保持程序池中的worker程序數始終為processes個。
    self._worker_handler = threading.Thread(
                target=Pool._handle_workers,
                args=(self, )
    )
    
    Pool._handle_workers方法在_worker_handler執行緒狀態為執行時(status==RUN),迴圈呼叫_maintain_pool方法:
    def _maintain_pool(self):
        if self._join_exited_workers():
            self._repopulate_pool()
    
    _join_exited_workers()監控pools佇列中的程序是否有結束的,有則等待其結束,並從pools中刪除,當有程序結束時,呼叫_repopulate_pool(),建立新的程序:
    w = self.Process(target=worker,
                    args=(self._inqueue, self._outqueue,
                          self._initializer, self._initargs,                 
                           self._maxtasksperchild)
                     )
    self._pool.append(w)
    
    w是新建立的程序,它是用來處理實際任務的程序,worker是它的回撥函式:
    def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
    
        if initializer is not None:
            initializer(*initargs)
    
        completed = 0
        while maxtasks is None or (maxtasks and completed < maxtasks):
            try:
                task = get()
            except (EOFError, IOError):
                debug('worker got EOFError or IOError -- exiting')
                break
    
            if task is None:
                debug('worker got sentinel -- exiting')
                break
    
            job, i, func, args, kwds = task
            try:
                result = (True, func(*args, **kwds))
            except Exception, e:
                result = (False, e)
            try:
                put((job, i, result))
            except Exception as e:
                wrapped = MaybeEncodingError(e, result[1])
                debug("Possible encoding error while sending result: %s" % (
                    wrapped))
                put((job, i, (False, wrapped)))
            completed += 1
        debug('worker exiting after %d tasks' % completed)
    
    所有worker程序都使用worker回撥函式對任務進行統一的處理,從原始碼中可以看出:
    它的功能是從接入任務佇列中(inqueue)讀取出task任務,然後根據任務的函式、引數進行呼叫(result = (True, func(*args, **kwds),
    再將結果放入結果佇列中(outqueue),如果有最大處理上限的限制maxtasks,那麼當程序處理到任務數上限時退出。
  • _task_handler執行緒,負責從程序池中的task_queue中,將任務取出,放入接收任務佇列(Pipe)
    self._task_handler = threading.Thread(
                target=Pool._handle_tasks,
                args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
    )
    Pool._handle_tasks方法不斷從task_queue中獲取任務,並放入接受任務佇列(in_queue),以此觸發worker程序進行任務處理。當從task_queue讀取到None元素時,
    表示程序池將要被終止(terminate),不再處理之後的任務請求,同時向接受任務佇列和結果任務佇列put None元素,通知其他執行緒結束。
  • _handle_results執行緒,負責將處理完的任務結果,從outqueue(Pipe)中讀取出來,放在任務快取cache中,
    self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
    )
  • _terminate,這裡的_terminate並不是一個執行緒,而是一個Finalize物件
    self._terminate = Finalize(
                self, self._terminate_pool,
                args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                      self._worker_handler, self._task_handler,
                      self._result_handler, self._cache),
                exitpriority=15
    )
    Finalize類的建構函式與執行緒建構函式類似,_terminate_pool是它的回撥函式,args回撥函式的引數。
    _terminate_pool函式負責終止程序池的工作:終止上述的三個執行緒,終止程序池中的worker程序,清除佇列中的資料。
    _terminate是個物件而非執行緒,那麼它如何像執行緒呼叫start()方法一樣,來執行回撥函式_terminate_pool呢?檢視Pool原始碼,發現程序池的終止函式:
    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()
    函式中最後將_terminate物件當做一個方法來執行,而_terminate本身是一個Finalize物件,我們看一下Finalize類的定義,發現它實現了__call__方法:
    def __call__(self, wr=None):
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != os.getpid():
                res = None
            else:
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res
    而方法中 self._callback(*self._args, **self._kwargs) 這條語句,就執行了_terminate_pool函式,進而將程序池終止。

  程序池中的資料結構、各個執行緒之間的合作關係如下圖所示:

  【1】這裡針對的是CPU密集型程式,多執行緒並不能帶來效率上的提升,相反還可能會因為執行緒的頻繁切換,導致效率下降;如果是IO密集型,多執行緒程序可以利用IO阻塞等待時的空閒時間執行其他執行緒,提升效率。

下面我們看下客戶端如何對向程序池分配任務,並獲取結果的。

  我們知道,當程序池中任務佇列非空時,才會觸發worker程序去工作,那麼如何向程序池中的任務佇列中新增任務呢,程序池類有兩組關鍵方法來建立任務,分別是apply/apply_async和map/map_async,實際上程序池類的apply和map方法與python內建的兩個同名方法類似,apply_async和map_async分別為它們的非阻塞版本。

  首先來看apply_async方法,原始碼如下:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result
func表示執行此任務的方法
args、kwds分別表func的位置引數和關鍵字引數
callback表示一個單引數的方法,當有結果返回時,callback方法會被呼叫,引數即為任務執行後的結果

每呼叫一次apply_result方法,實際上就向_taskqueue中添加了一條任務,注意這裡採用了非阻塞(非同步)的呼叫方式,即apply_async方法中新建的任務只是被新增到任務佇列中,還並未執行,不需要等待,直接返回建立的ApplyResult物件,注意在建立ApplyResult物件時,將它放入程序池的快取_cache中。

  任務佇列中有了新建立的任務,那麼根據上節分析的處理流程,程序池的_task_handler執行緒,將任務從taskqueue中獲取出來,放入_inqueue中,觸發worker程序根據args和kwds呼叫func,執行結束後,將結果放入_outqueue,再由程序池中的_handle_results執行緒,將執行結果從_outqueue中取出,並找到_cache快取中的ApplyResult物件,_set其執行結果,等待呼叫端獲取。

  apply_async方法既然是非同步的,那麼它如何知道任務結束,並獲取結果呢?這裡需要了解ApplyResult類中的兩個主要方法:

def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

def _set(self, i, obj):
    self._success, self._value = obj
    if self._callback and self._success:
        self._callback(self._value)
    self._cond.acquire()
    try:
        self._ready = True
        self._cond.notify()
    finally:
        self._cond.release()
    del self._cache[self._job]

從這兩個方法名可以看出,get方法是提供給客戶端獲取worker程序執行結果的,而執行的結果是通過_handle_result執行緒呼叫_set方法,存放在ApplyResult物件中。
_set方法將執行結果儲存在ApplyResult._value中,喚醒阻塞在條件變數上的get方法。客戶端通過呼叫get方法,返回執行結果

apply方法是以阻塞的方式執行獲取程序結果,它的實現很簡單,同樣是呼叫apply_async,只不過不返回ApplyResult,而是直接返回worker程序執行的結果:

def apply(self, func, args=(), kwds={}):
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

 以上的apply/apply_async方法,每次只能向程序池分配一個任務,那如果想一次分配多個任務到程序池中,可以使用map/map_async方法。首先來看下map_async方法是如何定義的:

def map_async(self, func, iterable, chunksize=None, callback=None):
    assert self._state == RUN
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
    return result
func表示執行此任務的方法
iterable表示任務引數序列
chunksize表示將iterable序列按每組chunksize的大小進行分割,每個分割後的序列提交給程序池中的一個任務進行處理
callback表示一個單引數的方法,當有結果返回時,callback方法會被呼叫,引數即為任務執行後的結果

 從原始碼可以看出,map_async要比apply_async複雜,首先它會根據chunksize對任務引數序列進行分組,chunksize表示每組中的任務個數,當預設chunksize=None時,根據任務引數序列和程序池中程序數計算分組數:chunk, extra = divmod(len(iterable), len(self._pool) * 4)。假設程序池中程序數為len(self._pool)=4,任務引數序列iterable=range(123),那麼chunk=7, extra=11,向下執行,得出chunksize=8,表示將任務引數序列分為8組。任務實際分組:

task_batches = Pool._get_tasks(func, iterable, chunksize)
def _get_tasks(func, it, size):
    it = iter(it)
    while 1:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield (func, x)

這裡使用yield將_get_tasks方法編譯成生成器。實際上對於range(123)這樣的序列,按照chunksize=8進行分組後,一共16組每組的元素如下:
(func, (0,   1,   2,   3,   4,   5,   6,   7))
(func, (8,   9,   10,  11,  12,  13,  14,  15))
(func, (16,  17,  18,  19,  20,  21,  22,  23))
...
(func, (112, 113, 114, 115, 116, 117, 118, 119))
(func, (120, 121, 122))

分組之後,這裡定義了一個MapResult物件:result = MapResult(self._cache, chunksize, len(iterable), callback)它繼承自AppyResult類,同樣提供get和_set方法介面。將分組後的任務放入任務佇列中,然後就返回剛剛建立的result物件。

self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
以任務引數序列=range(123)為例,實際上這裡向任務佇列中put了一個16組元組元素的集合,元組依次為:
(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)
(result._job, 1, mapstar, ((func, (8,   9,   10,  11,  12,  13,  14,  15)),), {}, None)
……
(result._job, 15, mapstar, ((func, (120, 121, 122)),), {}, None)
注意每一個元組中的 i,它表示當前元組在整個任務元組集合中的位置,通過它,_handle_result執行緒才能將worker程序執行的結果,以正確的順序填入到MapResult物件中。

注意這裡只調用了一次put方法,將16組元組作為一個整體序列放入任務佇列,那麼這個任務是否_task_handler執行緒是否也會像apply_async方法一樣,將整個任務序列傳遞給_inqueue,這樣就會導致程序池中的只有一個worker程序獲取到任務序列,而並非起到多程序的處理方式。我們來看下_task_handler執行緒是怎樣處理的:

def _handle_tasks(taskqueue, put, outqueue, pool, cache):
    thread = threading.current_thread()

    for taskseq, set_length in iter(taskqueue.get, None):
        i = -1
        for i, task in enumerate(taskseq):
            if thread._state:
                debug('task handler found thread._state != RUN')
                break
            try:
                put(task)
            except Exception as e:
                job, ind = task[:2]
                try:
                    cache[job]._set(ind, (False, e))
                except KeyError:
                    pass
        else:
            if set_length:
                debug('doing set_length()')
                set_length(i+1)
            continue
        break
    else:
        debug('task handler got sentinel')

  注意到語句 for i, task in enumerate(taskseq),原來_task_handler執行緒在通過taskqueue獲取到任務序列後,並不是直接放入_inqueue中的,而是將序列中任務按照之前分好的組,依次放入_inqueue中的,而迴圈中的task即上述的每個任務元組:(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)。接著觸發worker程序。worker程序獲取出每組任務,進行任務的處理:

job, i, func, args, kwds = task 
try:   
    result = (True, func(*args, **kwds))
except Exception, e:
    result = (False, e)
try:
    put((job, i, result))
except Exception as e:
    wrapped = MaybeEncodingError(e, result[1])
    debug("Possible encoding error while sending result: %s" % (
        wrapped))
    put((job, i, (False, wrapped)))
根據之前放入_inqueue的順序對應關係:
(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)
job, i, func, args, kwds = task
可以看出,元組中 mapstar 表示這裡的回撥函式func,((func, (0, 1, 2, 3, 4, 5, 6, 7)),)和{}分別表示args和kwds引數。
執行result = (True, func(*args, **kwds))
再來看下mapstar是如何定義的:
def mapstar(args): 
return map(*args)
這裡mapstar表示回撥函式func,它的定義只有一個引數,而在worker程序執行回撥時,使用的是func(*args, **kwds)語句,這裡多一個引數能夠正確執行嗎?答案時肯定的,在呼叫mapstar時,如果kwds為空字典,那麼傳入第二個引數不會影響函式的呼叫,而一個無參函式func_with_none_params,在呼叫時使用func_with_none_params(*(), **{})也是沒有問題的,python會自動忽視傳入的兩個空引數。
看到這裡,我們明白了,實際上對任務引數分組後,每一組的任務是通過內建的map方法來進行呼叫的。
執行之後呼叫put(job, i, result)將結果放入_outqueue中,_handle_result執行緒會從_outqueue中將結果取出,並找到_cache快取中的MapResult物件,_set其執行結果

現在來我們來總結下,程序池的map_async方法是如何執行的,我們將range(123)這個任務序列,將它傳入map_async方法,假設不指定chunksize,並且cpu為四核,那麼方法內部會分為16個組(0~14組每組8個元素,最後一組3個元素)。將分組後的任務放入任務佇列,一共16組,那麼每個程序需要執行4次來處理,每次通過內建的map方法,順序將組中8個任務執行,再將結果放入_outqueue,找到_cache快取中的MapResult物件,_set其執行結果,等待客戶端獲取。使用map_async方法會呼叫多個worker程序處理任務,每個worler程序執行結束,會將結果傳入_outqueue,再有_handle_result執行緒將結果寫入MapResult物件,那如何保證結果序列的順序與呼叫map_async時傳入的任務引數序列一致呢,我們來看看MapResult的建構函式和_set方法的實現。

def __init__(self, cache, chunksize, length, callback):
    ApplyResult.__init__(self, cache, callback)
    self._success = True
    self._value = [None] * length
    self._chunksize = chunksize
    if chunksize <= 0:
        self._number_left = 0
        self._ready = True
        del cache[self._job]
    else:
        self._number_left = length//chunksize + bool(length % chunksize)

def _set(self, i, success_result):
    success, result = success_result
    if success:
        self._value[i*self._chunksize:(i+1)*self._chunksize] = result
        self._number_left -= 1
        if self._number_left == 0:
            if self._callback:
                self._callback(self._value)
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

    else:
        self._success = False
        self._value = result
        del self._cache[self._job]
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()

   MapResult類中,_value儲存map_async的執行結果,初始化時為一個元素為None的list,list的長度與任務引數序列的長度相同,_chunksize表示將任務分組後,每組有多少個任務,_number_left表示整個任務序列被分為多少個組。_handle_result執行緒會通過_set方法將worker程序的執行結果儲存到_value中,那麼如何將worker程序執行的結果填入到_value中正確的位置呢,還記得在map_async在向task_queue填入任務時,每組中的 i嗎,i表示的就是當前任務組的組號,_set方法會根據當前任務的組號即引數 i,並且遞減_number_left,當_number_left遞減為0時,表示任務引數序列中的所有任務都已被woker程序處理,_value全部被計算出,喚醒阻塞在get方法上的條件變數,是客戶端可以獲取執行結果。

  map函式為map_async的阻塞版本,它在map_async的基礎上,呼叫get方法,直接阻塞到結果全部返回:

def map(self, func, iterable, chunksize=None):
    assert self._state == RUN
    return self.map_async(func, iterable, chunksize).get()

本節主要分析了兩組向程序池分配任務的介面:apply/apply_async和map/map_async。apply方法每次處理一個任務,不同任務的執行方法(回撥函式)、引數可以不同,而map方法每次可以處理一個任務序列,每個任務的執行方法相同。

       我們知道,程序池內部由多個執行緒互相協作,向客戶端提供可靠的服務,那麼這些執行緒之間是怎樣做到資料共享與同步的呢?在客戶端使用apply/map函式向程序池分配任務時,使用self._taskqueue來存放任務元素,_taskqueue定義為Queue.Queue(),這是一個python標準庫中的執行緒安全的同步佇列,它保證通知時刻只有一個執行緒向佇列新增或從佇列獲取元素。這樣,主執行緒向程序池中分配任務(taskqueue.put),程序池中_handle_tasks執行緒讀取_taskqueue佇列中的元素,兩個執行緒同時操作taskqueue,互不影響。程序池中有N個worker程序在等待任務下發,那麼程序池中的_handle_tasks執行緒讀取出任務後,又如何保證一個任務不被多個worker程序獲取到呢?我們來看下_handle_tasks執行緒將任務讀取出來之後如何交給worker程序的:

for taskseq, set_length in iter(taskqueue.get, None):
    i = -1
    for i, task in enumerate(taskseq):
        if thread._state:
            debug('task handler found thread._state != RUN')
            break
        try:
            put(task)
        except Exception as e:
            job, ind = task[:2]
            try:
                cache[job]._set(ind, (False, e))
            except KeyError:
                pass
    else:
        if set_length:
            debug('doing set_length()')
            set_length(i+1)
        continue
    break
else:
    debug('task handler got sentinel')
#在從taskqueue中get到任務之後,對任務中的每個task,呼叫了put函式,這個put函式實際上是將task放入了管道,而主程序與worker程序的互動,正是通過管道來完成的。
#再來看看worker程序的定義:
w = self.Process(target=worker,
                 args=(self._inqueue, self._outqueue,
                         self._initializer,
                   self._initargs, self._maxtasksperchild)
            )

其中self._inqueue和self._outqueue為SimpleQueue()物件,實際是帶鎖的管道,上述_handle_task執行緒呼叫的put函式,即為SimpleQueue物件的方法。我們看到,這裡worker程序定義均相同,所以程序池中的worker程序共享self._inqueue和self._outqueue物件,那麼當一個task元素被put到共享的_inqueue管道中時,如何確保只有一個worker獲取到呢,答案同樣是加鎖,在SimpleQueue()類的定義中,put以及get方法都帶有鎖,進行同步,唯一不同的是,這裡的鎖是用於程序間同步的。這樣就保證了多個worker之間能夠確保任務的同步。與分配任務類似,在worker程序執行完之後,會將結果put會_outqueue,_outqueue同樣是SimpleQueue類物件,可以在多個程序之間進行互斥。

在worker程序執行結束之後,會將執行結果通過管道傳回,程序池中有_handle_result執行緒來負責接收result,取出之後,通過呼叫_set方法將結果寫回ApplyResult/MapResult物件,客戶端可以通過get方法取出結果,這裡通過使用條件變數進行同步,當_set函式執行之後,通過條件變數喚醒阻塞在get函式的主程序。

  程序池終止工作通過呼叫Pool.terminate()來實現,這裡的實現很巧妙,用了一個可呼叫物件,將終止Pool時的需要執行的回撥函式先註冊好,等到需要終止時,直接呼叫物件即可。

self._terminate = Finalize(
                self, self._terminate_pool,
                args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                   self._worker_handler, self._task_handler,
                   self._result_handler, self._cache),
                exitpriority=15
            )
Finalize類的實現了__call__方法,在執