SpringBoot多執行緒生產者消費者模型應用——排隊叫號實驗模擬(一)
阿新 • • 發佈:2022-03-02
1. 需求說明
目前的需求是在web端做一個排隊叫號系統的過程模擬,目前實現了前半部分,使用到了生產者消費者模型,雖然比較簡單,但還是記錄一下。
2. 目前實現進度
完成了Thread A放客戶到緩衝區,Thread B從緩衝區取客戶並放入redis佇列的過程。
實現效果圖:
3.關鍵程式碼
3.1 緩衝區實現
因為客戶有時間標籤,每個人的標籤基本上不一樣,所以緩衝區考慮併發的優先順序佇列。
@Component public class PatientTimeSeqBuffer { private static final PriorityBlockingQueue<PatientDto> BUFFER_QUEUE = new PriorityBlockingQueue<>(200); // 放入快取區 public void putPatientDto2Buffer(PatientDto patient) { try { BUFFER_QUEUE.put(patient); }catch (Exception e) { e.printStackTrace(); } } // 取出優先順序最高的元素 public PatientDto getPatientDto() { try { return BUFFER_QUEUE.take(); }catch (InterruptedException e) { e.printStackTrace(); } return null; } }
3.2 生產者Async流程
@Autowired private PatientInfoService patientInfoService; // 查客戶資訊 @Autowired private PatientTimeSeqBuffer buffer; // 緩衝池 @Async("arriveTaskExecutor") @Override public void timeSleepPut2Buffer(long startTime, List<PatientDto> patientDtoList) { while (!patientDtoList.isEmpty()) { // 給的總列表不空,就一直準備出隊 PatientDto dto = patientDtoList.remove(0); // 移除隊首 Long addBufferTime = dto.getCurrentTime(); // 以下皆為業務程式碼 if (addBufferTime - startTime > 0) { try { Thread.sleep(addBufferTime - startTime); // 這裡寫的不好,但沒想到別的解決方案,這裡執行緒池最多隻有一個執行緒 } catch (InterruptedException e) { e.printStackTrace(); } } buffer.putPatientDto2Buffer(dto); log.info("{}進入了緩衝區at{}", dto, new Date()); } }
3.3 消費者Async流程
@Autowired private PatientInfoService patientInfoService; @Autowired private WeightedQueuing algorithmService; @Autowired private final QueueUtil queueUtil; public AsyncServiceImpl(QueueUtil queueUtil) { this.queueUtil = queueUtil; } @Async("computeTaskExecutor") @Override public void computeNextRoom() { // 依靠別的執行緒來打斷自己 while (true) { // 先從快取區中取客戶,若無客戶則阻塞自己 PatientDto patientDto = buffer.getPatientDto(); // 計算 PatientInfo patientInfo = patientInfoService.getById(patientDto.getPatientId()); RoomInfo result = algorithmService.computeNextRoom(patientInfo); // 準備插入資料 PatientVo vo = new PatientVo(patientInfo); synchronized (queueUtil) { queueUtil.addPatient2Queue(result.getRoomCode(), vo); log.info("{} 去 {}, 在{}時", patientInfo.getPatientName(), result.getRoomName(), new Date()); } } }
3.4 PatientDto及測試程式碼
/*
* PatientDto.java
*/
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class PatientDto implements Comparable<PatientDto>{
private Long patientId;
private Integer queueNum;
private String patientName;
private String patientGender;
private Long currentTime;
public PatientDto(PatientVo vo, long time) {
this.patientId = vo.getPatientId();
this.queueNum = vo.getQueueNum();
this.patientName = vo.getPatientName();
this.patientGender = vo.getPatientGender();
this.currentTime = time;
}
public PatientDto(PatientInfo info, long time) {
this.patientId = info.getPatientId();
this.queueNum = info.getQueueNum();
this.patientName = info.getPatientName();
this.patientGender = info.getPatientSex();
this.currentTime = time;
}
@Override
public String toString() {
return "PatientDto{" +
patientId +
", " + patientName +
", " + currentTime +
'}';
}
/**
* 令時間早的優先順序高
* @param o
* @return
*/
@Override
public int compareTo(PatientDto o) {
int compareTo = this.currentTime.compareTo(o.currentTime);
compareTo = -compareTo;
return compareTo;
}
}
/*
* 測試Controller方法
*/
@GetMapping("/simu/start")
public Result ScheduleService() {
QueryWrapper qw = new QueryWrapper(); // 篩選時間欄位
qw.eq("appointment_time", "2022-02-26 07:00:00");
List<PatientInfo> patientInfoList = patientInfoService.list(qw);
// 製作隨機間隔時間
// TODO:隨機間隔時間可以更貼近現實
Random random = new Random();
LongStream longs = random.longs(patientInfoList.size(), 1000L, 3000L);
long[] randomTimes = longs.toArray();
List<PatientDto> patientDtoList = new LinkedList<>();
long startTime = System.currentTimeMillis(); // 實驗開始時間記錄
log.info("實驗開時間是: {}", startTime);
// 裝載dto物件,時間為當前時間加上隨機時間
for (int i = 0; i < patientInfoList.size(); i++) {
patientDtoList.add(new PatientDto(patientInfoList.get(i), startTime + randomTimes[i]));
}
// 開始非同步執行加入緩衝區操作
asyncService.timeSleepPut2Buffer(startTime, patientDtoList);
// 開始非同步執行計算並分配, 希望分配的時候同一時刻僅有一個在分配
asyncService.computeNextRoom();
// 開啟websocket傳送定時任務
asyncService.websocketScheduling();
return Result.success();
}
執行緒池配置及其他業務解釋略.
4. 結語
自己今天終於勉強客服拖延症,嘗試做五分鐘,然後真正投入狀態,另外不要給自己太大壓力,該睡覺睡覺,該娛樂娛樂,把控好學習、放鬆和健身的度。目前自己技術水平還是很菜,只能做一個拼合怪。基礎方面,資料結構演算法還沒怎麼刷題,作業系統網路幾乎沒看,java八股看了一點多執行緒也看不下去了。希望這周開始是個充實而平衡的一週。晚安。
5. 參考文章
1.深入理解 JUC:PriorityBlockingQueue
2.Java併發程式設計之PriorityBlockingQueue阻塞佇列詳解
3. csdn——PriorityBlockingQueue使用小結
4.生產者消費者模式——BlockingQueue
5.SpringBoot定時任務+自定義執行緒池
6.spring boot:使用多個執行緒池實現實現任務的執行緒池隔離(spring boot 2.3.2)