1. 程式人生 > >生產者消費者模型-python-多程序

生產者消費者模型-python-多程序

最近用python寫了一個base64解碼程式,解碼的過程比較耗CPU,為了充分發揮多核優勢,引入多執行緒,又因為python有全域性鎖GIL,多執行緒仍然只能使用一個核,於是重新用多程序multiprocessing實現。多執行緒下,通過繼承threading.Thread類實現時,可以將執行緒共享資料(如消費品佇列)作為類靜態變數儲存;在多程序下,通過繼承multiprocessing.Process類實現時,這種方法共享消費品佇列失效,最後通過將佇列作為建構函式引數傳入正確實現。

import multiprocessing, Queue
from multiprocessing import Process
from time import sleep
from datetime import datetime


########################################################################
class MultiProcessProducer(multiprocessing.Process):
    """"""

    #----------------------------------------------------------------------
    def __init__(self,num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        
    def run(self):
        t1 = datetime.now()
        print 'producer start', self.num, t1
        for i in range(1000000):
            self.queue.put((i, self.num))
            #print 'producer put', i, self.num
        t2 = datetime.now()
        print 'producer exit', self.num, t2
        print 'producer', self.num, t2-t1
        
        
########################################################################
class MultiProcessConsumer(multiprocessing.Process):
    """"""

    #----------------------------------------------------------------------
    def __init__(self,num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        
    def run(self):
        t1 = datetime.now()
        print 'consumer start', self.num, t1
        while True:
            d = self.queue.get()
            if d != None:
                #print 'consumer get', d, self.num
                continue
            else:
                break
        t2 = datetime.now()
        print 'consumer exit', self.num, t2
        print 'consumer', self.num, t2-t1
    
def main():
    #create queue
    queue = multiprocessing.Queue()
    
    #create processes    
    producer = []
    for i in range(1):
        producer.append(MultiProcessProducer(i, queue))
        
    consumer = []
    for i in range(2):
        consumer.append(MultiProcessConsumer(i, queue))

    #start processes
    for i in range(len(producer)):
        producer[i].start()
        
    for i in range(len(consumer)):
        consumer[i].start()
        
    #wait for processs to exit
    for i in range(len(producer)):
        producer[i].join()
        
    for i in range(len(consumer)):
        queue.put(None)
           
    for i in range(len(consumer)):
        consumer[i].join()
        
    print 'finish'

 
if __name__ == "__main__":
    main()