多執行緒異常 和 事務(二)
阿新 • • 發佈:2019-01-24
1.接著上一篇程式碼變形一下
首先我們在上一篇文章的基礎上把程式碼變形的面向物件一些
package com.wei.service.impl; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class MultiThreadExceptionAndReturnValSHow { public static void main(String[] args) { final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("Orders-%d-Thread:").build(); int minPoolSize = 5; int maxPoolSize = 10; //SynchronousQueue同步佇列容量0,就是說執行緒池沒有等待佇列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( minPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); //測試一個Runnable for (int i = 0; i < 5; i++) { ExceptionRunThread thread = new ExceptionRunThread(); threadPoolExecutor.submit(thread); } //測試一個callable for (int i = 0; i < 5; i++) { ExceptionCallableThread call = new ExceptionCallableThread(); threadPoolExecutor.submit(call); } //測試多個callable List<Callable<String>> callList = new ArrayList<Callable<String>>(); for (int i = 0; i < 5; i++) { ExceptionCallable call = new ExceptionCallable(); callList.add(call); } invokeAllTask(threadPoolExecutor, callList); //等待任務完成後,關閉執行緒池 threadPoolExecutor.shutdown(); } private static <T> List<T> invokeAllTask( ThreadPoolExecutor threadPoolExecutor, List<Callable<T>> callList) { List<Future<T>> futureAll = null; List<T> resultList = new ArrayList<T>(); try { futureAll = threadPoolExecutor.invokeAll(callList); } catch (InterruptedException e) { e.printStackTrace(); } AtomicInteger in = new AtomicInteger(1); for (Future<T> f : futureAll) { try { T result = f.get(); System.out.println("count:" + in.getAndIncrement() + ":" + result); resultList.add(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return resultList; } } class ExceptionRunThread implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "do something start......"); if (Thread.currentThread().getName().contains("2")) { throw new RuntimeException("測試 uncheck exception"); } Thread.sleep(100);// do something System.out.println(Thread.currentThread().getName() + "do something end......"); } catch (InterruptedException e) { e.printStackTrace(); } } } class ExceptionCallableThread implements Callable<String> { @Override public String call() { try { System.out.println(Thread.currentThread().getName() + "do something start......"); if (Thread.currentThread().getName().contains("2")) { throw new RuntimeException("測試 uncheck exception"); } Thread.sleep(100);// do something System.out.println(Thread.currentThread().getName() + "do something end......"); } catch (InterruptedException e) { e.printStackTrace(); } return Thread.currentThread().getName(); } }
這裡面有有一個重要的地方,就是執行緒的命名,我們藉助google執行緒工廠的一個類,執行緒的命名很重要,大家可以把執行緒數量修改到20,
Run結果
Orders-0-Thread:do something start...... Orders-1-Thread:do something start...... Orders-2-Thread:do something start...... Orders-3-Thread:do something start...... Orders-4-Thread:do something start...... Orders-5-Thread:do something start...... Orders-6-Thread:do something start...... Orders-7-Thread:do something start...... Orders-8-Thread:do something start...... Orders-9-Thread:do something start...... Orders-10-Thread:do something start...... Orders-11-Thread:do something start...... Orders-12-Thread:do something start...... Orders-13-Thread:do something start...... Orders-14-Thread:do something start...... Orders-0-Thread:do something end...... Orders-1-Thread:do something end...... Orders-4-Thread:do something end...... Orders-3-Thread:do something end...... Orders-6-Thread:do something end...... Orders-5-Thread:do something end...... Orders-8-Thread:do something end...... Orders-7-Thread:do something end...... Orders-9-Thread:do something end...... Orders-14-Thread:do something end...... Orders-11-Thread:do something end...... Orders-10-Thread:do something end...... Orders-13-Thread:do something end...... count:1:Orders-10-Thread: count:2:Orders-11-Thread: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 測試 uncheck exception at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at com.wei.service.impl.MultiThreadExceptionAndReturnValSHow.invokeAllTask(MultiThreadExceptionAndReturnValSHow.java:63) at com.wei.service.impl.MultiThreadExceptionAndReturnValSHow.main(MultiThreadExceptionAndReturnValSHow.java:46) Caused by: java.lang.RuntimeException: 測試 uncheck exception at com.wei.service.impl.ExceptionCallable.call(MultiThreadExceptionAndReturnVal.java:138) at com.wei.service.impl.ExceptionCallable.call(MultiThreadExceptionAndReturnVal.java:1) at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) count:3:Orders-13-Thread: count:4:Orders-14-Thread:
輸出不難理解,執行緒2和12執行緒都沒有正常結束,而執行緒12正好執行的是有返回值的所以count:5:orders-12-Thread是異常的。
2下面我們整合到spring中去看,有了上面的程式碼變形,那麼整合到spring中會比較容易
先看執行緒池業務介面類實現類,我們可以看到執行緒執行器被包在了這個裡面package com.wei.service; import java.util.List; import java.util.concurrent.Callable; import com.wei.service.facade.vo.CommonResonse; public interface MultiThreadTaskExecutorService { public <T> CommonResonse<T> invokeAll(List<Callable<T>> taskList); }
package com.wei.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.wei.service.MultiThreadTaskExecutorService;
import com.wei.service.facade.vo.CommonResonse;
public class MultiThreadTaskExecutorServiceImpl implements
MultiThreadTaskExecutorService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private ThreadPoolExecutor threadPoolExecutor;
private int minPoolSize;
private int maxPoolSize;
@Override
public <T> CommonResonse<T> invokeAll(List<Callable<T>> taskList) {
try {
List<Future<T>> futures = threadPoolExecutor.invokeAll(taskList);
List<T> resultList = new ArrayList<T>();
for (Future<T> future : futures) {
try {
resultList.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
logger.info("result={}",JSON.toJSONString(resultList));
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
@Override
public void destroy() throws Exception {
threadPoolExecutor.shutdown();
}
@Override
public void afterPropertiesSet() throws Exception {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d")
.build();
threadPoolExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
}
public int getMinPoolSize() {
return minPoolSize;
}
public void setMinPoolSize(int minPoolSize) {
this.minPoolSize = minPoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
}
xm裡bean的配置
<bean id="multiThreadTaskExecutorService"
class="com.wei.service.impl.MultiThreadTaskExecutorServiceImpl">
<property name="minPoolSize" value="5" />
<property name="maxPoolSize" value="10" />
</bean>
aop事務的配置
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:pointcut id="servicePiontCut"
expression="execution(* com.wei.service..*(..))" />
<aop:advisor advice-ref="txAdvice" pointcut-ref="servicePiontCut" />
</aop:config>
在看業務介面類
package com.wei.service;
import java.util.List;
import java.util.concurrent.Callable;
import com.wei.dao.entity.User;
public interface CallTaskService extends Callable<List<User>>{
List<User> call();
}
package com.wei.service;
import java.util.List;
import java.util.concurrent.Callable;
import com.wei.dao.entity.User;
public interface CallTask2Service extends Callable<List<User>>{
List<User> call();
}
業務介面實現
package com.wei.service.impl;
import java.util.List;
import java.util.concurrent.Callable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.wei.dao.entity.User;
import com.wei.dao.mapper.UserMapper;
import com.wei.service.CallTaskService;
@Service
public class CallTaskServiceImpl implements CallTaskService ,Callable<List<User>>{
@Autowired
UserMapper userMapper;
@Override
public List<User> call() {
return userMapper.select(null);
}
}
下面是依賴這幾個介面的測試類,介面沒有寫大家自己補一下
package com.wei.service.impl;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.ibatis.session.RowBounds;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.wei.dao.entity.User;
import com.wei.dao.mapper.UserMapper;
import com.wei.service.CallTask2Service;
import com.wei.service.CallTaskService;
import com.wei.service.MultiThreadTaskExecutorService;
import com.wei.service.UserService;
import com.wei.service.vo.UserVo;
@Service
public class UserServiceImpl implements UserService{
@Autowired
UserMapper userMapper;
@Autowired
CallTask2Service callTask2Service;
@Autowired
CallTaskService callTaskService;
@Autowired
MultiThreadTaskExecutorService multiThreadTaskExecutorService;
@Override
public void testThreadRollBack(UserVo userVo) {
List<Callable<List<User>>> taskList=new ArrayList<Callable<List<User>>>();
// taskList.add(new Callable<List<User>>() {
// @Override
// public List<User> call() throws Exception {
// System.out.println("---------->"+"select");
// return userMapper.select(null);
// }
// });
// taskList.add(new Callable<List<User>>() {
// @Override
// public List<User> call() throws Exception {
// User user=new User();
// user.setCreateDate(new Date());
// SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// user.setId("rollbank20160421-"+sdf.format(new Date()));
// user.setName("rollback"+sdf.format(new Date()));
// user.setPassword("2222222");
// userMapper.insert(user);
// System.out.println("---------->"+"insert and select");
// userMapper.select(null);
// int i=1/0;
// return userMapper.select(null);
// }
// });
taskList.add(callTask2Service);
taskList.add(callTaskService);
multiThreadTaskExecutorService.invokeAll(taskList);
// User user=new User();
// user.setCreateDate(new Date());
// SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// user.setId("rollbank20160421-"+sdf.format(new Date()));
// user.setName("rollback"+sdf.format(new Date()));
// user.setPassword("11111111");
// userMapper.insert(user);
int i=1/0;
}
}
通過測試,結果我就不貼出來了,我們可以看到只要是spring託管的bean切面的事務就是起作用的,new的就沒有事務,runnable和callable也一樣,new出來的匿名內部類spring不管理他的事務,因為他沒有交給spring託管。想開啟事務要自己編碼。 另外執行緒之間也是沒有影響的,一個執行緒異常結束,不會影響其他執行緒,仔細想想道理都知道,還是做實驗區印證了下。再重複下,只有spring託管的bean才會去管理事務,自己new出來的不管理事務,相信自己學的,重要的說三遍,標紅吧。