DeferredResult非同步處理spring mvc Demo
阿新 • • 發佈:2020-11-27
一、概述
spring mvc同步介面在請求處理過程中一直處於阻塞狀態,而非同步介面可以啟用後臺執行緒去處理耗時任務。簡單來說適用場景:
1.高併發;
2.高IO耗時操作。
二、Demo
Spring MVC3.2之後支援非同步請求,能夠在controller中返回一個Callable或者DeferredResult。
1.Callable例項
@Controller public class CallableController { @RequestMapping(path = "/async1", method = RequestMethod.GET) @ResponseBody public Callable<String> asyncRequest() { return () -> { final long currentThread = Thread.currentThread().getId(); final Date requestProcessingStarted = new Date(); Thread.sleep(6000L); final Date requestProcessingFinished = new Date(); return String.format( "request: [threadId: %s, started: %s - finished: %s]" , currentThread, requestProcessingStarted, requestProcessingFinished); }; } }
2.DeferredResult使用方式與Callable類似,但在返回結果上不一樣,它返回的時候實際結果可能沒有生成,實際的結果可能會在另外的執行緒裡面設定到DeferredResult中去,能實現更加複雜的業務場景。
@Controller public class DeferredResultController { private Map<Integer, DeferredResult<String>> deferredResultMap = new HashMap<>(); @ResponseBody @GetMapping("/get") public DeferredResult<String> getId(@RequestParam Integer id) throws Exception { System.out.println("start hello"); DeferredResult<String> deferredResult = new DeferredResult<>(); //先存起來,等待觸發 deferredResultMap.put(id, deferredResult); System.out.println("end hello"); return deferredResult; } @ResponseBody @GetMapping("/set") public void setId(@RequestParam Integer id) throws Exception { // 讓所有hold住的請求給與響應 if (deferredResultMap.containsKey(id)) { deferredResultMap.get(id).setResult("hello " + id); } } }
當從瀏覽器請求http://localhost:8080/get/1時,頁面處於等待狀態;當訪問http://localhost:8080/set/1,前面的頁面會返回"hello 1"。
處理過程:
- controller 返回一個DeferredResult,我們把它儲存到記憶體裡或者List裡面(供後續訪問)
- Spring MVC呼叫request.startAsync(),開啟非同步處理
- 與此同時將DispatcherServlet裡的攔截器、Filter等等都馬上退出主執行緒,但是response仍然保持開啟的狀態
- 應用通過另外一個執行緒(可能是MQ訊息、定時任務等)給DeferredResult set值。然後Spring MVC會把這個請求再次派發給servlet容器
- DispatcherServlet再次被呼叫,然後處理後續的標準流程
3.模擬場景:介面接收請求,推送到佇列receiveQueue,後臺執行緒處理完成後推送到resultQueue,監聽器監聽resultQueue將結果賦值給DeferredResult,介面響應結果。
首先定義類Task:
public class Task<T> {
private DeferredResult<String> result;
private T message;
private Boolean isTimeout;
定義MockQueue,用於管理佇列及處理資料:
@Component
public class MockQueue {
/**
* 接收佇列
*/
private BlockingQueue<Task<String>> receiveQueue = new LinkedBlockingDeque<>(5000);
/**
* 結果佇列
*/
private BlockingQueue<Task<String>> resultQueue = new LinkedBlockingDeque<>(5000);
public MockQueue() {
this.run();
}
/**
* 接收task
*
* @param task task實體
* @throws InterruptedException
*/
public void put(Task<String> task) throws InterruptedException {
receiveQueue.put(task);
}
/**
* 獲取結果
*
* @return
* @throws InterruptedException
*/
public Task<String> get() throws InterruptedException {
return resultQueue.take();
}
private void run() {
new Thread(() -> {
while (true) {
try {
Task<String> task = receiveQueue.take();
System.out.println("receive data,start process!");
Thread.sleep(1000);
task.setMessage("success");
//任務超時,跳過
if (task.getIsTimeout()) {
continue;
}
resultQueue.put(task);
System.out.println("process done!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
然後實現Controller非同步介面:
@Controller
public class DeferredResultQueueController {
@Autowired
MockQueue queue;
@ResponseBody
@GetMapping("/test")
public DeferredResult<String> test(@RequestParam Integer id) throws InterruptedException {
System.out.println("start test");
DeferredResult<String> deferredResult = new DeferredResult<>();
Task<String> task = new Task<>(deferredResult, "任務", false);
deferredResult.onTimeout(() -> {
System.out.println("任務超時 id=" + id);
task.setMessage("任務超時");
task.setIsTimeout(true);
});
queue.put(task);
return deferredResult;
}
}
最後定義監聽器,將resultQueue的結果寫入DeferredResult。
@Component
public class QueueResultListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
MockQueue mockQueue;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
new Thread(() -> {
try {
Task<String> task = mockQueue.get();
task.getResult().setResult(task.getMessage());
System.out.println("監聽器獲取到結果:task=" + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
三、參考資料
https://www.baeldung.com/spring-deferred-result
https://cloud.tencent.com/developer/article/1497796
https://zhuanlan.zhihu.com/p/31223106