Lab-讀取Excel作柱狀圖
1.什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
這個阻塞佇列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。
2.生產消費者模型
生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過記憶體緩衝區進行通訊,生產者生產消費者需要的資料,消費者把資料做成產品。生產消費者模式如下圖。
在日益發展的服務型別中,譬如註冊使用者這種服務,它可能解耦成好幾種獨立的服務(賬號驗證,郵箱驗證碼,手機簡訊碼等)。它們作為消費者,等待使用者輸入資料,在前臺資料提交之後會經過分解併發送到各個服務所在的url,分發的那個角色就相當於生產者。消費者在獲取資料時候有可能一次不能處理完,那麼它們各自有一個請求佇列,那就是記憶體緩衝區了。做這項工作的框架叫做訊息佇列。
3.生產者消費者模型的實現
生產者是一堆執行緒,消費者是另一堆執行緒,記憶體緩衝區可以使用List陣列佇列,資料型別只需要定義一個簡單的類就好。關鍵是如何處理多執行緒之間的協作。這其實也是多執行緒通訊的一個範例。
在這個模型中,最關鍵就是記憶體緩衝區為空的時候消費者必須等待,而記憶體緩衝區滿的時候,生產者必須等待。其他時候可以是個動態平衡。值得注意的是多執行緒對臨界區資源的操作時候必須保證在讀寫中只能存在一個執行緒,所以需要設計鎖的策略。
4.為什麼要使用生產者和消費者模式
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,所以便有了生產者和消費者模式。
為了不至於太抽象,我們舉一個寄信的例子(雖說這年頭寄信已經不時興,但這個例子還是比較貼切的)。假設你要寄一封平信,大致過程如下:
1、你把信寫好——相當於生產者製造資料
2、你把信放入郵筒——相當於生產者把資料放入緩衝區
3、郵遞員把信從郵筒取出——相當於消費者把資料取出緩衝區
4、郵遞員把信拿去郵局做相應的處理——相當於消費者處理資料
4.1優點
解耦
假設生產者和消費者分別是兩個類。如果讓生產者直接呼叫消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的程式碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了。
接著上述的例子,如果不使用郵筒(也就是緩衝區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產生和你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者程式碼)。而郵筒相對來說比較固定,你依賴它的成本就比較低(相當於和緩衝區之間的弱耦合)。
支援併發(concurrency)
生產者直接呼叫消費者的某個方法,還有另一個弊端。由於函式呼叫是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理資料很慢,生產者就會白白糟蹋大好時光。
使用了生產者/消費者模式之後,生產者和消費者可以是兩個獨立的併發主體(常見併發型別有程序和執行緒兩種)。生產者把製造出來的資料往緩衝區一丟,就可以再去生產下一個資料。基本上不用依賴消費者的處理速度。
支援忙閒不均
緩衝區還有另一個好處。如果製造資料的速度時快時慢,緩衝區的好處就體現出來了。當資料製造快的時候,消費者來不及處理,未處理的資料可以暫時存在緩衝區中。等生產者的製造速度慢下來,消費者再慢慢處理掉。
為了充分複用,我們再拿寄信的例子來說事。假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節(也可能是聖誕節)送賀卡,需要寄出去的信超過1000封,這時候郵筒這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。
5.多生產者和多消費者場景
在多核時代,多執行緒併發處理速度比單執行緒處理速度更快,所以我們可以使用多個執行緒來生產資料,同樣可以使用多個消費執行緒來消費資料。而更復雜的情況是,消費者消費的資料,有可能需要繼續處理,於是消費者處理完資料之後,它又要作為生產者把資料放在新的佇列裡,交給其他消費者繼續處理。如下圖:
6.執行緒池與生產消費者模式
Java中的執行緒池類其實就是一種生產者和消費者模式的實現方式,但是我覺得其實現方式更加高明。生產者把任務丟給執行緒池,執行緒池建立執行緒並處理任務,如果將要執行的任務數大於執行緒池的基本執行緒數就把任務扔到阻塞佇列裡,這種做法比只使用一個阻塞佇列來實現生產者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些。
我們的系統也可以使用執行緒池來實現多生產者消費者模式。比如建立N個不同規模的Java執行緒池來處理不同性質的任務,比如執行緒池1將資料讀到記憶體之後,交給執行緒池2裡的執行緒繼續處理壓縮資料。執行緒池1主要處理IO密集型任務,執行緒池2主要處理CPU密集型任務。
7.記憶體緩衝區
最傳統、最常見的方式:佇列(FIFO)作緩衝。
7.1 執行緒方式
併發執行緒中使用佇列的優缺點
記憶體分配的效能
線上程方式下,生產者和消費者各自是一個執行緒。生產者把資料寫入佇列頭(以下簡稱push),消費者從佇列尾部讀出資料(以下簡稱pop)。當佇列為空,消費者就稍息(稍事休息);當佇列滿(達到最大長度),生產者就稍息。整個流程並不複雜。
上述過程會有一個主要的問題是關於記憶體分配的效能開銷。對於常見的佇列實現:在每次push時,可能涉及到堆記憶體的分配;在每次pop時,可能涉及堆記憶體的釋放。假如生產者和消費者都很勤快,頻繁地push、pop,那記憶體分配的開銷就很可觀了。對於記憶體分配的開銷,可查詢Java效能優化相關知識。
解決辦法:環形緩衝區。
同步和互斥的效能
另外,由於兩個執行緒共用一個佇列,自然就會涉及到執行緒間諸如同步、互斥、死鎖等等。這會兒要細談的是,同步和互斥的效能開銷。在很多場合中,諸如訊號量、互斥量等的使用也是有不小的開銷的(某些情況下,也可能導致使用者態/核心態切換)。如果像剛才所說,生產者和消費者都很勤快,那這些開銷也不容小覷。
解決辦法:雙緩衝區。
適用於佇列的場合
由於佇列是很常見的資料結構,大部分程式語言都內建了佇列的支援,有些語言甚至提供了執行緒安全的佇列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,開發人員可以撿現成,避免了重新發明輪子。
所以,假如你的資料流量不是很大,採用佇列緩衝區的好處還是很明顯的:邏輯清晰、程式碼簡單、維護方便。比較符合KISS原則。
7.2 程序方式
跨程序的生產者/消費者模式,非常依賴於具體的程序間通訊(IPC)方式。而IPC的種類很多。下面介紹比較常用的跨平臺、且程式語言支援較多的IPC方式。
匿名管道
感覺管道是最像佇列的IPC型別。生產者程序在管道的寫端放入資料;消費者程序在管道的讀端取出資料。整個的效果和執行緒中使用佇列非常類似,區別在於使用管道就無需操心執行緒安全、記憶體分配等瑣事(作業系統暗中都幫你搞定了)。
管道又分命名管道和匿名管道兩種,今天主要聊匿名管道。因為命名管道在不同的作業系統下差異較大(比如Win32和POSIX,在命名管道的API介面和功能實現上都有較大差異;有些平臺不支援命名管道,比如Windows CE)。除了作業系統的問題,對於有些程式語言(比如Java)來說,命名管道是無法使用的。
其實匿名管道在不同平臺上的API介面,也是有差異的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一樣)。但是我們可以僅使用標準輸入和標準輸出(以下簡稱stdio)來進行資料的流入流出。然後利用shell的管道符把生產者程序和消費者程序關聯起來。實際上,很多作業系統(尤其是POSIX風格的)自帶的命令都充分利用了這個特性來實現資料的傳輸(比如more、grep等),如此優點:
1、基本上所有作業系統都支援在shell方式下使用管道符。因此很容易實現跨平臺。
2、大部分程式語言都能夠操作stdio,因此跨程式語言也就容易實現。
3、管道方式省卻了執行緒安全方面的瑣事。有利於降低開發、除錯成本。
當然,這種方式也有自身的缺點:
1、生產者程序和消費者程序必須得在同一臺主機上,無法跨機器通訊。這個缺點比較明顯。
2、在一對一的情況下,這種方式挺合用。但如果要擴充套件到一對多或者多對一,那就有點棘手了。所以這種方式的擴充套件性要打個折扣。假如今後要考慮類似的擴充套件,這個缺點就比較明顯。
3、由於管道是shell建立的,對於兩邊的程序不可見(程式看到的只是stdio)。在某些情況下,導致程式不便於對管道進行操縱(比如調整管道緩衝區尺寸)。這個缺點不太明顯。
4、最後,這種方式只能單向傳資料。好在大多數情況下,消費者程序不需要傳資料給生產者程序。萬一你確實需要資訊反饋(從消費者到生產者),那就費勁了。可能得考慮換種IPC方式。
注意事項:
1、對stdio進行讀寫操作是以阻塞方式進行。比如管道中沒有資料,消費者程序的讀操作就會一直停在哪兒,直到管道中重新有資料。
2、由於stdio內部帶有自己的緩衝區(這緩衝區和管道緩衝區是兩碼事),有時會導致一些不太爽的現象(比如生產者程序輸出了資料,但消費者程序沒有立即讀到)。
SOCKET(TCP方式)
基於TCP方式的SOCKET通訊是又一個類似於佇列的IPC方式。它同樣保證了資料的順序到達;同樣有緩衝的機制。而且跨平臺和跨語言,和剛才介紹的shell管道符方式類似。
SOCKET相比shell管道符的方式,主要有如下幾個優點:
1、SOCKET方式可以跨機器(便於實現分散式)。這是主要優點。
2、SOCKET方式便於將來擴充套件成為多對一或者一對多。這也是主要優點。
3、SOCKET可以設定阻塞和非阻塞方法,用起來比較靈活。這是次要優點。
4、SOCKET支援雙向通訊,有利於消費者反饋資訊。
當然有利就有弊。相對於上述shell管道的方式,使用SOCKET在程式設計上會更復雜一些。好在前人已經做了大量的工作,可藉助於這些第三方的庫和框架,比如C++的ACE庫、Python的Twisted。
雖然TCP在很多方面比UDP可靠,但鑑於跨機器通訊先天的不可預料性,可以在生產者程序和消費者程序內部各自再引入基於執行緒的"生產者/消費者模式",如下圖:
這麼做的關鍵點在於把程式碼分為兩部分:生產執行緒和消費執行緒屬於和業務邏輯相關的程式碼(和通訊邏輯無關);傳送執行緒和接收執行緒屬於通訊相關的程式碼(和業務邏輯無關)。
這樣的好處是很明顯的,具體如下:
1、能夠應對暫時性的網路故障。並且在網路故障解除後,能夠繼續工作。
2、網路故障的應對處理方式(比如斷開後的嘗試重連),隻影響傳送和接收執行緒,不會影響生產執行緒和消費執行緒(業務邏輯部分)。
3、具體的SOCKET方式(阻塞和非阻塞)隻影響傳送和接收執行緒,不影響生產執行緒和消費執行緒(業務邏輯部分)。
4、不依賴TCP自身的傳送緩衝區和接收緩衝區。(預設的TCP緩衝區的大小可能無法滿足實際要求)
5、業務邏輯的變化(比如業務需求變更)不影響傳送執行緒和接收執行緒。
針對上述的最後一條,如果整個業務系統中有多個程序是採用上述的模式,那或許可以重構:在業務邏輯程式碼和通訊邏輯程式碼之間,把業務邏輯無關的部分封裝成一個通訊中介軟體。
7.3 環形緩衝區
使用場景:當儲存空間(不僅包括記憶體,還可能包括諸如硬碟之類的儲存介質)的分配/釋放非常頻繁並且確實產生了明顯的影響,才應該考慮環形緩衝區的使用。否則的話,還是選用最基本、最簡單的佇列緩衝區。
環形緩衝區 vs 佇列緩衝區
1.外部介面相似
普通的佇列有一個寫入端和一個讀出端。佇列為空的時候,讀出端無法讀取資料;當佇列滿(達到最大尺寸)時,寫入端無法寫入資料。
對於使用者來講,環形緩衝區和佇列緩衝區是一樣的。它也有一個寫入端(用於push)和一個讀出端(用於pop),也有緩衝區“滿”和“空”的狀態。所以,從佇列緩衝區切換到環形緩衝區,對於使用者來說能比較平滑地過渡。
2.內部結構迥異
雖然兩者的對外介面差不多,但是內部結構和運作機制有很大差別。重點介紹一下環形緩衝區的內部結構。
可以把環形緩衝區的讀出端(以下簡稱R)和寫入端(以下簡稱W)想象成是兩個人在體育場跑道上追逐(R追W)。當R追上W的時候,就是緩衝區為空;當W追上R的時候(W比R多跑一圈),就是緩衝區滿。
為了形象起見,如下:
從上圖可以看出,環形緩衝區所有的push和pop操作都是在一個固定的儲存空間內進行。而佇列緩衝區在push的時候,可能會分配儲存空間用於儲存新元素;在pop時,可能會釋放廢棄元素的儲存空間。所以環形方式相比佇列方式,少掉了對於緩衝區元素所用儲存空間的分配、釋放。這是環形緩衝區的一個主要優勢。
環形緩衝區的實現
1.陣列方式 vs 連結串列方式
環形緩衝區的內部實現,即可基於陣列(此處的陣列,泛指連續儲存空間)實現,也可基於連結串列實現。
陣列在物理儲存上是一維的連續線性結構,可以在初始化時,把儲存空間一次性分配好,這是陣列方式的優點。但是要使用陣列來模擬環,你必須在邏輯上把陣列的頭和尾相連。在順序遍歷陣列時,對尾部元素(最後一個元素)要作一下特殊處理。訪問尾部元素的下一個元素時,要重新回到頭部元素(第0個元素)。如下圖所示:
使用連結串列的方式,正好和陣列相反:連結串列省去了頭尾相連的特殊處理。但是連結串列在初始化的時候比較繁瑣,而且在有些場合(比如跨程序的IPC)不太方便使用。
2.讀寫操作
環形緩衝區要維護兩個索引,分別對應寫入端(W)和讀取端(R)。寫入(push)的時候,先確保環沒滿,然後把資料複製到W所對應的元素,最後W指向下一個元素;讀取(pop)的時候,先確保環沒空,然後返回R對應的元素,最後R指向下一個元素。
3.判斷“空”和“滿”
上述的操作並不複雜,不過有一個小小的麻煩:空環和滿環的時候,R和W都指向同一個位置!這樣就無法判斷到底是“空”還是“滿”。大體上有兩種方法可以解決該問題。
辦法1:始終保持一個元素不用
當空環的時候,R和W重疊。當W比R跑得快,追到距離R還有一個元素間隔的時候,就認為環已經滿。當環內元素佔用的儲存空間較大的時候,這種辦法顯得很土(浪費空間)。
辦法2:維護額外變數
如果不喜歡上述辦法,還可以採用額外的變數來解決。比如可以用一個整數記錄當前環中已經儲存的元素個數(該整數>=0)。當R和W重疊的時候,通過該變數就可以知道是“空”還是“滿”。
4.元素的儲存
由於環形緩衝區本身就是要降低儲存空間分配的開銷,因此緩衝區中元素的型別要選好。儘量儲存值型別的資料,而不要儲存指標(引用)型別的資料。因為指標型別的資料又會引起儲存空間(比如堆記憶體)的分配和釋放,使得環形緩衝區的效果打折扣。
應用場合
如果所使用的程式語言和開發庫中帶有現成的、成熟的環形緩衝區,建議使用現成的庫,不要重新制造輪子;確實找不到現成的,才考慮自己實現。
1.用於併發執行緒
和執行緒中的佇列緩衝區類似,執行緒中的環形緩衝區也要考慮執行緒安全的問題。除非使用的環形緩衝區的庫已經實現了執行緒安全,否則還是得自己動手搞定。執行緒方式下的環形緩衝區用得比較多,相關的網上資料也多,下面就大致介紹幾個。
對於C++的程式設計師,強烈推薦使用boost提供的circular_buffer模板,該模板最開始是在boost 1.35版本中引入的。鑑於boost在C++社群中的地位,大夥兒應該可以放心使用該模板。
對於C程式設計師,可以去看看開源專案circbuf,不過該專案是GPL協議的,不太爽;而且活躍度不太高;而且只有一個開發人員。大夥兒慎用!建議只拿它當參考。
對於C#程式設計師,可以參考CodeProject上的一個示例。
2.用於併發程序
程序間的環形緩衝區,似乎少有現成的庫可用。
適用於程序間環形緩衝的IPC型別,常見的有共享記憶體和檔案。在這兩種方式上進行環形緩衝,通常都採用陣列的方式實現。程式事先分配好一個固定長度的儲存空間,然後具體的讀寫操作、判斷“空”和“滿”、元素儲存等細節就可參照前面所說的來進行。
共享記憶體方式的效能很好,適用於資料流量很大的場景。但是有些語言(比如Java)對於共享記憶體不支援。因此,該方式在多語言協同開發的系統中,會有一定的侷限性。
而檔案方式在程式語言方面支援很好,幾乎所有程式語言都支援操作檔案。但它可能會受限於磁碟讀寫(Disk I/O)的效能。所以檔案方式不太適合於快速資料傳輸;但是對於某些“資料單元”很大的場合,檔案方式是值得考慮的。
對於程序間的環形緩衝區,同樣要考慮好程序間的同步、互斥等問題。
8.生產者消費者模式三種實現方式程式碼示例
8.1synchronized、wait和notify
public class SynchronizedProducerConsumerDemo { public static void main(String[] args) { MyResource myResource = new MyResource(); Thread thread = new Thread(new myProducer(myResource),"producerAAAAA"); Thread thread1 = new Thread(new myProducer(myResource),"producerBBBBB"); Thread thread2 = new Thread(new myProducer(myResource),"producerCCCCC"); Thread thread3 = new Thread(new myConsumer(myResource),"consumerDDDDD"); Thread thread4 = new Thread(new myConsumer(myResource),"consumerEEEEE"); thread.start(); thread1.start(); thread2.start(); thread3.start(); thread4.start(); } } /** * 資源類 */ class MyResource{ //當前的資源數 private int num = 0; //最大資源數 private int MAX_RESOURCE = 10; //生產方法 public synchronized void add(){ //資源沒有滿就生產 if (num < MAX_RESOURCE){ num ++ ; System.out.println(Thread.currentThread().getName() +"\t produce resource" +num); notifyAll(); } else { //資源滿就等待 try { wait(); System.out.println(Thread.currentThread().getName() +"\t wait to produce"); } catch (InterruptedException e) { e.printStackTrace(); } } } //消費方法 public synchronized void remove(){ //不空就消費 if (num > 0){ num -- ; System.out.println(Thread.currentThread().getName() +"\t consume resource "+(num+1)); notifyAll(); } else{ //資源空,消費就等待 try { wait(); System.out.println(Thread.currentThread().getName() +"\t wait to consume"); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 消費者 */ class myProducer implements Runnable{ private MyResource myResource; public myProducer(MyResource myResource) { this.myResource = myResource; } @Override public void run() { while (true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myResource.add(); } } } /** * 生產者 */ class myConsumer implements Runnable{ private MyResource myResource; public myConsumer(MyResource myResource) { this.myResource = myResource; } @Override public void run() { while (true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myResource.remove(); } } }