1. 程式人生 > 實用技巧 >python執行緒池ThreadPoolExecutor,實現目的:執行緒高可用、防止記憶體炸裂、執行緒可複用,降低因為不斷的新建執行緒和銷燬執行緒造成的浪費

python執行緒池ThreadPoolExecutor,實現目的:執行緒高可用、防止記憶體炸裂、執行緒可複用,降低因為不斷的新建執行緒和銷燬執行緒造成的浪費

應用場景:不斷消費一個容器裡面的資料,使用同一個執行緒池,實現高可用性並減少系統性能開銷;(這裡拿redis作為容器來做示範),執行緒池的使用請檢視https://www.cnblogs.com/hoojjack/p/10846010.html。

需求:程式開始前建立一個執行緒池,然後一直用這個執行緒池來執行程式,不銷燬這個執行緒池,儘量高效的使用這個執行緒池;實現高可用性和節約記憶體的作用;

執行緒池消費原理:將任務都放在一個任務池A裡面,然後等執行緒池裡面的執行緒空閒了就分發給空閒的執行緒,然後執行緒的任務執行完畢後繼續接任務;這就要控制一下接受任務的速度,如果無限迴圈的將任務寫入任務池A裡面,那麼就會導致系統記憶體爆炸,例如迭代版本二例項;

主體程式:

import redis, time
from concurrent.futures import ThreadPoolExecutor


class Redis_demo(object):

    def __init__(self):
        self.redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0, decode_responses=True)

    def consumer(self):
        redis_client = redis.Redis(connection_pool=self.redis_pool)
        data = redis_client.rpop("test_qly")
        return data

    def List_all(self):
        lists = []
        while True:
            data = self.consumer()
            lists.append(data)
            if len(lists) > 20:
                return lists

    def parse(self, data):
        time.sleep(5)
        return data + ","

    def Future(self, future):
        response = future.result()
        print(response)

    def main(self):
        while True:
            datas = self.List_all()
            obj_list = []
            threadPool = ThreadPoolExecutor(max_workers=4)
            for data in datas:
                future = threadPool.submit(self.parse, data)
                obj_list.append(future)
                future.add_done_callback(self.Future)

            threadPool.shutdown(wait=True)

  

迭代版本一:

if __name__ == '__main__':
    cla = Redis_demo()
    while True:
        datas = cla.List_all()
        obj_list = []
        threadPool = ThreadPoolExecutor(max_workers=4)
        for data in datas:
            future = threadPool.submit(cla.parse, data)
            obj_list.append(future)
            future.add_done_callback(cla.Future)
        threadPool.shutdown(wait=True)

  缺點:while迴圈每次執行的時候都要新建立一個執行緒池,把任務執行完畢之後再銷燬;下一次的迴圈再建立一個執行緒池,沒有實現執行緒的高可用性;

迭代版本二:

if __name__ == '__main__':
    cla = Redis_demo()
    threadPool = ThreadPoolExecutor(max_workers=4)
    while True:
        datas = cla.List_all()
        obj_list = []
        for data in datas:
            future = threadPool.submit(cla.parse, data)
            obj_list.append(future)
            future.add_done_callback(cla.Future)
        # threadPool.shutdown(wait=True)  # 在這個例子裡面必須要關閉這個,因為執行這個後,執行緒池就會銷燬掉,那麼下一次迴圈呼叫執行緒池的話就會報錯,因為執行緒池已經銷燬掉了

  缺點:這個其實也叫錯誤,在開頭已經說了,再詳細說一遍吧;while迴圈會一直把任務存進任務池A裡面供執行緒池來消費,但是存任務的速度遠遠比執行緒池裡面消費的速度快,這就導致任務池A裡面存放的任務一直在變多,然後系統記憶體就會爆炸;程式執行一會後系統記憶體就會佔用100%,然後該任務會被系統kill掉。

