1. 程式人生 > 程式設計 >Spring-MVC非同步請求之Servlet非同步處理

Spring-MVC非同步請求之Servlet非同步處理

在Servlet3.0的規範中新增了對非同步請求的支援,SpringMVC又在此基礎上對非同步請求提供了方便。

非同步請求是在處理比較耗時的業務時先將request返回,然後另起執行緒處理耗時的業務,處理完後在返回給使用者。

非同步請求可以給我們帶來很多方便,最直接的用法就是處理耗時的業務,比如,需要查詢資料庫,需要呼叫別的伺服器來處理等情況下可以先將請求返回給客戶端,然後啟用新執行緒處理耗時業務。

如果我們合適的擴充套件可以實現訂閱者模式的訊息訂閱功能,比如,當有異常情況發生時可以主動將相關資訊傳送給運維人員,還有現在的很多郵箱自動回覆都是使用這種技術。

Http協議是單向的,只能客戶端自己拉不能伺服器主動推,Servlet對非同步請求的支援並沒有修改Http,而是對Http的巧妙利用。非同步請求的核心原理主要分為兩大類,一類是輪詢,另一類是長連線。

輪詢就是定時自動發起請求檢查有沒有需要返回的資料,這種對資源浪費比較大。長連線的原理是客戶端發起請求,服務端處理並返回後並不結束連線,這樣就可以在後面再次返回給客戶端資料。

Servlet對非同步請求的支援其實採用的是長連線的方式,也就是說,非同步請求中在原始的請求返回的時候並沒有關閉連線,關閉的只是處理請求的那個縣城,只有在非同步請求全部處理完之後才會關閉連線。

Servlet3.0對非同步請求的支援

在Servlet3.0規範中使用非同步處理請求非常簡單,只需要在請求處理過程中呼叫request的startAsync返回AsyncContext。

什麼是AsyncContext在非同步請求中充當著非常重要的角色,可以稱為非同步請求上下文也可以稱為非同步請求容器。類似於ServletContext.我們多次呼叫startAsync都是返回的同一個AsyncContext。程式碼如下:

public interface AsyncContext {
  String ASYNC_REQUEST_URI = "javax.servlet.async.request_uri";
  String ASYNC_CONTEXT_PATH = "javax.servlet.async.context_path";
  String ASYNC_PATH_INFO = "javax.servlet.async.path_info";
  String ASYNC_SERVLET_PATH = "javax.servlet.async.servlet_path";
  String ASYNC_QUERY_STRING = "javax.servlet.async.query_string";
  ServletRequest getRequest();
  ServletResponse getResponse();
  boolean hasOriginalRequestAndResponse();
  void dispatch();
  void dispatch(String var1);
  void dispatch(ServletContext var1,String var2);
  void complete();
  void start(Runnable var1);
  void addListener(AsyncListener var1);
  void addListener(AsyncListener var1,ServletRequest var2,ServletResponse var3);
  <T extends AsyncListener> T createListener(Class<T> var1) throws ServletException;
  void setTimeout(long var1);
  long getTimeout();
}

getResponse() 用於獲取response。dispatch用於分發新地址。complete用於通知容器已經處理完了,start方法用於啟動實際處理執行緒,addListener用於新增監聽器;setTimeout方法用於修改超時時間。

Servlet3.0處理非同步請求例項

