生產消費者模式與python+redis例項運用(中級篇)
上一篇文章介紹了生產消費者模式與python+redis例項運用(基礎篇),但是依舊遺留了一個問題,就是如果消費者消費的速度跟不上生產者,依舊會浪費我們大量的時間去等待,這時候我們就可以考慮使用多程序去解決這個問題。舉一個類似上面廚師做菜的例子,廚師上菜速度又快又多,你一個人吃不完,怎麼辦?那就可以請幾個朋友幫你一起吃!這就是我們今天要講的多程序和協程。
先說說多程序和協程的好處,它可以最大的利用你cpu的資源和網路頻寬,這樣可以充分的節省程式消費的時間。在生產消費者模式中引入多程序和協程之前我們先簡單入手一個例項(供初學者可以看懂之後的程式碼):
from multiprocessing import Process import time def test(i): while True: print("我是子程序"+str(i)) time.sleep(2) if i==1: time.sleep(10) print("我是程序" + str(i)) # print[(x,y) for x in range(10) if x%2 if x>3 for y in range(10) if y > 7 if y != 8]if __name__ == '__main__': processes=[] for i in range(3): p = Process(target=test,args=(i,)) p.start() processes.append(p) print(processes) for p in processes: p.join()
以上的程式碼是一段簡單的多程序,我們可以看到執行後三個程序都在執行,由於我這裡讓程序1睡眠10s,程序1和程序0,2是相對獨立的,這裡1的睡眠不會影響到程序0和2的執行,我們可以看到他們依舊在控制檯不斷的輸出,這樣,我們的多程序+消費者模式的雛形就出來了,現在只需要把我們消費者的業務邏輯替換進去就可以了。這裡多說一句,有時候我們會不清楚多程序和多執行緒的概念,這個可以這麼理解,程序消費的資源是一個大類,而執行緒消費的資源是從程序裡面來的,一個執行緒死了會影響到其他執行緒,但多程序之間是相對獨立的,一個程序死了不會影響其他程序執行,這就是為什麼我會在這裡使用多程序而不是多執行緒,當然,對於多執行緒還有一定的優化,那就是協程,這個點我們會在生產消費者模式與python+redis例項運用(次高階篇)中介紹到。
好了,有了多程序的概念我們就可以開始生消模式的實現了:
這裡我們的生產者與上一篇的不變:
from DBUtil import * import time from pandasql import sqldf import redis def product(i): length=r.llen("goods2") print(length) if length>5000: print("長度過大睡一會") time.sleep(1) product(i) else: #生產者 r.lpush("goods2", "good1"+str(i)) print("加入一個值睡一會") # time.sleep(5) if __name__ == '__main__': # 此處表示迴圈10000次,往redis裡面放10000次資料 for i in range(10000): product(i)
消費者開啟多程序,在消費者開啟多程序的時候我們會遇到一個問題,就是多個程序同時去搶同一個資源的情況,這個時候我們可以選擇加鎖到資源,也就是redis會話佇列上,當某個程序拿資源的時候redis會話佇列加上鎖,保證其他程序拿不到這個資源,當這個程序拿完資源後,釋放鎖,讓其他程序去搶佔資源:
import time import redis from multiprocessing import Process,Lock pool=redis.ConnectionPool(host='localhost', port=6379,db=1,decode_responses=True) r=redis.Redis(connection_pool=pool) def users(lock): length = r.llen("goods2") print(length) while True: # print(1) # 對資源進行加鎖 lock.acquire() if length > 0: goods = r.lpop("goods2") # 獲得資源後釋放鎖 lock.release() #以下可以寫自己的業務邏輯操作 try: data = goods print(data) if str(goods) == "None": print("無值多等等") time.sleep(2) except: print("無值等等") time.sleep(2) users() else: print("無值等等") time.sleep(10) users(lock) if __name__ == '__main__': lock = Lock() processes = [] for i in range(20): p = Process(target=users, args=(lock,)) p.start() processes.append(p) for p in processes: p.join() print('處理完成')
以上就是多程序+生消模式,接下來會介紹生產消費者模式與python+redis例項運用(次高階篇),這裡會加上協程的概念。歡迎收看。