多執行緒抽取資料庫資料,資料遷移
阿新 • • 發佈:2018-12-16
關鍵程式碼
2000萬資料同步,每次查詢20000分頁,一分鐘分鐘之內全部塞入到佇列裡等待
log.info("房屋數倉資料同步排程開始!"); Date yesterday = null; HouseFeedbackCount hfb = getHouseFeedbackCountTime(21); if (hfb != null){ yesterday = DateUtils.addDays(hfb.getEndDate(),1); }else{ yesterday= DateUtils.addDays(new Date(),-1);//今天 } Date today= DateUtils.addDays(new Date(),0);//今天 String beginTime = DateUtils.toString(yesterday,DateUtils.YYYY_MM_DD)+" 00:00:00"; String endTime = DateUtils.toString(today,DateUtils.YYYY_MM_DD)+" 23:59:59"; System.out.println("生成任務開始"+beginTime+endTime); int pageSize = 20000; int size = houseService.getHouseTotal(beginTime, endTime); if(size > pageSize){ int page = size%pageSize==0?(size/pageSize):(size/pageSize)+1; System.out.println(page); int begin = 0; int end = 0; for (int i = 0; i < page; i++) { begin = pageSize*i+1; end = pageSize*(i+1); houseService.updateHouse(beginTime, endTime, begin, end); } }else{ houseService.updateHouse(beginTime, endTime, 1, size); } hfb = new HouseFeedbackCount(); hfb.setBizname("房屋數倉資料同步定時生成任務");
HouseServiceImpl裡面的updateHouse 加上註解@Async
@Async @Transactional @Override public void updateHouse(String beginTime, String endTime, int begin, int end) { //你的遷移程式碼 }
配置 apring類掃描檔案中配置
<task:scheduler id="yuncaiCheduler" pool-size="10"/><!-- 定時器執行緒數 --> <task:annotation-driven scheduler="yuncaiCheduler"/><!-- 開啟定時器註解 -->
假如你的執行緒池配置類,確保你的類能被掃描到 import java.util.concurrent.Executor; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * @Title: TaskExecutorConfig.java * @Package com.yuncai.scheduled * @Description: 非同步任務支援配置 * @author: LGH * @date: 2018年8月10日 上午10:28:22 */ @Configuration @ComponentScan("com.yuncai.scheduled") @EnableAsync //利用@EnableAsync註解開啟非同步任務支援 public class TaskExecutorConfig implements AsyncConfigurer{ //配置類實現AsyncConfigurer介面並重寫getAsyncExcutor方法,並返回一個ThreadPoolTaskExevutor //這樣我們就獲得了一個基於執行緒池的TaskExecutor @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(10);//執行緒池維護執行緒的最少數量 taskExecutor.setMaxPoolSize(35);//執行緒池維護執行緒的最大數量 taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
然後你的上面的程式就可以開跑了。十個執行緒跑排程,主要是cpu的問題