@WebServlet(
name = “WorkServlet”,urlPatterns = “/work”,asyncSupported = true
)
public class WorkServlet extends HttpServlet {
private static final long serialVersionUID =1L;

@Override
protected void doGet(HttpServletRequest req,HttpServletResponse resp) throws ServletException,IOException {
  this.doPost(req,resp);
}

@Override
protected void doPost(HttpServletRequest req,IOException {
  //設定ContentType,關閉快取
  resp.setContentType("text/plain;charset=UTF-8");
  resp.setHeader("Cache-Control","private");
  resp.setHeader("Pragma","no-cache");
  final PrintWriter writer= resp.getWriter();
  writer.println("老師檢查作業了");
  writer.flush();
  List<String> zuoyes=new ArrayList<String>();
  for (int i = 0; i < 10; i++) {
    zuoyes.add("zuoye"+i);;
  }
  final AsyncContext ac=req.startAsync();//開啟非同步請求
  doZuoye(ac,zuoyes);
  writer.println("老師佈置作業");
  writer.flush();
}

private void doZuoye(final AsyncContext ac,final List<String> zuoyes) {
  ac.setTimeout(1*60*60*1000L);
  ac.start(new Runnable() {
    @Override
    public void run() {
      //通過response獲得字元輸出流
      try {
        PrintWriter writer=ac.getResponse().getWriter();
        for (String zuoye:zuoyes) {
          writer.println("\""+zuoye+"\"請求處理中");
          Thread.sleep(1*1000L);
          writer.flush();
        }
        ac.complete();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  });
}

Spring-MVC非同步請求之Servlet非同步處理

非同步請求監聽器

在上面的程式是我們最基本的非同步請求,不過不夠完善。老師是需要思考巨集觀問題,所以在寫完作業之後需要給老師彙報哪些題難,哪些題目有問題或者自己的這次經驗總結,不過這些事不應該由做作業的學生來做,應該由專門的學習彙報員來統計分析。所以就有了監聽器。

public class TeacherListener implements AsyncListener {
  final SimpleDateFormat formatter=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  @Override
  public void onComplete(AsyncEvent event) throws IOException {
    System.out.println("在"+formatter.format(new Date())+"工作處理完成");
  }

  @Override
  public void onTimeout(AsyncEvent event) throws IOException {
    System.out.println("在"+formatter.format(new Date())+"工作超時");
  }

  @Override
  public void onError(AsyncEvent event) throws IOException {
    System.out.println("在"+formatter.format(new Date())+"工作處理錯誤");
  }

  @Override
  public void onStartAsync(AsyncEvent event) throws IOException {
    System.out.println("在"+formatter.format(new Date())+"工作處理開始");
  }
}

所有程式碼具體參照github地址

https://github.com/lzggsimida123/ServletAsync

補充:SpringMVC對Servlet3非同步請求的支援

SpringMVC對Servlet3非同步請求的支援有兩種方式,分別是通過處理器方法返回Callable和DeferredResult。

按照Servlet3的規範,支援非同步請求時需要配置對應的Servlet和Filter支援非同步請求,為了使SpringMVC支援非同步請求的處理,需要在定義DispatcherServlet時配置其支援非同步請求,在DispatcherServlet之前定義的Filter也需要配置支援非同步請求。

<servlet>
  <servlet-name>springmvc</servlet-name>
  <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
  <init-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>/WEB-INF/applicationContext-mvc.xml</param-value>
  </init-param>
  <load-on-startup>1</load-on-startup>
  <!-- 啟用非同步支援 -->
  <async-supported>true</async-supported>
</servlet>
<servlet-mapping>
  <servlet-name>springmvc</servlet-name>
  <url-pattern>/</url-pattern>
</servlet-mapping>

返回Callable

當處理器的返回方法是Callable型別時會預設發起非同步請求,並使用一個TaskExecutor來呼叫返回的Callable,之後的處理就跟正常的SpringMVC請求是一樣的。Callable的返回結果也跟正常請求SpringMVC的一樣,可以返回Model、ModelAndView、String、Object等,也可以結合@ResponseBody使用,具體可以參考CallableMethodReturnValueHandler的handleReturnValue()。

  @RequestMapping("/callable")
  public Callable<String> forCallable(Model model) throws Exception {
    return () -> {
      TimeUnit.SECONDS.sleep(1);//睡眠1秒,模仿某些業務操作
      model.addAttribute("a","aaaaaaa");
      return "async_request_callable";
    };
  }

如果需要針對於單個Callable請求指定超時時間,我們可以把Callable用一個WebAsyncTask包裹起來。然後還可以指定超時回撥和正常處理完成的回撥。

 @RequestMapping("/callable/timeout")
  public WebAsyncTask<String> forCallableWithTimeout(Model model) throws Exception {
    long timeout = 5 * 1000L;
    WebAsyncTask<String> asyncTask = new WebAsyncTask<>(timeout,() -> {
      TimeUnit.MILLISECONDS.sleep(timeout + 10);
      model.addAttribute("a","aaaaaaa");
      return "async_request_callable";
    });
    asyncTask.onTimeout(() -> {
      System.out.println("響應超時回撥");
      return "async_request_callable_timeout";
    });
    asyncTask.onCompletion(() -> {
      System.out.println("響應callable呼叫完成的回撥");
    });
    return asyncTask;
  }

返回DeferredResult

使用DeferredResult的返回結果的程式設計通常是在處理器方法中建立一個DeferredResult例項,把它儲存起來後再進行返回,比如儲存到一個佇列中,然後在另外的一個執行緒中會從這個佇列中拿到相應的DeferredResult物件進行相應的業務處理後會往DeferredResult中設定對應的返回值。返回了DeferredResult後SpringMVC將建立一個DeferredResultHandler用於監聽DeferredResult,一旦DeferredResult中設定了返回值後,DeferredResultHandler就將對返回值進行處理。DeferredResult的處理過程見DeferredResultMethodReturnValueHandler的handleReturnValue()。

@RequestMapping("/deferredresult")
public DeferredResult<String> forDeferredResult() throws Exception {
  DeferredResult<String> result = new DeferredResult<>();
  new Thread(() -> {
    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    result.setResult("async_request_deferredresult");
  }).start();
  return result;
}

對於DeferredResult也是可以單獨指定超時時間和超時後的回撥的,它的超時時間可以直接通過建構函式傳遞,單位是毫秒。

@RequestMapping("/deferredresult/timeout")
public DeferredResult<String> forDeferredResultWithTimeout() throws Exception {
  DeferredResult<String> result = new DeferredResult<>(10 * 1000);
  new Thread(() -> {
    try {
      TimeUnit.SECONDS.sleep(31);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    result.setResult("async_request_deferredresult");
  }).start();
  
  result.onTimeout(() -> {
    System.out.println("響應超時回撥函式");
  });
  
  result.onCompletion(() -> {
    System.out.println("響應完成的回撥函式");
  });
  
  return result;
}

配置

可以通過<mvc:annotation-driven/>的子元素<mvc:async-support/>來定義處理非同步請求預設的超時時間和需要使用的TaskExecutor。如果不指定預設超時時間則預設會使用容器的非同步請求超時時間,如果不指定需要使用的TaskExecutor,則預設會使用一個SimpleAsyncTaskExecutor。在下面的配置中我們就配置了預設的超時時間是15秒,且處理非同步請求的TaskExecutor是bean容器中名為asyncTaskExecutor的TaskExecutor。

<mvc:annotation-driven>
  <mvc:async-support default-timeout="15000" task-executor="asyncTaskExecutor"/>
</mvc:annotation-driven>

攔截器

返回Callable型別的請求可以通過實現CallableProcessingInterceptor介面自定義一個攔截器來攔截,也可以通過繼承CallableProcessingInterceptorAdapter抽象類來定義攔截器,這樣就只需要選擇自己感興趣的方法進行實現。CallableProcessingInterceptor介面定義如下:

public interface CallableProcessingInterceptor { 
	static final Object RESULT_NONE = new Object(); 
	static final Object RESPONSE_HANDLED = new Object();
 
	/**
	 * Invoked <em>before</em> the start of concurrent handling in the original
	 * thread in which the {@code Callable} is submitted for concurrent handling.
	 *
	 * <p>
	 * This is useful for capturing the state of the current thread just prior to
	 * invoking the {@link Callable}. Once the state is captured,it can then be
	 * transferred to the new {@link Thread} in
	 * {@link #preProcess(NativeWebRequest,Callable)}. Capturing the state of
	 * Spring Security's SecurityContextHolder and migrating it to the new Thread
	 * is a concrete example of where this is useful.
	 * </p>
	 *
	 * @param request the current request
	 * @param task the task for the current async request
	 * @throws Exception in case of errors
	 */
	<T> void beforeConcurrentHandling(NativeWebRequest request,Callable<T> task) throws Exception;
 
	/**
	 * Invoked <em>after</em> the start of concurrent handling in the async
	 * thread in which the {@code Callable} is executed and <em>before</em> the
	 * actual invocation of the {@code Callable}.
	 *
	 * @param request the current request
	 * @param task the task for the current async request
	 * @throws Exception in case of errors
	 */
	<T> void preProcess(NativeWebRequest request,Callable<T> task) throws Exception;
 
	/**
	 * Invoked <em>after</em> the {@code Callable} has produced a result in the
	 * async thread in which the {@code Callable} is executed. This method may
	 * be invoked later than {@code afterTimeout} or {@code afterCompletion}
	 * depending on when the {@code Callable} finishes processing.
	 *
	 * @param request the current request
	 * @param task the task for the current async request
	 * @param concurrentResult the result of concurrent processing,which could
	 * be a {@link Throwable} if the {@code Callable} raised an exception
	 * @throws Exception in case of errors
	 */
	<T> void postProcess(NativeWebRequest request,Callable<T> task,Object concurrentResult) throws Exception;
 
	/**
	 * Invoked from a container thread when the async request times out before
	 * the {@code Callable} task completes. Implementations may return a value,* including an {@link Exception},to use instead of the value the
	 * {@link Callable} did not return in time.
	 *
	 * @param request the current request
	 * @param task the task for the current async request
	 * @return a concurrent result value; if the value is anything other than
	 * {@link #RESULT_NONE} or {@link #RESPONSE_HANDLED},concurrent processing
	 * is resumed and subsequent interceptors are not invoked
	 * @throws Exception in case of errors
	 */
	<T> Object handleTimeout(NativeWebRequest request,Callable<T> task) throws Exception;
 
	/**
	 * Invoked from a container thread when async processing completes for any
	 * reason including timeout or network error.
	 *
	 * @param request the current request
	 * @param task the task for the current async request
	 * @throws Exception in case of errors
	 */
	<T> void afterCompletion(NativeWebRequest request,Callable<T> task) throws Exception;
 
}

它的配置是通過<mvc:callable-interceptors/>配置的。

<mvc:annotation-driven>
  <mvc:async-support default-timeout="15000" task-executor="asyncTaskExecutor">
    <mvc:callable-interceptors>
      <bean class="YourCallableProcessingInterceptor"/>
    </mvc:callable-interceptors>
  </mvc:async-support>
</mvc:annotation-driven>

返回DeferredResult的也可以進行攔截,這需要我們實現DeferredResultProcessingInterceptor介面或者繼承自DeferredResultProcessingInterceptorAdapter。DeferredResultProcessingInterceptor介面定義如下:

public interface DeferredResultProcessingInterceptor {
 
	/**
	 * Invoked immediately before the start of concurrent handling,in the same
	 * thread that started it. This method may be used to capture state just prior
	 * to the start of concurrent processing with the given {@code DeferredResult}.
	 *
	 * @param request the current request
	 * @param deferredResult the DeferredResult for the current request
	 * @throws Exception in case of errors
	 */
	<T> void beforeConcurrentHandling(NativeWebRequest request,DeferredResult<T> deferredResult) throws Exception;
 
	/**
	 * Invoked immediately after the start of concurrent handling,in the same
	 * thread that started it. This method may be used to detect the start of
	 * concurrent processing with the given {@code DeferredResult}.
	 *
	 * <p>The {@code DeferredResult} may have already been set,for example at
	 * the time of its creation or by another thread.
	 *
	 * @param request the current request
	 * @param deferredResult the DeferredResult for the current request
	 * @throws Exception in case of errors
	 */
	<T> void preProcess(NativeWebRequest request,DeferredResult<T> deferredResult) throws Exception;
 
	/**
	 * Invoked after a {@code DeferredResult} has been set,via
	 * {@link DeferredResult#setResult(Object)} or
	 * {@link DeferredResult#setErrorResult(Object)},and is also ready to
	 * handle the concurrent result.
	 *
	 * <p>This method may also be invoked after a timeout when the
	 * {@code DeferredResult} was created with a constructor accepting a default
	 * timeout result.
	 *
	 * @param request the current request
	 * @param deferredResult the DeferredResult for the current request
	 * @param concurrentResult the result to which the {@code DeferredResult}
	 * @throws Exception in case of errors
	 */
	<T> void postProcess(NativeWebRequest request,DeferredResult<T> deferredResult,Object concurrentResult) throws Exception;
 
	/**
	 * Invoked from a container thread when an async request times out before
	 * the {@code DeferredResult} has been set. Implementations may invoke
	 * {@link DeferredResult#setResult(Object) setResult} or
	 * {@link DeferredResult#setErrorResult(Object) setErrorResult} to resume processing.
	 *
	 * @param request the current request
	 * @param deferredResult the DeferredResult for the current request; if the
	 * {@code DeferredResult} is set,then concurrent processing is resumed and
	 * subsequent interceptors are not invoked
	 * @return {@code true} if processing should continue,or {@code false} if
	 * other interceptors should not be invoked
	 * @throws Exception in case of errors
	 */
	<T> boolean handleTimeout(NativeWebRequest request,DeferredResult<T> deferredResult) throws Exception;
 
	/**
	 * Invoked from a container thread when an async request completed for any
	 * reason including timeout and network error. This method is useful for
	 * detecting that a {@code DeferredResult} instance is no longer usable.
	 *
	 * @param request the current request
	 * @param deferredResult the DeferredResult for the current request
	 * @throws Exception in case of errors
	 */
	<T> void afterCompletion(NativeWebRequest request,DeferredResult<T> deferredResult) throws Exception;
 
}

自定義的DeferredResultProcessingInterceptor是通過<mvc:deferred-result-interceptors>配置的。

<mvc:annotation-driven>
  <mvc:async-support default-timeout="15000" task-executor="asyncTaskExecutor">
    <mvc:deferred-result-interceptors>
      <bean class="YourDeferredResultProcessingInterceptor"/>
    </mvc:deferred-result-interceptors>
  </mvc:async-support>
</mvc:annotation-driven>

當發起非同步請求時,SpringMVC傳統的HandlerInterceptor的postHandle()和afterCompletion()不會執行,但是等非同步請求結束後它們還是會執行的。如果需要在非同步處理完成之後做一些事情,也可以選擇實現AsyncHandlerInterceptor介面的afterConcurrentHandlingStarted(),AsyncHandlerInterceptor介面繼承了HandlerInterceptor。

(注:本文是基於Spring4.1.0所寫)

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援我們。如有錯誤或未考慮完全的地方,望不吝賜教。