1. 程式人生 > >Python 自定義執行緒池

Python 自定義執行緒池

"""
思路
1,將任務放在佇列
1)建立佇列:(初始化)
2)設定大小,執行緒池的最大容量
3)真實建立的執行緒 列表
4)空閒的執行緒數量

2,著手開始處理任務
1)建立執行緒
2)空閒執行緒數量大於0,則不再建立執行緒
3)建立執行緒池的數量 不能高於執行緒池的限制
4)根據任務個數判斷 建立執行緒的數量
2)執行緒去佇列中取任務
1)取任務包(任務包是一個元祖)
2)任務為空時,不再取(終止)
"""

import time
import threading
import queue

stopEvent = object() # 停止任務的標誌


class ThreadPool(object):
def __init__(self, max_thread):
# 建立任務佇列,可以放無限個任務
self.queue = queue.Queue()
# 指定最大執行緒數
self.max_thread = max_thread
# 停止標誌
self.terminal = False
# 建立真實執行緒數
self.generate_list = []
# 空閒執行緒數
self.free_thread = []

def run(self, action, args, callback=None):
"""
執行緒池執行一個任務
:param action:任務函式
:param args:任務引數
:param callback:執行完任務的回撥函式,成功或者失敗的返回值。
:return:
"""
# 執行緒池執行的條件:1)
if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:
self.generate_thread()
task = (action, args, callback)
self.queue.put(task)

def callback(self):
"""
回撥函式:迴圈取獲取任務,並執行任務函式
:return:
"""
# 獲取當前執行緒
current_thread = threading.current_thread()
self.generate_list.append(current_thread)
# 取任務並執行
event = self.queue.get()
# 事件型別是任務
while event != stopEvent: # 重點是這個判斷 使任務終止
# 解開任務包 ,(任務是一個元祖)
# 執行任務
# 標記:執行任務前的狀態,執行任務後的狀態
action, args, callback = event
try:
ret = action(*args)
success = True
except Exception as x:
success = False
ret = x
if callback is not None:
try:
callback(success, ret)
except Exception as e:
print(e)
else:
pass
if not self.terminal:
self.free_thread.append(current_thread)
event = self.queue.get()
self.free_thread.remove(current_thread)
else:
# 停止進行取任務
event = stopEvent
else:
# 不是元祖,不是任務,則清空當前執行緒,不在去取任務
self.generate_list.remove(current_thread)

def generate_thread(self):
"""
建立一個執行緒
:return:
"""
t = threading.Thread(target=self.callback)
t.start()

# 終止取任務
def terminals(self):
"""
無論是否還有任務,終止執行緒
:return:
"""
self.terminal = True

def close(self):
"""
執行完所有的任務後,所有執行緒停止
:return:
"""
num = len(self.generate_list)
self.queue.empty()
while num:
self.queue.put(stopEvent)
num -= 1


def test(pi):
time.sleep(0.5)
print(pi)


pool = ThreadPool(10)

for i in range(100):
pool.run(action=test, args=(i,))

pool.terminals()
pool.close()