Springboot事務回滾(改良版)
阿新 • • 發佈:2020-11-13
前段時間發過此類demo,後經大神改版,學到了一點,遂記錄一下
1 /** 2 * 利用執行緒池起兩個任務 3 */ 4 public String thredSubmit(String uuidStr, List<VariationMapIn> mapIns, List<ApproveHis> hisList, String[] ids, 5 Long guToJuPartyId) { 6 SubmitToPer submitToPerTask = new SubmitToPer(uuidStr, hisList, mapIns,7 JtCommonUtil.getUserView().getPartyId()); 8 9 XianSubmit xianianSubmitTask = new XianSubmit(JtCommonUtil.getUserView().getOrganization().getGroupCode(), 10 mapIns, JtCommonUtil.getUserView().getPartyId()); 11 12 String result = SynTaskUtils.doWork(Arrays.asList(submitToPerTask, xianianSubmitTask), transactionManager);13 return result; 14 }
1 /** 2 * 第一個任務 3 */ 4 public class SubmitToPer extends BaseCallBack { 5 6 private String uuidStr; 7 private List<ApproveHis> hisList; 8 private List<VariationMapIn> mapIns; 9 private Long partyId; 10 11 public SubmitToPer(String uuidStr, List<ApproveHis> hisList, List<VariationMapIn> mapIns, Long partyId) {12 super(); 13 this.uuidStr = uuidStr; 14 this.hisList = hisList; 15 this.mapIns = mapIns; 16 this.partyId = partyId; 17 } 18 19 @Override 20 protected void doWork() { 21 // 業務處理開始 22 MapInDealService mapInDealService = (MapInDealService) ApplicationUtil.getBean("mapInDealService"); 23 mapInDealService.submitToPer(uuidStr, mapIns, hisList, partyId); 24 } 25 26 }
1 /** 2 * 第二個任務 3 */ 4 public class XianSubmit extends BaseCallBack { 5 6 private String groupCode; 7 private List<VariationMapIn> mapIns; 8 private Long partyId; 9 10 public XianSubmit(String groupCode, List<VariationMapIn> mapIns, Long partyId) { 11 super(); 12 this.groupCode = groupCode; 13 this.mapIns = mapIns; 14 this.partyId = partyId; 15 } 16 17 @Override 18 protected void doWork() { 19 SpanCheckService spanCheckService = (SpanCheckService) ApplicationUtil.getBean("spanCheckService"); 20 String str = spanCheckService.xianCheckIn(mapIns, partyId, groupCode); 21 if (!Constant.SUCCESS_STR.equals(str)) 22 throw new RuntimeException(str); 23 } 24 25 }
1 /** 2 * 帶回滾的非同步任務回撥 3 * 基類 4 * @author Administrator 5 * 6 */ 7 public abstract class BaseCallBack implements Callable<String> { 8 9 private static Logger logger = LoggerFactory.getLogger(BaseCallBack.class); 10 /** 11 * 需要回滾計數器 12 */ 13 protected CountDownLatch rollBackLatch; 14 /** 15 * 主執行緒等待計數器 16 */ 17 protected CountDownLatch mainThreadLatch; 18 /** 19 * 是否需要回滾 20 */ 21 protected AtomicBoolean rollbackFlag; 22 /** 23 * 事務 24 */ 25 protected PlatformTransactionManager transactionManager; 26 27 protected abstract void doWork(); 28 29 @Override 30 public String call() throws Exception { 31 if (rollbackFlag.get()) { 32 logger.info("需要回滾,直接不用執行了"); 33 mainThreadLatch.countDown(); 34 return Constant.ERROR_STR; // 如果其他執行緒已經報錯 就停止執行緒 35 } 36 // 設定一個事務 37 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); 38 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啟新事務,這樣會比較安全些。 39 TransactionStatus status = transactionManager.getTransaction(def); // 獲得事務狀態 40 try { 41 logger.info("業務開始處理:{}", this.getClass().getName()); 42 this.doWork(); 43 logger.info("業務處理結束:{}", this.getClass().getName()); 44 // 業務處理結束 45 mainThreadLatch.countDown(); 46 logger.info("執行緒內正常 mainThreadLatch.countDown();"); 47 rollBackLatch.await();// 執行緒等待 48 if (rollbackFlag.get()) { 49 logger.info("回滾事務:{}", this.getClass().getName()); 50 transactionManager.rollback(status); 51 } else { 52 logger.info("提交事務:{}", this.getClass().getName()); 53 transactionManager.commit(status); 54 } 55 return Constant.SAVE_SUCCESS; 56 } catch (Exception e) { 57 e.printStackTrace(); 58 // 如果出錯了 就放開鎖 讓別的執行緒進入提交/回滾 本執行緒進行回滾 59 rollbackFlag.set(true); 60 transactionManager.rollback(status); 61 rollBackLatch.countDown(); 62 mainThreadLatch.countDown(); 63 logger.info("執行緒內異常 mainThreadLatch.countDown();"); 64 return "操作失敗:" + e.getMessage(); 65 } 66 } 67 68 public CountDownLatch getRollBackLatch() { 69 return rollBackLatch; 70 } 71 72 public void setRollBackLatch(CountDownLatch rollBackLatch) { 73 this.rollBackLatch = rollBackLatch; 74 } 75 76 public CountDownLatch getMainThreadLatch() { 77 return mainThreadLatch; 78 } 79 80 public void setMainThreadLatch(CountDownLatch mainThreadLatch) { 81 this.mainThreadLatch = mainThreadLatch; 82 } 83 84 public AtomicBoolean getRollbackFlag() { 85 return rollbackFlag; 86 } 87 88 public void setRollbackFlag(AtomicBoolean rollbackFlag) { 89 this.rollbackFlag = rollbackFlag; 90 } 91 92 public PlatformTransactionManager getTransactionManager() { 93 return transactionManager; 94 } 95 96 public void setTransactionManager(PlatformTransactionManager transactionManager) { 97 this.transactionManager = transactionManager; 98 } 99 100 }
1 /** 2 * 非同步執行緒執行器 攜帶回滾 3 * 4 * @author Administrator 5 * 6 */ 7 public class SynTaskUtils { 8 9 /** 10 * 日誌 11 */ 12 private static Logger logger = LoggerFactory.getLogger(SynTaskUtils.class); 13 14 public static String doWork(List<? extends BaseCallBack> tasks, PlatformTransactionManager transactionManager) { 15 if (tasks == null || tasks.size() <= 0) { 16 return Constant.SUCCESS_STR; 17 } 18 logger.info("開始執行一組執行緒.........................................................."); 19 CountDownLatch rollBackLatch = new CountDownLatch(1); 20 CountDownLatch mainThreadLatch = new CountDownLatch(tasks.size()); 21 AtomicBoolean rollbackFlag = new AtomicBoolean(false); 22 List<Future<String>> list = new ArrayList<Future<String>>(); 23 for (BaseCallBack task : tasks) { 24 task.setMainThreadLatch(mainThreadLatch); 25 task.setRollbackFlag(rollbackFlag); 26 task.setRollBackLatch(rollBackLatch); 27 task.setTransactionManager(transactionManager); 28 logger.info("新增任務:{}", task.getClass().getName()); 29 Future<String> future = TestExecutorUtil.getInstance().getExecutor().submit(task); 30 list.add(future); 31 } 32 // 主執行緒業務執行完畢 如果其他執行緒也執行完畢 且沒有報異常 正在阻塞狀態中 喚醒其他執行緒 提交所有的事務 33 // 如果其他執行緒或者主執行緒報錯 則不會進入if 會觸發回滾 34 try { 35 logger.info("主執行緒開始等待。"); 36 mainThreadLatch.await(); 37 logger.info("主執行緒等待結束。"); 38 if (!rollbackFlag.get()) { 39 logger.info("不需要回滾。"); 40 rollBackLatch.countDown(); 41 return Constant.SUCCESS_STR; 42 } else { 43 logger.info("需要回滾。"); 44 for (Future<String> f : list) { 45 String result = f.get(); 46 if (!Constant.SAVE_SUCCESS.equals(result)) { 47 logger.info("返回值:{}", result); 48 return result; 49 } 50 } 51 return Constant.SUCCESS_STR; 52 } 53 } catch (Exception e) { 54 return "操作出現異常。"; 55 }finally { 56 logger.info("結束執行一組執行緒.........................................................."); 57 } 58 } 59 60 }
1 /** 2 * 公用執行緒池 3 * 4 * @author Administrator 5 * 6 */ 7 public class TestExecutorUtil { 8 9 /** 10 * 執行緒池 11 */ 12 private ExecutorService executor = Executors.newFixedThreadPool(10); 13 14 /** 15 * 單利 16 * 17 * @author Administrator 18 * 19 */ 20 private static class Instance { 21 private static final TestExecutorUtil instance = new ExecutorUtil(); 22 } 23 24 private TestExecutorUtil() { 25 } 26 27 public static TestExecutorUtil getInstance() { 28 return Instance.instance; 29 } 30 31 public TestExecutorService getExecutor() { 32 return executor; 33 } 34 35 }