併發框架Disruptor場景應用
阿新 • • 發佈:2018-11-03
轉自 https://blog.csdn.net/qq_19558705/article/details/77247912
今天用一個停車場問題來加深對Disruptor的理解。一個有關汽車進入停車場的問題。當汽車進入停車場時,系統首先會記錄汽車資訊。同時也會發送訊息到其他系統處理相關業務,最後傳送簡訊通知車主收費開始。看了很多文章,裡面的程式碼都是大同小異的,可能程式碼真的是很經典。以下程式碼也是來源網路,只是自己手動敲的,加了一些註釋。
程式碼包含以下內容:
1) 事件物件Event
2)三個消費者Handler
3)一個生產者Processer
4)執行Main方法
Event類:汽車資訊
public class MyInParkingDataEvent {
private String carLicense; // 車牌號
public String getCarLicense() {
return carLicense;
}
public void setCarLicense(String carLicense) {
this.carLicense = carLicense;
}
}
Handler類:一個負責儲存汽車資料,一個負責傳送kafka資訊到其他系統中,最後一個負責給車主發簡訊通知
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * Handler 第一個消費者,負責儲存進場汽車的資訊 * */ public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{ @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception { long threadId = Thread.currentThread().getId(); // 獲取當前執行緒id String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號 System.out.println(String.format("Thread Id %s 儲存 %s 到資料庫中 ....", threadId, carLicense)); } @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { this.onEvent(myInParkingDataEvent); } }
import com.lmax.disruptor.EventHandler; /** * 第二個消費者,負責傳送通知告知工作人員(Kafka是一種高吞吐量的分散式釋出訂閱訊息系統) */ public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{ @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 獲取當前執行緒id String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號 System.out.println(String.format("Thread Id %s 傳送 %s 進入停車場資訊給 kafka系統...", threadId, carLicense)); } }
import com.lmax.disruptor.EventHandler;
/**
* 第三個消費者,sms簡訊服務,告知司機你已經進入停車場,計費開始。
*/
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前執行緒id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
System.out.println(String.format("Thread Id %s 給 %s 的車主傳送一條簡訊,並告知他計費開始了 ....", threadId, carLicense));
}
}
Producer類:負責上報停車資料
import java.util.concurrent.CountDownLatch;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
/**
* 生產者,進入停車場的車輛
*/
public class MyInParkingDataEventPublisher implements Runnable{
private CountDownLatch countDownLatch; // 用於監聽初始化操作,等初始化執行完畢後,通知主執行緒繼續工作
private Disruptor<MyInParkingDataEvent> disruptor;
private static final Integer NUM = 1; // 1,10,100,1000
public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
Disruptor<MyInParkingDataEvent> disruptor) {
this.countDownLatch = countDownLatch;
this.disruptor = disruptor;
}
@Override
public void run() {
MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
try {
for(int i = 0; i < NUM; i ++) {
disruptor.publishEvent(eventTranslator);
Thread.sleep(1000); // 假設一秒鐘進一輛車
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); // 執行完畢後通知 await()方法
System.out.println(NUM + "輛車已經全部進入進入停車場!");
}
}
}
class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
@Override
public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
this.generateData(myInParkingDataEvent);
}
private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int)(Math.random() * 100000)); // 隨機生成一個車牌號
System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
return myInParkingDataEvent;
}
}
執行的Main方法:
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
/**
* 執行的Main方法 ,
* 一個生產者(汽車進入停車場);
* 三個消費者(一個記錄汽車資訊,一個傳送訊息給系統,一個傳送訊息告知司機)
* 前兩個消費者同步執行,都有結果了再執行第三個消費者
*/
public class MyInParkingDataEventMain {
public static void main(String[] args) {
long beginTime=System.currentTimeMillis();
int bufferSize = 2048; // 2的N次方
try {
// 建立執行緒池,負責處理Disruptor的四個消費者
ExecutorService executor = Executors.newFixedThreadPool(4);
// 初始化一個 Disruptor
Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
@Override
public MyInParkingDataEvent newInstance() {
return new MyInParkingDataEvent(); // Event 初始化工廠
}
}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 使用disruptor建立消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
// 當上面兩個消費者處理結束後在消耗 smsHandler
MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
handlerGroup.then(myParkingDataSmsHandler);
// 啟動Disruptor
disruptor.start();
CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者執行緒準備好了就可以通知主執行緒繼續工作了
// 生產者生成資料
executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
countDownLatch.await(); // 等待生產者結束
disruptor.shutdown();
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));
}
}