1. 程式人生 > >10W級資料更新操作__生產消費者模式

10W級資料更新操作__生產消費者模式

背景需求

最近有這麼一個需求:由於本地系統資訊與另一個系統資料可能不一致,兩個系統有各自的獨立資料庫和業務,在通訊過程中網路等原因,導致兩者之間的關鍵資訊有差異,因此本地資料庫中可能有10W條記錄需要更新,本地資料庫的資訊需要逐條與遠端http請求資料,對比或更新。

技術分析

如果將本地資料庫全部取出放入一個集合中,然後遍歷併發送http請求核對資料,顯然不太現實。儲存10W條記錄需要多大的快取?而且這樣做對系統資源佔用也很高。
採用生產消費者模式批量處理資料。生產消費者模式維護一個佇列,一個執行緒新增資料,另一個執行緒取資料,可通過控制執行緒的數量來控制處理的速度。
思路:用分頁取批量資料,放入一個阻塞式佇列LinkedBlockingQueue

中,開啟另一個執行緒從佇列中取資料,迴圈以上過程,直到全部資料處理完畢(資料處理到最後一頁),佇列為空。

程式碼實現

  • Talk is cheap, show me code
public class CheckRecordWithChannel {

    private static final Logger logger = LoggerFactory.getLogger(CheckRecordWithChannel.class);

    //500條資料
    private volatile BlockingQueue<WechatTransInfo> orderQueue = new
LinkedBlockingQueue<>(500); public static final int PAGE_NUM = 200; //生產完成標誌 private boolean produceFlag = false; //消費完成標誌 private boolean cosumerFlag = false; //當前頁面數 private int currentPageNum = 0; private CheckOrderBusinessHandle businessHandle; /** * 構造方法 * @param
businessHandle 需要實現的特殊業務方法 */
public CheckRecordWithChannel (CheckOrderBusinessHandle businessHandle) { this.businessHandle = businessHandle; } /** * 主方法 * @return * @throws InterruptedException */ public boolean checkOrderBusiness() throws InterruptedException { //建立執行緒池,生產者、消費都的數量可以多用幾個 ExecutorService checkOrderService = Executors.newCachedThreadPool(); LocalOrderProducer producer = this.new LocalOrderProducer(); OrderConsumer consumer = this.new OrderConsumer(); checkOrderService.submit(producer); checkOrderService.submit(consumer); while (true) { Thread.sleep(2000L); if (produceFlag && cosumerFlag && orderQueue.isEmpty()) { List<Runnable> shutdownList = checkOrderService.shutdownNow(); logger.warn("------{} 頁處理完成,{} 執行緒停止執行-----", currentPageNum,shutdownList); return true; } } } /** * 分頁查詢資料業務 */ public Page<WechatTransInfo> queryOrderWithPage(int pageNum) { //方便以後拓展 return businessHandle.queryOrderWithPage(this.currentPageNum++, pageNum); } /** * 生產方法, 阻塞式,put方法 * @param orderList * @return 是否還有資料, false表示沒有資料 * @throws InterruptedException 執行緒被中斷 */ public boolean produceOrder() throws InterruptedException { //資料標識 boolean hasData = true; Page<Info> queryOrderPage = queryOrderWithPage(PAGE_NUM); if (null == queryOrderPage) { return false; } //最後一次的查詢結果 if (!queryOrderPage.hasNextPage()) { hasData = false; } //迴圈插入200條資料,佇列滿就阻塞等待 for (Info order : queryOrderPage.getContent()) { orderQueue.put(order); } return hasData; } /** * 消費者方法 */ public boolean cosumerOrder(Info orderInfo) { //方便以後拓展 return businessHandle.checkOrderWithOrg(orderInfo); } /** * 生產者 * <p> 查詢本地資料庫訂單,並put到orderIdQueue中,查詢結束則flag=true * @author */ class LocalOrderProducer implements Runnable { @Override public void run() { try { // 如果資料庫未查詢完,繼續生產 while (!produceFlag) { Thread.sleep(2000L); //放慢生產者速度 //如果資料庫中沒有資料 if(!produceOrder()) { Thread.sleep(5000L); produceFlag = true; } } logger.debug("----producer was done---"); } catch (InterruptedException e) { logger.error("--- producer thread was interrupted--{}-", e); } } } /** * 消費者 * <p> 非阻塞式消費 * @author */ class OrderConsumer implements Runnable { @Override public void run() { try { // 第一次阻塞取資料 Thread.sleep(5000L); Info orderInfo = null; //如果生產者還在生產或者佇列不為空,則進入繼續消費處理 while (!produceFlag || null != (orderInfo = orderQueue.poll())) { //如果佇列為空,則等待2s生產者生產 logger.debug("---is get order data, ? {} ---", null != orderInfo); if (null == orderInfo) { Thread.sleep(2000L); } cosumerOrder(orderInfo); //orderInfo = orderQueue.poll(); } if (produceFlag && orderQueue.isEmpty()) { cosumerFlag = true; } } catch (InterruptedException e) { logger.error("---thread was interrupted--{}-", e); } } } }
public interface CheckOrderBusinessHandle {

    /**
     * 查詢本地資料庫業務
     * <p> 順序迭代查詢資料庫,
     * pageNum是每次查詢數量
     * @return 返回查詢的結果
     */
    Page<Info> queryOrderWithPage(int currentPageNum, int pageNum);

    /**
     * 處理業務邏輯,更新本地資料庫
     * @param order  資訊
     */
    boolean checkOrderWithOrg(Info orderInfo);
}
  1. CheckRecordWithChannel 構造方法初始化handle用來處理生產和消費的具體業務;
  2. 兩個Runnable 實現類分別是生產和消費執行緒,其中生產採用阻塞式,如果佇列滿且資料庫未查詢完畢則阻塞,直到資料庫全部查詢完畢並put到佇列中時,該執行緒退出put;消費採用非阻塞式poll,當生產完畢且佇列為空時,則退出poll;
  3. Page是Spring框架中的一個分頁bean;
  4. LinkedBlockingQueue用volatile修飾防止執行緒操作過程中資料不一致;
  5. CheckOrderBusinessHandle是一個介面,具體需要根據業務去實現
  • 執行呼叫
    實現具體業務介面
public void checkOrderWith() {

        try {
            new CheckRecordWithChannel (new CheckOrderBusinessHandle() {

                @Override
                public Page<Info> queryOrderWithPage(int currentPageNum,
                        int pageNum) {
//TODO
                }

                @Override
                public boolean checkOrderWithOrg(Info orderInfo) {
//TODO
                }
            }).checkOrderBusiness();
        } catch (InterruptedException e) {
            logger.error("----check thread was stoped, {}----", e);
        }
    }