1. 程式人生 > 其它 >springboot設定定時任務(2)

springboot設定定時任務(2)

上次瞭解到springboot也內建了對應排程任務的支援,當時就覺得很有用,但也一時沒有想到在生產中具體應該怎麼用。

最近接到一個需求,正好可以用到它。

我需要開發一個介面用來接收其他部門發來的資料,並將資料存入mysql資料庫。

這個需求實現不難,關鍵在於當資料一條一條post過來的時候,一條一條insert到資料庫,可能會把資料庫搞掛!

那麼第一個問題來了:能不能快取多次post的資料,攢到一定數量批次insert?

答案:可以,我們可以在springboot中實現一個Queue,在service層將接收到的資料加入Queue,另起一個執行緒作為消費者來消費Queue

消費者可以無限迴圈消費Queue中的資料,我們用一個List攢夠100條就往mysql批量插入一次

這時產生了一個新問題:我們如何知道對方的資料已經發完了?

如果不確定對方什麼時候發完,快取中的資料不足100條的時候就永遠沒有機會存進mysql,所以這種方式不可行

這個時候我想到了排程的方式,也就是我並不啟動一個常駐的執行緒,而是每隔固定時間啟動一下消費者,消費者設定超時時間(比方1s)消費資料,

消費不到就直接break出while迴圈。

測試了一下,還挺好用,上程式碼:

/**
*構造一個Queue供生產者、消費者使用
*/
@Configuration
public class QueueConfig {

    @Bean
    public ArrayBlockingQueue<GasAccumulate> queue() {
        
return new ArrayBlockingQueue<GasAccumulate>(1000); } }

生產者:

@Service
public class GasServiceImpl implements GasService {

    @Autowired
    private GasAccumulateMapper gasAccumulateMapper;

    @Autowired
    private ArrayBlockingQueue<GasAccumulate> queue;

    @Override
    public
void saveGasAccumulate(GasAccumulateParam gasAccumulateParam) { GasAccumulate gasAccumulate = new GasAccumulate(); //資料加入佇列設定超時時間 try { queue.offer(gasAccumulate, 1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); //佇列滿時對資料進行簡單單條insert gasAccumulateMapper.insert(gasAccumulate); } }

消費者:

@Component
public class GasAccumulateConsumer {
    private static Logger logger = LoggerFactory.getLogger(GasAccumulateConsumer.class);

    @Autowired
    private GasAccumulateMapper gasAccumulateMapper;

    @Autowired
    private ArrayBlockingQueue<GasAccumulate> queue;


    /**
     * 如果佇列沒有資料,1秒後超時退出while迴圈
     * 這樣就不能作為一個常駐執行緒,而應該處理為一個排程執行緒
     */
    @Scheduled(fixedDelay = 60000)
    public void run() {
        logger.info("scheduler start at:" + LocalDateTime.now());
        List<GasAccumulate> arrayList = new ArrayList<>(100);
        GasAccumulate gasAccumulate = null;

        while (true) {
            try {
                gasAccumulate = queue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (gasAccumulate != null) {
                arrayList.add(gasAccumulate);
                if (arrayList.size() == 100) {
                    gasAccumulateMapper.batchInsert(arrayList);
                    arrayList.clear();
                }
            } else {
                gasAccumulateMapper.batchInsert(arrayList);
                arrayList.clear();
                break;
            }

        }
        logger.info("scheduler end at:" + LocalDateTime.now());
    }
}

啟動類:

//開啟排程的註解
@EnableScheduling
@EnableSwagger2
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}