1. 程式人生 > >Spring非同步傳送http請求

Spring非同步傳送http請求

背景

目前系統中,有個別的查詢比較慢,大概需要幾秒才能返回結果。
使用者查詢開始到返回結果到頁面,此處是一個同步的過程,如果做成非同步的能提高系統響應的效能。
最近發現servlet3.0有一個新的特性,新增HTTP請求的非同步處理,詳細請參考。
由於專案採用的SpringMVC做的,所以查看了下SpringMVC的資料,發現3.2版本對於非同步處理有良好的封裝。

初識@Async

一個簡單的demo
@Component //Component Service Controller在Spring 3.2中是等價的
class AsyncMethods{
    @Async //非同步標籤
public void testAsyn(){ long time = System.currentTimeMillis(); System.err.println(Thread.currentThread().getId()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.err.println("\nasyn total time:"
+(System.currentTimeMillis()-time)); } }
@Service //Component Service Controller在Spring 3.2中是等價的
class ClassA{
    @Autowired
    AsyncMethods asyncMethods; // 例項的名字必須和類名完全一樣,然後首字母小寫

    public testAsync(){
        System.err.println(Thread.currentThread().getId());
        logger.info("enter time:"
+ System.currentTimeMillis()); asyncMethods.testAsyn() logger.info("leave time:" + System.currentTimeMillis()); } }
spring配置檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans default-autowire="byName"
    xmlns="http://www.springframework.org/schema/beans"    
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:task="http://www.springframework.org/schema/task"     
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
       ">
    <!--掃描專案例項化@Component,@Service,@Controller修飾的類-->
    <context:component-scan base-package="com.your_app" /> 
    <task:annotation-driven /> <!--加入註解驅動 允許@Async-->
</beans>

@Async的實現機制

以下引自Sping3.2.x文件

The Spring Framework provides abstractions for asynchronous execution and scheduling of tasks with theTaskExecutorand TaskScheduler interfaces.To use Servlet 3 async request processing, you need to update web.xml to version 3.0

web.xml 3.0才支援非同步。

以下是官方已經實現的全部7個TaskExecuter。Spring宣稱對於任何場景,這些TaskExecuter完全夠用了:
名字 特點
SimpleAsyncTaskExecutor 每次請求新開執行緒,沒有最大執行緒數設定
SyncTaskExecutor 不是非同步的執行緒
ConcurrentTaskExecutor 少用這個,多用ThreadPoolTaskExecutor
SimpleThreadPoolTaskExecutor 監聽Spring’s lifecycle callbacks,並且可以和Quartz的Component相容
ThreadPoolTaskExecutor 最常用。要求jdk版本大於等於5。可以在程式而不是xml裡修改執行緒池的配置
TimerTaskExecutor
WorkManagerTaskExecutor

使用ThreadPoolTaskExecutor(傳統方式)

官方demo:
比起從執行緒池取一個執行緒再執行, 你僅僅需要把你的Runnable類加入到佇列中,然後TaskExecutor用它內建的規則決定何時開始取一個執行緒並執行該Runnable類

先在xml中新增bean的配置:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="5" />
  <property name="maxPoolSize" value="10" />
  <property name="queueCapacity" value="25" />
</bean>

<bean id="taskExecutorExample" class="TaskExecutorExample">
  <constructor-arg ref="taskExecutor" />
</bean>
  配置解釋 
 int corePoolSize:執行緒池維護執行緒的最小數量. 
 int maximumPoolSize:執行緒池維護執行緒的最大數量. 
 long keepAliveTime:空閒執行緒的存活時間. 
 TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒列舉值. 
 BlockingQueue<Runnable> workQueue:持有等待執行的任務佇列. 
 RejectedExecutionHandler handler: 用來拒絕一個任務的執行,有兩種情況會發生這種情況。 
     一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即執行緒池已經飽和; 
     二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,
         就呼叫ensureQueuedTaskHandled(Runnable command),在該方法中有可能呼叫reject。

1. 當池子大小小於corePoolSize,就新建執行緒,並處理請求
2. 當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去workQueue中取任務並處理
3. 當workQueue放不下任務時,就新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
4. 當池子的執行緒數大於corePoolSize時,多餘的執行緒會等待keepAliveTime長時間,如果無請求可處理就自行銷燬其會優先建立  CorePoolSiz 執行緒, 當繼續增加執行緒時,先放入Queue中,當 CorePoolSiz  和 Queue 都滿的時候,就增加建立新執行緒,當執行緒達到MaxPoolSize的時候,就會丟擲錯 誤 org.springframework.core.task.TaskRejectedException
5. 另外MaxPoolSize的設定如果比系統支援的執行緒數還要大時,會丟擲java.lang.OutOfMemoryError: unable to create new native thread 異常。

Reject策略預定義有四種:
(1)ThreadPoolExecutor.AbortPolicy策略,是預設的策略,處理程式遭到拒絕將丟擲執行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,呼叫者的執行緒會執行該任務,如果執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程).

