SpringMVC中非同步處理的幾種方式
關於非同步的好處我在這裡就不多說了,自從servlet3.1規範釋出以來,控制層的非同步處理也越來越多的被人提及。而Spring5的webflux誕生也意味著Spring全方位對非同步提供了支援。其實早在SpringMVC3.2版本就開始支援非同步了,那麼這篇文章我們就來探討一下SpringMVC使用非同步的方式。
一、DeferredResult
DeferredResult這個類代表延遲結果,我們先看一看spring的API文件給我們的解釋:
{@code DeferredResult} provides an alternative to using a {@link Callable} for asynchronous request processing. While a {@code Callable} is executed concurrently on behalf of the application, with a {@code DeferredResult} the application can produce the result from a thread of its choice.
根據文件說明DeferredResult
可以替代Callable
來進行非同步的請求處理。只不過這個類可以從其他執行緒裡拿到對應的結果。當使用DeferredResult
,我們可以將DefferedResult的型別並將其儲存到可以獲取到該物件的地方,比如說佇列或者集合當中,這樣方便其它執行緒能夠取到並設定DefferedResult
的值。
1.1、示例
我們先定義一個Controller,程式碼內容如下:
package com.bdqn.lyrk.ssm.study.web.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; /** * 非同步任務的控制器 * * @author chen.nie * @date 2018/8/2 **/ @RestController public class AsyncController { private BlockingQueue<DeferredResult<String>> blockingQueue = new ArrayBlockingQueue(1024); /** * 返回值是DeferredResult型別,如果沒有結果請求阻塞 * * @return */ @GetMapping("/quotes") public DeferredResult<String> quotes() { //指定超時時間,及出錯時返回的值 DeferredResult<String> result = new DeferredResult(3000L,"error"); blockingQueue.add(result); return result; } /** * 另外一個請求(新的執行緒)設定值 * * @throws InterruptedException */ @GetMapping("take") public void take() throws InterruptedException { DeferredResult<String> result = blockingQueue.take(); result.setResult("route"); } @GetMapping public Callable<String> callable() { return () -> "callable"; } }
控制器可以從不同的執行緒非同步生成返回值,例如響應外部事件(JMS訊息)、計劃任務等,那麼在這裡我先使用另外一個請求來模擬這個過程
此時我們啟動tomcat,先訪問地址http://localhost:8080/quotes ,此時我們會看到傳送的請求由於等待響應遭到了阻塞:
1.2、DeferredResult處理流程
根據官網描述:
DeferredResult processing:
- Controller returns a DeferredResult and saves it in some in-memory queue or list where it can be accessed.
- Spring MVC calls request.startAsync().
- Meanwhile the DispatcherServlet and all configured Filter’s exit the request processing thread but the response remains open.
- The application sets the DeferredResult from some thread and Spring MVC dispatches the request back to the Servlet container.
- The DispatcherServlet is invoked again and processing resumes with the asynchronously produced return value.
將Controller返回的DeferredResult
值儲存到記憶體佇列或集合當中,緊接著SpringMVC呼叫HttpServletRequest
的startAsync()
方法,與此同時DispatcherServlet
和所有配置的Filter
退出當前的請求執行緒(不過響應時開放的),當其他執行緒裡設定DeferredResult的值時將重新發送請求,此時DispatcherServlet使用非同步生成的返回值繼續處理。
在這裡一切的一切還需要通過原始碼來解釋:
- 當一個請求被
DispatcherServlet
處理時,會試著獲取一個WebAsyncManager
物件
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequest processedRequest = request;
HandlerExecutionChain mappedHandler = null;
boolean multipartRequestParsed = false;
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
try {
// ......省略部分程式碼
// 執行子控制器的方法
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
//如果當前的請求需要非同步處理,則終止當前請求,但是響應是開放的
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}
//....省略部分程式碼
}
catch (Exception ex) {
triggerAfterCompletion(processedRequest, response, mappedHandler, ex);
}
catch (Throwable err) {
triggerAfterCompletion(processedRequest, response, mappedHandler,
new NestedServletException("Handler processing failed", err));
}
finally {
if (asyncManager.isConcurrentHandlingStarted()) {
// Instead of postHandle and afterCompletion
if (mappedHandler != null) {
mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);
}
}
else {
// Clean up any resources used by a multipart request.
if (multipartRequestParsed) {
cleanupMultipart(processedRequest);
}
}
}
}
- 對於每一個子控制器的方法返回值,都是
HandlerMethodReturnValueHandler
介面處理的,其中有一個實現類是DeferredResultMethodReturnValueHandler
,關鍵程式碼如下:
package org.springframework.web.servlet.mvc.method.annotation;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import org.springframework.core.MethodParameter;
import org.springframework.lang.UsesJava8;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
/**
* Handler for return values of type {@link DeferredResult}, {@link ListenableFuture},
* {@link CompletionStage} and any other async type with a {@link #getAdapterMap()
* registered adapter}.
*
* @author Rossen Stoyanchev
* @since 3.2
*/
@SuppressWarnings("deprecation")
public class DeferredResultMethodReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
//存放DeferredResult的適配集合
private final Map<Class<?>, DeferredResultAdapter> adapterMap;
public DeferredResultMethodReturnValueHandler() {
this.adapterMap = new HashMap<Class<?>, DeferredResultAdapter>(5);
this.adapterMap.put(DeferredResult.class, new SimpleDeferredResultAdapter());
this.adapterMap.put(ListenableFuture.class, new ListenableFutureAdapter());
if (ClassUtils.isPresent("java.util.concurrent.CompletionStage", getClass().getClassLoader())) {
this.adapterMap.put(CompletionStage.class, new CompletionStageAdapter());
}
}
/**
* Return the map with {@code DeferredResult} adapters.
* <p>By default the map contains adapters for {@code DeferredResult}, which
* simply downcasts, {@link ListenableFuture}, and {@link CompletionStage}.
* @return the map of adapters
* @deprecated in 4.3.8, see comments on {@link DeferredResultAdapter}
*/
@Deprecated
public Map<Class<?>, DeferredResultAdapter> getAdapterMap() {
return this.adapterMap;
}
private DeferredResultAdapter getAdapterFor(Class<?> type) {
for (Class<?> adapteeType : getAdapterMap().keySet()) {
if (adapteeType.isAssignableFrom(type)) {
return getAdapterMap().get(adapteeType);
}
}
return null;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return (getAdapterFor(returnType.getParameterType()) != null);
}
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
return (returnValue != null && (getAdapterFor(returnValue.getClass()) != null));
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
//根據返回值的型別獲取對應的DeferredResult介面卡
DeferredResultAdapter adapter = getAdapterFor(returnValue.getClass());
if (adapter == null) {
throw new IllegalStateException(
"Could not find DeferredResultAdapter for return value type: " + returnValue.getClass());
}
DeferredResult<?> result = adapter.adaptToDeferredResult(returnValue);
//開啟非同步請求
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}
}
在這裡我們關注handleReturnValue
的方法,在經過適配包裝後獲取DeferredResult
開啟了非同步之旅
- 緊接著我們關注一下
WebAsyncManager
的startDeferredResultProcessing
方法
/**
* Start concurrent request processing and initialize the given
* {@link DeferredResult} with a {@link DeferredResultHandler} that saves
* the result and dispatches the request to resume processing of that
* result. The {@code AsyncWebRequest} is also updated with a completion
* handler that expires the {@code DeferredResult} and a timeout handler
* assuming the {@code DeferredResult} has a default timeout result.
* @param deferredResult the DeferredResult instance to initialize
* @param processingContext additional context to save that can be accessed
* via {@link #getConcurrentResultContext()}
* @throws Exception if concurrent processing failed to start
* @see #getConcurrentResult()
* @see #getConcurrentResultContext()
*/
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
//設定超時時間
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
//獲取所有的延遲結果攔截器
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<DeferredResultProcessingInterceptor>();
interceptors.add(deferredResult.getInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(new Runnable() {
@Override
public void run() {
try {
interceptorChain.triggerAfterTimeout(asyncWebRequest, deferredResult);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
});
this.asyncWebRequest.addCompletionHandler(new Runnable() {
@Override
public void run() {
interceptorChain.triggerAfterCompletion(asyncWebRequest, deferredResult);
}
});
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
//開始非同步處理
startAsyncProcessing(processingContext);
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
deferredResult.setResultHandler(new DeferredResultHandler() {
@Override
public void handleResult(Object result) {
result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result);
//設定結果並轉發
setConcurrentResultAndDispatch(result);
}
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
private void startAsyncProcessing(Object[] processingContext) {
clearConcurrentResult();
this.concurrentResultContext = processingContext;
//實際上是執行的是HttpServletRequest對應方法
this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
String requestUri = urlPathHelper.getRequestUri(request);
logger.debug("Concurrent handling starting for " + request.getMethod() + " [" + requestUri + "]");
}
}
在這裡首先收集所有配置好的DeferredResultProcessingInterceptor
,然後設定asyncRequest的超時處理,完成時的處理等,同時會分階段執行攔截器中的各個方法。在這裡真的佩服Spring框架的擴充套件機制做的實在是太好了。最後我們關注一下如下程式碼:
deferredResult.setResultHandler(new DeferredResultHandler() {
@Override
public void handleResult(Object result) {
result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result);
//設定結果並轉發
setConcurrentResultAndDispatch(result);
}
});
其最終還是要呼叫AsyncWebRequest
介面中的dispatch
方法進行轉發,讓DispatcherServlet
重新處理非同步結果:
/**
* Dispatch the request to the container in order to resume processing after
* concurrent execution in an application thread.
*/
void dispatch();
其實在這裡都是封裝自HttpServletRequest
的非同步操作,我們可以看一下StandardServletAsyncWebRequest
的類結構圖:
我們可以在其父類ServletRequestAttributes
裡找到對應的實現:
private final HttpServletRequest request;
/**
* Exposes the native {@link HttpServletRequest} that we're wrapping.
*/
public final HttpServletRequest getRequest() {
return this.request;
}
最後我在貼出一段StandardServletAsyncWebRequest
程式碼,大家就應該知道整個非同步是怎麼執行的了:
//java.servlet.AsnycContext
private AsyncContext asyncContext;
@Override
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
Assert.state(!isAsyncComplete(), "Async processing has already completed");
if (isAsyncStarted()) {
return;
}
this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
@Override
public void dispatch() {
Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
this.asyncContext.dispatch();
}
二、使用Callable作為返回值
使用Callable
作為返回值來實現非同步與DeferredResult
類似,我們先看一看官網描述的具體流程:
Callable processing:
- Controller returns a Callable.
- Spring MVC calls request.startAsync() and submits the Callable to a TaskExecutor for processing in a separate thread.
- Meanwhile the DispatcherServlet and all Filter’s exit the Servlet container thread but the response remains open.
- Eventually the Callable produces a result and Spring MVC dispatches the request back to the Servlet container to complete processing.
- The DispatcherServlet is invoked again and processing resumes with the asynchronously produced return value from the Callable.
流程上大體與DeferredResult
類似,只不過Callable
是由TaskExecutor
來處理的,而TaskExecutor
繼承自java.util.concurrent.Executor
。我們來看一下它的原始碼,它也是在WebAysncManager
中處理的: