JAVA多執行緒(四) Executor併發框架向RabbitMQ推送訊息
阿新 • • 發佈:2018-11-06
github程式碼地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo
假設一個需求使用者點選某個頁面,我們後臺需要向MQ推送信資訊
1,模擬的MQ服務,我這邊使用RabbitMQ (關於MQ 傳送和監聽訊息可以參考我的部落格:SpringBoot訊息中介軟體RabbitMQ)
//後臺監聽訊息
@RabbitListener(queues = "brian.test") public void receiveMessage(User user){ logger.info("接收到MQ的訊息體: " + user); }
2.向IOC容器中註冊一個ThreadPoolTaskExecutor例項
@Bean public ThreadPoolTaskExecutor brianThreadPool(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心執行緒數 executor.setCorePoolSize(8); //最大執行緒數 executor.setMaxPoolSize(16); //佇列中最大的數executor.setQueueCapacity(8); //縣城名稱字首 executor.setThreadNamePrefix("brianThreadPool_"); //rejectionPolicy:當pool已經達到max的時候,如何處理新任務 //callerRuns:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行 //對拒絕task的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//執行緒空閒後最大的存活時間 executor.setKeepAliveSeconds(60); //初始化載入 executor.initialize(); return executor; }
3.實現執行緒池併發推送訊息
/** * 多執行緒推送訊息到MQ服務 */ public void sendMessageByThredPool(User user) throws ExecutionException, InterruptedException { Future<String> future = executor.submit(() -> { sendMessageService.sendMessage("brian","mymq",user);
logger.info("執行緒 [ " + Thread.currentThread().getName() + " ] 推送訊息到MQ成功! " + new Date()); return Thread.currentThread().getName(); }); }
4. Controller層的呼叫
@PostMapping("/loop/sendMsg/userInfo") public ResponseEntity addUserInfo2MQ(@RequestBody User user) throws ExecutionException, InterruptedException { brianService.sendMessageByThredPool(user); return new ResponseEntity(user, HttpStatus.OK); }
5.利用postman 做壓力測試,測試介面
5.1 postman做loop壓力測試,需要單建立一個Collections來測試,並且當前Collections值允許放一個測試用例,比如我下面的loopSendUserInfo
5.2 設定測試規則
點選Preview ,可以預覽測試資料
6.檢視測試結果
6.1 rabbitmq
6.2 log中可以發現多個執行緒在推送訊息
部落格參考來源:1. 可取消的非同步任務——FutureTask用法及解析
2. 多執行緒併發執行任務,取結果歸集。終極總結:Future、FutureTask、CompletionService、CompletableFuture