呼叫例項

import org.springframework.core.task.TaskExecutor;

public class TaskExecutorExample {

  private class MessagePrinterTask implements Runnable {

    private String message;

    public MessagePrinterTask(String message) {
      this.message = message;
    }

    public void run() {
      System.out.println(message);
    }

  }

  private TaskExecutor taskExecutor;

  public TaskExecutorExample(TaskExecutor taskExecutor) {
    this.taskExecutor = taskExecutor;
  }

  public void printMessages() {
    for(int i = 0; i < 25; i++) {
      taskExecutor.execute(new MessagePrinterTask("Message" + i));
    }
  }
}

推薦 - 使用ThreadPoolTaskExecutor(註解方式)

首先,為了以註解方式使用非同步功能,我們需要在Spring的xml配置中定義相關的bean:

In short, the context loaded by the ContextLoaderListener (generally from applicationContext.xml) is the parent of the context loaded by the DispatcherServlet (generally from -servlet.xml). If you have the bean with the @Async method declared/component-scanned in both contexts, the version from the child context (DispatcherServlet) will override the one in the parent context (ContextLoaderListener). I verified this by excluding that component from component scanning in the -servlet.xml – it now works as expected.

  1. 不推薦使用執行緒池版本

Specifies the Java.util.Executor instance to use when invoking asynchronous methods. If not provided, an instance of org.springframework.core.task.SimpleAsyncTaskExecutor will be used by default. Note that as of Spring 3.1.2, individual @Async methods may qualify which executor to use, meaning that the executor specified here acts as a default for all non-qualified @Async methods.

所以,如果我們僅僅新增,也可以使用@Async標籤。然而,此時使用的是SimpleAsyncTaskExecutor。如“官方文件27章:Task Execution”中所述,SimpleAsyncTaskExecutor不會使用執行緒池,僅僅是為每一個請求新開一個執行緒。這樣在大併發的業務場景下,發生OutOfMemory是不足為奇的。

<?xml version="1.0" encoding="UTF-8"?>
<!--Spring框架的xml標籤定義文件, 可訪問http://www.springframework.org/schema/task/檢視最新task元件的xml標籤文件-->
<beans xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
                http://www.springframework.org/schema/task
                http://www.springframework.org/schema/task/spring-task-3.2.xsd">
    <!--掃描專案例項化@Component,@Service,@Controller修飾的類-->
    <context:component-scan base-package="com.your_app" /> 

    <!--create a SimpleAsyncTaskExecutor instance-->
    <task:annotation-driven/>
</beans>
  1. 推薦 - 使用執行緒池版本
<?xml version="1.0" encoding="UTF-8"?>
<!--Spring框架的xml標籤定義文件, 可訪問http://www.springframework.org/schema/task/檢視最新task元件的xml標籤文件-->
<beans xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
                http://www.springframework.org/schema/task
                http://www.springframework.org/schema/task/spring-task-3.2.xsd">

    <!--掃描專案例項化@Component,@Service,@Controller修飾的類-->
    <context:component-scan base-package="com.your_app" />

    <!-- 在程式碼中@Async不加引數就會使用task:annotation-driven標籤定義的executor-->
    <task:annotation-driven executor="myExecutor"/>
    <!-- 在程式碼中@Async("myExecutor")可以顯式指定executor為"myExecutor"-->
    <task:executor id="myExecutor"
               pool-size="5-25"
               queue-capacity="100"
               rejection-policy="CALLER_RUNS"/>
</beans>

其中,注意到屬性pool-size的值”5-25”是一個範圍,這對應的是執行緒池的min和max容量,它們的意義請參考本文上一節的“配置說明”裡的第3、4點。如果只有一個值,如pool-size=n, 意味著minSize==maxSize==n

而關於rejection-policy,官方文件裡說:

