python 多線程與隊列
阿新 • • 發佈:2019-01-23
pro 錯誤 rotation 變化 lam exce num 開始 dex 各位好,之前寫了多線程,但是在實際的生產中,往往情況比較復雜,要處理一批任務(比如要處理列表中所有元素),這時候不可能創建很多的線程,線程過多反而不好,還會造成資源開銷太大,這時候想到了隊列。
Queue隊列
Queue用於建立和操作隊列,常和threading類一起用來建立一個簡單的線程隊列。
- Queue.Queue(maxsize) FIFO(先進先出隊列)
- Queue.LifoQueue(maxsize) LIFO(先進後出隊列)
- Queue.PriorityQueue(maxsize) 為優先級越高的越先出來,對於一個隊列中的所有元素組成的entries,優先隊列優先返回的一個元素是sorted(list(entries))[0]。至於對於一般的數據,優先隊列取什麽東西作為優先度要素進行判斷,官方文檔給出的建議是一個tuple如(priority, data),取priority作為優先度。
FIFO是常用的隊列,常用的方法有:
- Queue.qsize() 返回隊列大小
- Queue.empty() 判斷隊列是否為空
-
Queue.full() 判斷隊列是否滿了
-
Queue.get([block[,timeout]]) 從隊列頭刪除並返回一個item,block默認為True,表示當隊列為空卻去get的時候會阻塞線程,等待直到有有item出現為止來get出這個item。如果是False的話表明當隊列為空你卻去get的時候,會引發異常。
在block為True的情況下可以再設置timeout參數。表示當隊列為空,get阻塞timeout指定的秒數之後還沒有get到的話就引發Full異常。 -
Queue.put(...[,block[,timeout]]) 向隊尾插入一個item,同樣若block=True的話隊列滿時就阻塞等待有空位出來再put,block=False時引發異常。
同get的timeout,put的timeout是在block為True的時候進行超時設置的參數。
Queue.task_done() 從場景上來說,處理完一個get出來的item之後,調用task_done將向隊列發出一個信號,表示本任務已經完成。 - Queue.join() 監視所有item並阻塞主線程,直到所有item都調用了task_done之後主線程才繼續向下執行。這麽做的好處在於,假如一個線程開始處理最後一個任務,它從任務隊列中拿走最後一個任務,此時任務隊列就空了但最後那個線程還沒處理完。當調用了join之後,主線程就不會因為隊列空了而擅自結束,而是等待最後那個線程處理完成了。
隊列-單線程
import threading
import queue
import time
class worker(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.thread_stop = False
def run(self):
while not self.thread_stop:
print("thread%d %s: waiting for tast" % (self.ident, self.name))
try:
task = q.get(block=True, timeout=2) # 接收消息
except queue.Empty:
print("Nothing to do! I will go home!")
self.thread_stop = True
break
print("tasking: %s ,task No:%d" % (task[0], task[1]))
print("I am working")
time.sleep(3)
print("work finished!")
q.task_done() # 完成一個任務
res = q.qsize() # 判斷消息隊列大小(隊列中還有幾個任務)
if res > 0:
print("fuck! Still %d tasks to do" % (res))
def stop(self):
self.thread_stop = True
if __name__ == "__main__":
q = queue.Queue(3) # 創建隊列(大小為3)
worker = worker(q) # 將隊列加入類中
worker.start() # 啟動類
q.put(["produce cup!", 1], block=True, timeout=None) # 向隊列中添加元素,產生任務消息
q.put(["produce desk!", 2], block=True, timeout=None)
q.put(["produce apple!", 3], block=True, timeout=None)
q.put(["produce banana!", 4], block=True, timeout=None)
q.put(["produce bag!", 5], block=True, timeout=None)
print("***************leader:wait for finish!")
q.join() # 等待所有任務完成
print("***************leader:all task finished!")
輸出:
thread9212 Thread-1: waiting for tast
tasking: produce cup! ,task No:1
I am working
work finished!
fuck! Still 3 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce desk! ,task No:2
I am working
***************leader:wait for finish!
work finished!
fuck! Still 3 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce apple! ,task No:3
I am working
work finished!
fuck! Still 2 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce banana! ,task No:4
I am working
work finished!
fuck! Still 1 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce bag! ,task No:5
I am working
work finished!
thread9212 Thread-1: waiting for tast
***************leader:all task finished!
Nothing to do!i will go home!
隊列-多線程
import threading
import time
from queue import Queue
img_lists = [‘lipei_00006.mp3‘,‘lipei_00007.mp3‘,‘lipei_00012.mp3‘,‘lipei_00014.mp3‘,
‘lipei_00021.mp3‘,‘lipei_00027.mp3‘,‘lipei_00028.mp3‘,‘lipei_00035.mp3‘,
‘lipei_00039.mp3‘,‘lipei_00044.mp3‘,‘lipei_00047.mp3‘,‘lipei_00049.mp3‘,
‘lipei_00057.mp3‘,‘lipei_00058.mp3‘,‘lipei_00059.mp3‘,‘lipei_00061.mp3‘,
‘lipei_00066.mp3‘,‘lipei_00068.mp3‘,‘lipei_00070.mp3‘,‘lipei_00081.mp3‘,
‘lipei_00087.mp3‘,‘lipei_00104.mp3‘,‘lipei_00106.mp3‘,‘lipei_00117.mp3‘,
‘lipei_00123.mp3‘,‘lipei_00129.mp3‘,]
q = Queue(10)
class Music_Cols(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
global img_lists
global q
while True:
try:
music = img_lists.pop(0)
q.put(music)
except IndexError:
break
class Music_Play(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
global q
while True:
if q.not_empty:
music = q.get()
print(‘{}正在播放{}‘.format(threading.current_thread(), music))
time.sleep(5)
q.task_done()
print(‘{}播放結束‘.format(music))
else:
break
if __name__ == ‘__main__‘:
mc_thread = Music_Cols(‘music_cols‘)
mc_thread.setDaemon(True) # 設置為守護進程,主線程退出時,子進程也kill掉
mc_thread.start() # 啟動進程
for i in range(5): # 設置線程個數(批量任務時,線程數不必太大,註意內存及CPU負載)
mp_thread = Music_Play(‘music_play‘)
mp_thread.setDaemon(True)
mp_thread.start()
q.join() # 線程阻塞(等待所有子線程處理完成,再退出)
輸出:
<Music_Play(music_play, started daemon 1068)>正在播放lipei_00006.mp3
<Music_Play(music_play, started daemon 1072)>正在播放lipei_00007.mp3
<Music_Play(music_play, started daemon 4920)>正在播放lipei_00012.mp3
<Music_Play(music_play, started daemon 3880)>正在播放lipei_00014.mp3
<Music_Play(music_play, started daemon 5400)>正在播放lipei_00021.mp3
lipei_00014.mp3播放結束
... ...
<Music_Play(music_play, started daemon 1068)>正在播放lipei_00117.mp3
lipei_00066.mp3播放結束
<Music_Play(music_play, started daemon 1072)>正在播放lipei_00123.mp3
lipei_00104.mp3播放結束
<Music_Play(music_play, started daemon 4920)>正在播放lipei_00129.mp3
lipei_00123.mp3播放結束
lipei_00117.mp3播放結束
lipei_00087.mp3播放結束
lipei_00106.mp3播放結束
lipei_00129.mp3播放結束
或者(效果與上述一樣)
import threading
import time
from queue import Queue
img_lists = [‘lipei_00006.mp3‘,‘lipei_00007.mp3‘,‘lipei_00012.mp3‘,‘lipei_00014.mp3‘,
‘lipei_00021.mp3‘,‘lipei_00027.mp3‘,‘lipei_00028.mp3‘,‘lipei_00035.mp3‘,
‘lipei_00039.mp3‘,‘lipei_00044.mp3‘,‘lipei_00047.mp3‘,‘lipei_00049.mp3‘,
‘lipei_00057.mp3‘,‘lipei_00058.mp3‘,‘lipei_00059.mp3‘,‘lipei_00061.mp3‘,
‘lipei_00066.mp3‘,‘lipei_00068.mp3‘,‘lipei_00070.mp3‘,‘lipei_00081.mp3‘,
‘lipei_00087.mp3‘,‘lipei_00104.mp3‘,‘lipei_00106.mp3‘,‘lipei_00117.mp3‘,
‘lipei_00123.mp3‘,‘lipei_00129.mp3‘,]
q = Queue(10)
class Music_Cols(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
while True:
try:
music = img_lists.pop(0)
q.put(music)
except IndexError:
break
class Music_Play(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
while True:
if q.not_empty:
music = q.get()
print(‘{}正在播放{}‘.format(threading.current_thread(), music))
time.sleep(5)
q.task_done()
print(‘{}播放結束‘.format(music))
else:
break
if __name__ == ‘__main__‘:
mc_thread = Music_Cols(‘music_cols‘)
mc_thread.setDaemon(True) # 設置為守護進程,主線程退出時,子進程也kill掉
mc_thread.start() # 啟動進程
for i in range(5): # 設置線程個數(批量任務時,線程數不必太大,註意內存及CPU負載)
mp_thread = Music_Play(‘music_play‘)
mp_thread.setDaemon(True)
mp_thread.start()
q.join() # 線程阻塞(等待所有子線程處理完成,再退出)
隊列-多線程—圖像增強實例
"""
開啟多線程:圖像增強
"""
import os
import random
import queue
import numpy as np
import cv2
import time
import threading
def Affine_transformation(img_array):
rows, cols = img_array.shape[:2]
pointsA = np.float32([[30, 80], [180, 60], [80, 230]]) # 左偏
pointsB = np.float32([[60, 50], [220, 70], [20, 180]]) # 右偏
pointsC = np.float32([[70, 60], [180, 50], [50, 200]]) # 前偏
pointsD = np.float32([[40, 50], [210, 60], [70, 180]]) # 後偏
points1 = np.float32([[50, 50], [200, 50], [50, 200]])
points2 = random.choice((pointsA, pointsB, pointsC, pointsD))
matrix = cv2.getAffineTransform(points1, points2)
Affine_transfor_img = cv2.warpAffine(img_array, matrix, (cols, rows))
return Affine_transfor_img
def random_rotate_img(img):
rows, cols= img.shape[:2]
angle = random.choice([25, 90, -25, -90, 180])
Matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
res = cv2.warpAffine(img, Matrix, (cols, rows), borderMode=cv2.BORDER_CONSTANT)
return res
def random_hsv_transform(img, hue_vari, sat_vari, val_vari):
"""
:param img:
:param hue_vari: 色調變化比例範圍(0,360)
:param sat_vari: 飽和度變化比例範圍(0,1)
:param val_vari: 明度變化比例範圍(0,1)
:return:
"""
hue_delta = np.random.randint(-hue_vari, hue_vari)
sat_mult = 1 + np.random.uniform(-sat_vari, sat_vari)
val_mult = 1 + np.random.uniform(-val_vari, val_vari)
img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.float)
img_hsv[:, :, 0] = (img_hsv[:, :, 0] + hue_delta) % 180
img_hsv[:, :, 1] *= sat_mult
img_hsv[:, :, 2] *= val_mult
img_hsv[img_hsv > 255] = 255
return cv2.cvtColor(np.round(img_hsv).astype(np.uint8), cv2.COLOR_HSV2BGR)
def random_gamma_transform(img, gamma_vari):
"""
:param img:
:param gamma_vari:
:return:
"""
log_gamma_vari = np.log(gamma_vari)
alpha = np.random.uniform(-log_gamma_vari, log_gamma_vari)
gamma = np.exp(alpha)
gamma_table = [np.power(x / 255.0, gamma) * 255.0 for x in range(256)]
gamma_table = np.round(np.array(gamma_table)).astype(np.uint8)
return cv2.LUT(img, gamma_table)
def random_flip_img(img):
"""
0 = X axis, 1 = Y axis, -1 = both
:param img:
:return:
"""
flip_val = [0,1,-1]
random_flip_val = random.choice(flip_val)
res = cv2.flip(img, random_flip_val)
return res
def clamp(pv): #防止像素溢出
if pv > 255:
return 255
if pv < 0:
return 0
else:
return pv
def gaussian_noise(image): # 加高斯噪聲
"""
:param image:
:return:
"""
h, w, c = image.shape
for row in range(h):
for col in range(w):
s = np.random.normal(0, 20, 3)
b = image[row, col, 0] # blue
g = image[row, col, 1] # green
r = image[row, col, 2] # red
image[row, col, 0] = clamp(b + s[0])
image[row, col, 1] = clamp(g + s[1])
image[row, col, 2] = clamp(r + s[2])
return image
def get_img(input_dir):
img_path_list = []
for (root_path,dirname,filenames) in os.walk(input_dir):
for filename in filenames:
Suffix_name = [‘.png‘, ‘.jpg‘, ‘.tif‘, ‘.jpeg‘]
if filename.endswith(tuple(Suffix_name)):
img_path = root_path+"/"+filename
img_path_list.append(img_path)
return img_path_list
class IMG_QUEUE(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
while True:
try:
img_path = img_path_list.pop(0)
q.put(img_path)
except IndexError:
break
class IMG_AUG(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
self.q = q
def run(self):
while True:
if q.not_empty:
img_path = q.get()
try:
print("doing...")
img_array = cv2.imread(img_path)
Affine_transfor_img = Affine_transformation(img_array)
cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_Affine_transfor.png‘, Affine_transfor_img)
res_rotate = random_rotate_img(img_array)
cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_rotate_img.png‘,res_rotate)
GAMMA_IMG = random_gamma_transform(img_array, 0.3)
cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_GAMMA_IMG.png‘,GAMMA_IMG)
res_flip = random_flip_img(img_array)
cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_flip_img.png‘,res_flip)
G_Noiseimg = gaussian_noise(img_array)
cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_G_Noise_img.png‘,G_Noiseimg)
HSV_IMG = random_hsv_transform(img_array, 2, 0.3, 0.6)
cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_HSV_IMG.png‘,HSV_IMG)
except:
print("圖像格式錯誤!")
pass
q.task_done()
else:
break
if __name__ == ‘__main__‘:
input_dir = ‘./cccc‘
output_dir = ‘./eeee‘
start_time = time.time() # 開始計時
img_path_list = get_img(input_dir) # 獲取圖像數據
q = queue.Queue(10) # 設置隊列元素個數
my_thread = IMG_QUEUE(‘IMG_QUEUE‘) # 實例化
my_thread.setDaemon(True) # 設置為守護進程,主線程退出時,子進程也kill掉
my_thread.start() # 啟動進程
for i in range(5): # 設置線程個數(批量任務時,線程數不必太大,註意內存及CPU負載)
mp_thread = IMG_AUG(‘IMG_AUG‘)
mp_thread.setDaemon(True)
mp_thread.start()
q.join() # 線程阻塞(等待所有子線程處理完成,再退出)
end_time = time.time()
print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分鐘")
多線程-創建圖像縮略圖(等比縮放圖像)
import os
from PIL import Image
import threading
import time
import queue
def get_img(input_dir):
img_path_list = []
for (root_path,dirname,filenames) in os.walk(input_dir):
for filename in filenames:
Suffix_name = [‘.png‘, ‘.jpg‘, ‘.tif‘, ‘.jpeg‘]
if filename.endswith(tuple(Suffix_name)):
img_path = root_path+"/"+filename
img_path_list.append(img_path)
return img_path_list
class IMG_QUEUE(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
while True:
try:
img_path = img_path_list.pop(0)
q.put(img_path)
except IndexError:
break
class IMG_RESIZE(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
while True:
if q.not_empty:
img_path = q.get()
try:
im = Image.open(img_path)
im.thumbnail((size, size))
print(im.format, im.size, im.mode)
im.save(img_path, ‘JPEG‘)
except:
print("圖像格式錯誤!")
pass
q.task_done()
else:
break
if __name__==‘__main__‘:
input_dir = ‘D:\\20190112_20190114_all‘ #需要創建縮略圖,圖片的目錄
start_time = time.time() # 開始計時
img_path_list = get_img(input_dir) # 獲取圖像數據
size = 800
q = queue.Queue(100) # 設置隊列元素個數
my_thread = IMG_QUEUE(‘IMG_QUEUE‘) # 實例化
my_thread.setDaemon(True) # 設置為守護進程,主線程退出時,子進程也kill掉
my_thread.start() # 啟動進程
for i in range(5): # 設置線程個數(批量任務時,線程數不必太大,註意內存及CPU負載)
mp_thread = IMG_RESIZE(str(i))
mp_thread.setDaemon(True)
mp_thread.start()
q.join() # 線程阻塞(等待所有子線程處理完成,再退出)
end_time = time.time() # 計時結束
print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分鐘")
python 多線程與隊列