springboot設定定時任務(2)
阿新 • • 發佈:2021-07-05
上次瞭解到springboot也內建了對應排程任務的支援,當時就覺得很有用,但也一時沒有想到在生產中具體應該怎麼用。
最近接到一個需求,正好可以用到它。
我需要開發一個介面用來接收其他部門發來的資料,並將資料存入mysql資料庫。
這個需求實現不難,關鍵在於當資料一條一條post過來的時候,一條一條insert到資料庫,可能會把資料庫搞掛!
那麼第一個問題來了:能不能快取多次post的資料,攢到一定數量批次insert?
答案:可以,我們可以在springboot中實現一個Queue,在service層將接收到的資料加入Queue,另起一個執行緒作為消費者來消費Queue
消費者可以無限迴圈消費Queue中的資料,我們用一個List攢夠100條就往mysql批量插入一次
這時產生了一個新問題:我們如何知道對方的資料已經發完了?
如果不確定對方什麼時候發完,快取中的資料不足100條的時候就永遠沒有機會存進mysql,所以這種方式不可行
這個時候我想到了排程的方式,也就是我並不啟動一個常駐的執行緒,而是每隔固定時間啟動一下消費者,消費者設定超時時間(比方1s)消費資料,
消費不到就直接break出while迴圈。
測試了一下,還挺好用,上程式碼:
/** *構造一個Queue供生產者、消費者使用 */ @Configuration public class QueueConfig { @Bean public ArrayBlockingQueue<GasAccumulate> queue() {return new ArrayBlockingQueue<GasAccumulate>(1000); } }
生產者:
@Service public class GasServiceImpl implements GasService { @Autowired private GasAccumulateMapper gasAccumulateMapper; @Autowired private ArrayBlockingQueue<GasAccumulate> queue; @Override publicvoid saveGasAccumulate(GasAccumulateParam gasAccumulateParam) { GasAccumulate gasAccumulate = new GasAccumulate(); //資料加入佇列設定超時時間 try { queue.offer(gasAccumulate, 1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); //佇列滿時對資料進行簡單單條insert gasAccumulateMapper.insert(gasAccumulate); } }
消費者:
@Component public class GasAccumulateConsumer { private static Logger logger = LoggerFactory.getLogger(GasAccumulateConsumer.class); @Autowired private GasAccumulateMapper gasAccumulateMapper; @Autowired private ArrayBlockingQueue<GasAccumulate> queue; /** * 如果佇列沒有資料,1秒後超時退出while迴圈 * 這樣就不能作為一個常駐執行緒,而應該處理為一個排程執行緒 */ @Scheduled(fixedDelay = 60000) public void run() { logger.info("scheduler start at:" + LocalDateTime.now()); List<GasAccumulate> arrayList = new ArrayList<>(100); GasAccumulate gasAccumulate = null; while (true) { try { gasAccumulate = queue.poll(1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if (gasAccumulate != null) { arrayList.add(gasAccumulate); if (arrayList.size() == 100) { gasAccumulateMapper.batchInsert(arrayList); arrayList.clear(); } } else { gasAccumulateMapper.batchInsert(arrayList); arrayList.clear(); break; } } logger.info("scheduler end at:" + LocalDateTime.now()); } }
啟動類:
//開啟排程的註解 @EnableScheduling @EnableSwagger2 @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }