1. 程式人生 > >2018.11.9

2018.11.9

執行緒 和程序的區別
程序是一個資源單位
一個程序可以包含多個執行緒
多個執行緒之間資料可以共享
執行緒開銷比程序小
在多執行緒中CPU的切換速度會非常快 但資源消耗沒有程序高


1生產者消費者模型
模型 設計模式 三層結構 等等表示的都是一種程式設計套路
生產者指的是能夠產生資料的一類任務
消費者指的是處理資料的一類任務

需求: 資料夾裡有十個文字文件 要求你找出檔案中包含習大大關鍵字的檔案
開啟並讀取檔案資料就是生產者
查詢關鍵字的過程就是消費者

生產者消費者模型為什麼出現?
生產者的處理能力與消費者的處理能力 不匹配不平衡 導致了一方等待另一方 浪費時間
目前我們通過多程序將生產 和 消費 分開處理
然後將生產者生產的資料通過佇列交給消費者

總結一下在生產者消費者模型中 不僅需要生產者消費者 還需要一個共享資料區域
1.將生產方和消費方耦合度降低
2.平衡雙方的能力 提高整體效率

程式碼實現 :
搞兩個程序 一個負責生產 一個負責消費
資料需要共享所以來個佇列

JoinableQueue 和Queue的區別
Queue裡面沒有類似task_done和join功能的方法
JoinableQueue 裡有task_done()方法 和join方法
JoinableQueue 創造了一個物件容器put方法可以讓生產者產生的資料存在物件容器裡task_done可以計算
容器裡的資料是否取完等取完了之後執行到join方法就代表裡面的資料已經取完然後可以在join
這個程式碼後面自己手動殺死已經取完資料的消費者的程序或者讓消費者程序變成守護程序
from multiprocessing import Process,JoinableQueue
import time,random
def make_hotdog(queue,name):
for i in range(10):
time.sleep(random.randint(1,2))
print("%s製作了%s個熱狗"%(name,i))
data ="%s製作了的熱狗%s"%(name,i)
queue.put(data)


def eat_hotdog(queue,name):
while True:
data = queue.get()
time.sleep(random.randint(1, 2))
print("%s 吃了%s" % (name,data))
queue.task_done()

 


if __name__ == '__main__':
q =JoinableQueue()
p1 = Process(target=make_hotdog,args=(q,"jt的店"))
p2 = Process(target=make_hotdog, args=(q,"yd的店"))
p3 = Process(target=make_hotdog, args=(q,"wd的店"))

c1 = Process(target=eat_hotdog, args=(q,"周鐵蛋在吃"))
c2 = Process(target=eat_hotdog, args=(q,"周銅蛋在吃"))
p1.start()
p2.start()
p3.start()

c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.join()

print("完")

2執行緒


# 在多程序中 開啟子程序需要消耗大量的資源 所以主程序會先比子程序執行
# 子執行緒的開啟速度比程序快的多
# 在多執行緒中 子執行緒可以直接訪問主執行緒的內容
# 多個執行緒之間時平等的 所以不存在父子關係
# 在今後的開發中 每當出現i/o阻塞 比較耗時的操作
rom threading import Thread

import time,os

def task():
# time.sleep(2)
print("子執行緒 run.....")
print(os.getpid())

if __name__ == '__main__':
t = Thread(target=task)
t.start()
# 主執行緒等到子執行緒結束
# t.join()
print("over")
print(os.getpid())

2.1守護執行緒
from threading import Thread
import time

def task():
time.sleep(5)
print("子執行緒...")

t = Thread(target=task)
t.daemon = True # 守護執行緒 執行順序與程序中一樣
t.start()
print("over")

 

2.2常用的一些方法
current_thread() 獲取當前執行緒物件 name屬性 active_count() 當前活躍執行緒的數量
enumerate()返回活躍的執行緒物件列舉(當前的執行緒物件有哪些)
<_MainThread(MainThread, started 1036)>, <Thread(矮根執行緒!, started 19980)>

from threading import Thread,current_thread,active_count,enumerate
import time

def task():
print("子執行緒...")
time.sleep(1)
# 獲取當前執行緒物件 非常常用
print(current_thread(),"1")

t = Thread(target=task,name="矮根執行緒!")
# t.daemon = True # 守護執行緒 執行順序與程序中一樣

print(t.name)
print(t,"2")
t.start()
# 獲取當前活躍執行緒的數量
print(active_count())
# 返回活躍的執行緒物件列舉
print(enumerate(),"3")
print("over")

2.3啟動執行緒的另一種方式 和程序類似
from threading import Thread,current_thread

class MyThread(Thread):
def run(self):
print("run 函式執行!")
print(current_thread())

mt = MyThread()
mt.start()
print(current_thread())

3.鎖
from threading import Thread,Lock
# 建立一個互斥鎖
mutex = Lock()

def task1():
# 鎖定
mutex.acquire()
for i in range(100):
print("===================")
# 開啟
mutex.release()
def task2():
mutex.acquire()
for i in range(100):
print("!!!!!!!!!!!!!!!!!!")
mutex.release()

def task3():
mutex.acquire()
for i in range(100):
print("********************")
mutex.release()

t1 = Thread(target=task1)
t2 = Thread(target=task2)
t3 = Thread(target=task3)

t1.start()
t2.start()
t3.start()

3.1死鎖
from threading import Thread,Lock,current_thread,RLock
import time
# 叉子
locka = RLock()
# 盤子
lockb = RLock()

def task1():
print(current_thread())
locka.acquire()
print("搶到叉子 需要盤子")
time.sleep(0.1)
lockb.acquire()
print("吃飯")

lockb.release()
locka.release()

def task2():
print(current_thread())
lockb.acquire()
print("搶到盤子 需要叉子")
time.sleep(0.1)
locka.acquire()
print("吃飯")
locka.release()
lockb.release()


t1 = Thread(target=task1)
t1.start()
t2 = Thread(target=task2)
t2.start()
#死鎖是task1搶到了lock a鎖,task2搶到了lock b 鎖,task1 的lock a嚮往下走但是 但是被task2的
lock b鎖住了 反之task2 lock b也是被task1 的lock a鎖住 從而導致死鎖
# 死鎖發生的條件 有多個執行緒 多個鎖 如果只有一個鎖 無論是LOCK RLOK 卡不死(前提是邏輯上沒有錯誤)
# RLock 就算你的程式碼邏輯不對 同一個執行緒多次對一個鎖執行acquire 也不會卡死

3.2RLock方法
# RLock 遞迴鎖 重入鎖 可以多次執行acquire
# 對於同一個執行緒而言 可以多次acquire 其他執行緒會被阻塞
from threading import Thread,Lock,RLock,current_thread

# lock = RLock()
#
# lock.acquire()
# print("aaaaaaaaaaa")
# lock.acquire()
# print("bbbbbbbbbbb")

 

import time
lock = RLock()
# 對於同一個執行緒而言 可以多次acquire 其他執行緒會被阻塞
def task():
lock.acquire()
for i in range(5):
time.sleep(1)
print(current_thread())
lock.release()

Thread(target=task).start()
Thread(target=task).start()

4訊號量 Semaphore
from threading import Thread,Semaphore,current_thread,active_count
import time
# 用於控制 同時執行被鎖定程式碼的執行緒數量 也就是執行緒的併發數量
# 也是一種鎖
sm = Semaphore(1)

def task():
sm.acquire()
for i in range(10):
print(current_thread())
time.sleep(0.5)
sm.release()

def task2():
for i in range(10):
print(current_thread())
time.sleep(0.5)


for i in range(5):
Thread(target=task).start()
Thread(target=task2).start()
print(active_count())