By default, when a task is rejected, a thread pool executor will throw a TaskRejectedException. However, the rejection policy is actually configurable. The exception is thrown when using the default rejection policy which is the AbortPolicy implementation. For applications where some tasks can be skipped under heavy load, either the DiscardPolicy or DiscardOldestPolicy may be configured instead. Another option that works well for applications that need to throttle the submitted tasks under heavy load is the CallerRunsPolicy. Instead of throwing an exception or discarding tasks, that policy will simply force the thread that is calling the submit method to run the task itself. The idea is that such a caller will be busy while running that task and not able to submit other tasks immediately. Therefore it provides a simple way to throttle the incoming load while maintaining the limits of the thread pool and queue. Typically this allows the executor to “catch up” on the tasks it is handling and thereby frees up some capacity on the queue, in the pool, or both. Any of these options can be chosen from an enumeration of values available for the ‘rejection-policy’ attribute on the ‘executor’ element.

總結如下:

池滿時的拒絕策略 效果
AbortPolicy(預設) 拋異常
DiscardPolicy or DiscardOldestPolicy 放棄該執行緒
CallerRunsPolicy 通知該執行緒的建立者,讓其不要提交新的執行緒

如果使用的java當做配置檔案,即spring boot中使用

@Configuration  
@EnableAsync  
public class ExecutorConfig {  

    /** Set the ThreadPoolExecutor's core pool size. */  
    private int corePoolSize = 10;  
    /** Set the ThreadPoolExecutor's maximum pool size. */  
    private int maxPoolSize = 200;  
    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */  
    private int queueCapacity = 10;  

    @Bean  
    public Executor mySimpleAsync() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);  
        executor.setMaxPoolSize(maxPoolSize);  
        executor.setQueueCapacity(queueCapacity);  
        executor.setThreadNamePrefix("MySimpleExecutor-");  
        executor.initialize();  
        return executor;  
    }  

    @Bean  
    public Executor myAsync() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);  
        executor.setMaxPoolSize(maxPoolSize);  
        executor.setQueueCapacity(queueCapacity);  
        executor.setThreadNamePrefix("MyExecutor-");  

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務  
        // CALLER_RUNS:不在新執行緒中執行任務,而是有呼叫者所在的執行緒來執行  
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();  
        return executor;  
    }  
}  

這裡定義了兩個不同的Executor,第二個重新設定了pool已經達到max size時候的處理方法;同時指定了執行緒名字的字首。

自定義Executor的使用:

/** 
 * Asynchronous Tasks 
 * @author Xu 
 * 
 */  
@Component  
public class AsyncTask {  
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());  

    @Async("mySimpleAsync")  
    public Future<String> doTask1() throws InterruptedException{  
        logger.info("Task1 started.");  
        long start = System.currentTimeMillis();  
        Thread.sleep(5000);  
        long end = System.currentTimeMillis();  

        logger.info("Task1 finished, time elapsed: {} ms.", end-start);  

        return new AsyncResult<>("Task1 accomplished!");  
    }  

    @Async("myAsync")  
    public Future<String> doTask2() throws InterruptedException{  
        logger.info("Task2 started.");  
        long start = System.currentTimeMillis();  
        Thread.sleep(3000);  
        long end = System.currentTimeMillis();  

        logger.info("Task2 finished, time elapsed: {} ms.", end-start);  

        return new AsyncResult<>("Task2 accomplished!");  
    }  
}  

就是把上面自定義Executor的類名,放進@Async註解中。

  1. @Async的修飾物件是方法而不是類
@Async
void doSomething(String s) { //可以帶引數!
    // this will be executed asynchronously
}

Spring的非同步是基於方法而不是類!
Spring的非同步是基於方法而不是類!
Spring的非同步是基於方法而不是類!

說實話,鄙人認為基於方法是Spring的最大優點。負責Http的攔截器@RequestMapping(“”)是基於方法的,非同步@Async也是基於方法的。不過也有兩個約束:

約束一 呼叫者和被@Async修飾的方法必須定義在兩個類中

@Component
class A{
    @Autowired
    B b;

    public void run(){
        b.doSomething();
    }
}
@Component
class B{
    @Async
    public void doSomething(){
    }
}

約束二 @Async和@PostConstruct不能同時在同一個類中使用. 分別寫在兩個類中,如下:

public class SampleBeanImpl implements SampleBean {

  @Async
  void doSomething() { … }
}


public class SampleBeanInititalizer {

  private final SampleBean bean;

  public SampleBeanInitializer(SampleBean bean) {
    this.bean = bean;
  }

  @PostConstruct
  public void initialize() {
    bean.doSomething();
  }
}