程式碼編寫生產者與消費者模式思路
## **目前有個任務是建立大量的資料同時向kafka裡寫入,於是之前開了大量的執行緒建立資料並寫入,發現kafka並不能連線那麼多執行緒,後來就用到生產者與消費者模式,大量的執行緒生產資料放入佇列中,然後只開幾個執行緒從佇列中獲取並寫入kafka.**
**
**用到的技術:CountDownLatch,LinkedBlockingQueue,volatile,newFixedThreadPool**
1.執行緒池:
Java通過Executors提供四種執行緒池,分別為:
newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
我這裡用到了newFixedThreadPool:
2.volatile
這裡主要是標誌生成是否結束,為了多執行緒的可見性,用到了volatile,主要是當主執行緒改變了flag,會及時的修改主存的值,並把執行緒中flag上效,然後執行緒會再從主存獲取。
3.CountDownLatch
主要是計算執行緒是否多結束了
用到了
countDown() 結束一個執行緒就down一個
await();判斷執行緒的數量是否為0,或則不會執行下的程式碼,一直在這路
4.LinkedBlockingQueue
是一個阻塞的執行緒安全的佇列,底層採用連結串列實現
新增元素的方法有三個:add,put,offer,且這三個元素都是向佇列尾部新增元素的意思。
區別:
add方法在新增元素的時候,若超出了度列的長度會直接丟擲異常:
put方法,若向隊尾新增元素的時候發現佇列已經滿了會發生阻塞一直等待空間,以加入元素。
offer方法在新增元素時,如果發現佇列已滿無法新增的話,會直接返回false。
從佇列中取出並移除頭元素的方法有:poll,remove,take。
poll: 若佇列為空,返回null。
remove:若佇列為空,丟擲NoSuchElementException異常。
take:若佇列為空,發生阻塞,等待有元素。
//主執行緒
try {
CDL=new CountDownLatch(createDateThreadNum);
PDL=new CountDownLatch(kafkaThreadNum);
//佇列
LinkedBlockingQueue<String> concurrentLinkedQueue = new LinkedBlockingQueue<String>();
ExecutorService executorService =
Executors.newFixedThreadPool(createDateThreadNum+kafkaThreadNum);
for(int i=0;i<createDateThreadNum;i++){
//建立資料執行緒 把資料放入concurrentLinkedQueue中
executorService.submit(new
CreateDataRunnable(runThreadNum,concurrentLinkedQueue,settingMap,CDL));
}
for(int i=0;i<kafkaThreadNum;i++){
//kafka執行緒 從concurrentLinkedQueue取資料
executorService.submit(new
ProducerRunnable(concurrentLinkedQueue,settingMap,PDL ));
}
#建立資料執行緒結束
CDL.await();
#計量標誌為false,表示生成結束 volatile型別
ProducerRunnable.flag=false;
PDL.await();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
生產者執行緒
public void run() {
if(concurrentLinkedQueue!=null)
concurrentLinkedQueue.put(json);
if (CDL != null)
CDL.countDown();
}
消費者執行緒
public class ProducerRunnable implements Runnable {
private static Logger LOG = LoggerFactory.getLogger(ProducerRunnable.class);
private LinkedBlockingQueue<String> concurrentLinkedQueue;
private static CountDownLatch PDL;
public static volatile boolean flag=true;
public ProducerRunnable(LinkedBlockingQueue<String> concurrentLinkedQueue,Map<String,Object> settingMap,CountDownLatch PDL) {
this.concurrentLinkedQueue=concurrentLinkedQueue;
this.PDL=PDL;
}
public void run() {
if(concurrentLinkedQueue!=null)
{
//判斷生產者是否結束與佇列是否為空
//如果生產者結束並佇列為空,則結束消費者執行緒
while (flag||!concurrentLinkedQueue.isEmpty()){
String json=concurrentLinkedQueue.take();
}
if(PDL!=null)
PDL.countDown();
}
}
}