迭代版本三:

  
def __init__(self):
self.thread_time = 0
self.times = 0
self.redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0, decode_responses=True)
def Future(self, future):
response = future.result()
print(response)
status = future.done()
if status:
self.times += 1
# 重寫上面兩個方法

if __name__ == '__main__': cla = Redis_demo() threadPool = ThreadPoolExecutor(max_workers=4) while True: datas = cla.List_all() obj_list = [] for data in datas: future = threadPool.submit(cla.parse, data) obj_list.append(future) future.add_done_callback(cla.Future) # 防止執行緒池裡面堆積太多的資料引起記憶體爆炸 while True: if cla.times > cla.thread_time-1: cla.times = 0 break else: time.sleep(2)

  實現原理:用future.done()來判斷所有執行緒執行的任務數,用全域性變數times來累計,如果全域性變數times與任務數相同了,代表該任務池A裡面所有的任務都執行完了,執行緒池裡面的執行緒也空閒了,然後再把任務寫入任務池A中,如果全域性變數times小於任務數,則進入等待階段,知道所有的任務全部執行完畢;

  好處:實現了執行緒池部分高可用性(一個執行緒池從頭用到尾)、節約了記憶體開銷;

  缺點:該執行緒池中如果有一個執行緒反應慢了,那麼其他空閒執行緒就會等待這個響應慢的執行緒結束後才會重新拉取任務;嚴重一點,如果一個執行緒堵塞了,那麼就會卡住,因為全域性變數times總是小於任務數;還有就是必須要用到(future.add_done_callback(cla.Future)),如果沒有回撥函式就沒法統計執行完畢的任務數。

迭代版本四(終極版):

def parse(self, data):
  print(threading.current_thread().name) # 列印執行緒名字,監控是否有執行緒一直在堵塞狀態
time.sleep(5) return data + ","
# 重寫上面的方法,列印執行緒號


if __name__ == '__main__': cla = Redis_demo() threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="threadName") # 執行緒序號前面新增預設名稱,例如:執行緒名字是_1,新增後就成為threadName_1
    while True:
        datas = cla.List_all()
        obj_list = []
        for data in datas:
            future = threadPool.submit(cla.parse, data)
            obj_list.append(future)
            future.add_done_callback(cla.Future)
        # 檢測執行緒池是否空閒了,如果有空閒的就執行新任務,如果所有執行緒都在執行任務,那麼就進入等待階段
        while True:
            status = False
            for future in obj_list[-4:]:   # 這裡的4與執行緒池裡面的執行緒數保持一致,每次的任務數儘量是執行緒池數量的整數倍
                status = future.done()  # 檢視任務執行狀態,True表示該任務已經執行完畢
                print(status, "列印執行狀態")
                if status:
                    break
            if status:
                break
            else:
                time.sleep(1)
    # threadPool.shutdown(wait=True)  # 這行程式碼其實沒什麼實質性作用,因為執行緒池從程式執行開始就不會停,直到程式停止

  實現原理:將所有任務放到一個列表obj_list中,然後不斷的迴圈驗證左右的任務是否有執行完畢的,去最後4個任務檢視一下狀態,如果最後4個裡面也有執行完畢的,那麼就寫入新的任務到任務池A;

  好處:需求裡面要求的點基本上全部都實現了;

  缺點:嚴格來說,如果有堵塞的執行緒,那麼就會減少工作的執行緒,所以就要求對程式要有嚴格的測試,保證程式不會出現嚴重堵塞的問題;其次,最好還是列印一下執行緒名稱,這樣就能更好的監控是否有堵塞的執行緒;

  程式碼解決辦法:1、建一個元祖,將執行緒號存入元祖,然後每隔一段時間檢測一下元祖中元素的數量,如果小於執行緒數就發出報警資訊;2、建一個字典,將每個執行緒出現的次數計入字典中,如果執行緒間數量相差太多的話發出報警資訊。