1.1 Spring 執行緒池 --- ThreadPoolTaskExecutor
阿新 • • 發佈:2018-11-26
Spring 擅長對元件的封裝和整合, Spring-context對JDK的併發包做了功能增強。
step 1 :Spring-context.xml 中增加如下程式碼
<bean id="poolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心執行緒數,預設為1 --> <property name="corePoolSize" value="5" /> <!-- 最大執行緒數,預設為Integer.MAX_VALUE --> <property name="maxPoolSize" value="50" /> <!-- 佇列最大長度,一般需要設定值>=notifyScheduledMainExecutor.maxNum;預設為Integer.MAX_VALUE --> <property name="queueCapacity" value="2000" /> <!-- 執行緒池維護執行緒所允許的空閒時間,預設為60s --> <property name="keepAliveSeconds" value="100" /> <!-- 執行緒池對拒絕任務(無執行緒可用)的處理策略,目前只支援AbortPolicy、CallerRunsPolicy;預設為後者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接丟擲java.util.concurrent.RejectedExecutionException異常 --> <!-- CallerRunsPolicy:主執行緒直接執行該任務,執行完之後嘗試新增下一個任務到執行緒池中,可以有效降低向執行緒池內新增任務的速度 --> <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支援;會導致被丟棄的任務無法再次被執行 --> <!-- DiscardPolicy:拋棄當前任務、暫不支援;會導致被丟棄的任務無法再次被執行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
step 2 :執行緒池使用
申明和使用
程式碼
@Autowired @Qualifier(value = "poolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @RequestMapping(value = "Info", method = RequestMethod.POST) public String passerbyInfo(@RequestBody String json) { threadPoolTaskExecutor.execute(new Runnable() { @Override public void run() { log.info("-----開啟執行緒-----"); // Todo } }); return json; }
以上asyncTaskExecutor,你可以注入到任何一個bean去執行,底層使用JDK的ThreadPoolTaskExecutor來管理執行緒,預設使用的是JDK的執行緒池.
以上只是簡單的應用,非常方便的開發,我們都不用去處理執行緒池的初始化,以及執行緒的管理。
功能 1 :Spring 對執行緒的監聽--成功 / 失敗
ListenableFutureTask<String> f1 = (ListenableFutureTask<String>) asyncTaskExecutor.submitListenable(c1); f1.addCallback(new ListenableFutureCallback<String>() { @Override public void onSuccess(String result) { //成功時 TODO } @Override public void onFailure(Throwable t) { // 失敗時 TODO Auto-generated method stub t.printStackTrace(); } });
功能 2 :擴充套件FutureTask的protected方法
package org.springframework.util.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();
/**
* Create a new {@code ListenableFutureTask} that will, upon running,
* execute the given {@link Callable}.
* @param callable the callable task
*/
public ListenableFutureTask(Callable<T> callable) {
super(callable);
}
/**
* Create a {@code ListenableFutureTask} that will, upon running,
* execute the given {@link Runnable}, and arrange that {@link #get()}
* will return the given result on successful completion.
* @param runnable the runnable task
* @param result the result to return on successful completion
*/
public ListenableFutureTask(Runnable runnable, T result) {
super(runnable, result);
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}
@Override
protected final void done() {
Throwable cause;
try {
T result = get();
this.callbacks.success(result);
return;
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
catch (ExecutionException ex) {
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable ex) {
cause = ex;
}
this.callbacks.failure(cause);
}
}