10W級資料更新操作__生產消費者模式
阿新 • • 發佈:2019-02-19
背景需求
最近有這麼一個需求:由於本地系統資訊與另一個系統資料可能不一致,兩個系統有各自的獨立資料庫和業務,在通訊過程中網路等原因,導致兩者之間的關鍵資訊有差異,因此本地資料庫中可能有10W條記錄需要更新,本地資料庫的資訊需要逐條與遠端http請求資料,對比或更新。
技術分析
如果將本地資料庫全部取出放入一個集合中,然後遍歷併發送http請求核對資料,顯然不太現實。儲存10W條記錄需要多大的快取?而且這樣做對系統資源佔用也很高。
採用生產消費者模式批量處理資料。生產消費者模式維護一個佇列,一個執行緒新增資料,另一個執行緒取資料,可通過控制執行緒的數量來控制處理的速度。
思路:用分頁取批量資料,放入一個阻塞式佇列LinkedBlockingQueue
程式碼實現
- Talk is cheap, show me code
public class CheckRecordWithChannel {
private static final Logger logger = LoggerFactory.getLogger(CheckRecordWithChannel.class);
//500條資料
private volatile BlockingQueue<WechatTransInfo> orderQueue = new LinkedBlockingQueue<>(500);
public static final int PAGE_NUM = 200;
//生產完成標誌
private boolean produceFlag = false;
//消費完成標誌
private boolean cosumerFlag = false;
//當前頁面數
private int currentPageNum = 0;
private CheckOrderBusinessHandle businessHandle;
/**
* 構造方法
* @param businessHandle 需要實現的特殊業務方法
*/
public CheckRecordWithChannel (CheckOrderBusinessHandle businessHandle) {
this.businessHandle = businessHandle;
}
/**
* 主方法
* @return
* @throws InterruptedException
*/
public boolean checkOrderBusiness() throws InterruptedException {
//建立執行緒池,生產者、消費都的數量可以多用幾個
ExecutorService checkOrderService = Executors.newCachedThreadPool();
LocalOrderProducer producer = this.new LocalOrderProducer();
OrderConsumer consumer = this.new OrderConsumer();
checkOrderService.submit(producer);
checkOrderService.submit(consumer);
while (true) {
Thread.sleep(2000L);
if (produceFlag && cosumerFlag && orderQueue.isEmpty()) {
List<Runnable> shutdownList = checkOrderService.shutdownNow();
logger.warn("------{} 頁處理完成,{} 執行緒停止執行-----", currentPageNum,shutdownList);
return true;
}
}
}
/**
* 分頁查詢資料業務
*/
public Page<WechatTransInfo> queryOrderWithPage(int pageNum) {
//方便以後拓展
return businessHandle.queryOrderWithPage(this.currentPageNum++, pageNum);
}
/**
* 生產方法, 阻塞式,put方法
* @param orderList
* @return 是否還有資料, false表示沒有資料
* @throws InterruptedException 執行緒被中斷
*/
public boolean produceOrder() throws InterruptedException
{
//資料標識
boolean hasData = true;
Page<Info> queryOrderPage = queryOrderWithPage(PAGE_NUM);
if (null == queryOrderPage) {
return false;
}
//最後一次的查詢結果
if (!queryOrderPage.hasNextPage()) {
hasData = false;
}
//迴圈插入200條資料,佇列滿就阻塞等待
for (Info order : queryOrderPage.getContent()) {
orderQueue.put(order);
}
return hasData;
}
/**
* 消費者方法
*/
public boolean cosumerOrder(Info orderInfo) {
//方便以後拓展
return businessHandle.checkOrderWithOrg(orderInfo);
}
/**
* 生產者
* <p> 查詢本地資料庫訂單,並put到orderIdQueue中,查詢結束則flag=true
* @author
*/
class LocalOrderProducer implements Runnable {
@Override
public void run() {
try {
// 如果資料庫未查詢完,繼續生產
while (!produceFlag) {
Thread.sleep(2000L); //放慢生產者速度
//如果資料庫中沒有資料
if(!produceOrder()) {
Thread.sleep(5000L);
produceFlag = true;
}
}
logger.debug("----producer was done---");
} catch (InterruptedException e) {
logger.error("--- producer thread was interrupted--{}-", e);
}
}
}
/**
* 消費者
* <p> 非阻塞式消費
* @author
*/
class OrderConsumer implements Runnable {
@Override
public void run() {
try {
// 第一次阻塞取資料
Thread.sleep(5000L);
Info orderInfo = null;
//如果生產者還在生產或者佇列不為空,則進入繼續消費處理
while (!produceFlag || null != (orderInfo = orderQueue.poll())) {
//如果佇列為空,則等待2s生產者生產
logger.debug("---is get order data, ? {} ---", null != orderInfo);
if (null == orderInfo) {
Thread.sleep(2000L);
}
cosumerOrder(orderInfo);
//orderInfo = orderQueue.poll();
}
if (produceFlag && orderQueue.isEmpty()) {
cosumerFlag = true;
}
} catch (InterruptedException e) {
logger.error("---thread was interrupted--{}-", e);
}
}
}
}
public interface CheckOrderBusinessHandle {
/**
* 查詢本地資料庫業務
* <p> 順序迭代查詢資料庫,
* pageNum是每次查詢數量
* @return 返回查詢的結果
*/
Page<Info> queryOrderWithPage(int currentPageNum, int pageNum);
/**
* 處理業務邏輯,更新本地資料庫
* @param order 資訊
*/
boolean checkOrderWithOrg(Info orderInfo);
}
- CheckRecordWithChannel 構造方法初始化handle用來處理生產和消費的具體業務;
- 兩個Runnable 實現類分別是生產和消費執行緒,其中生產採用阻塞式,如果佇列滿且資料庫未查詢完畢則阻塞,直到資料庫全部查詢完畢並put到佇列中時,該執行緒退出put;消費採用非阻塞式poll,當生產完畢且佇列為空時,則退出poll;
- Page是Spring框架中的一個分頁bean;
- LinkedBlockingQueue用volatile修飾防止執行緒操作過程中資料不一致;
- CheckOrderBusinessHandle是一個介面,具體需要根據業務去實現
- 執行呼叫
實現具體業務介面
public void checkOrderWith() {
try {
new CheckRecordWithChannel (new CheckOrderBusinessHandle() {
@Override
public Page<Info> queryOrderWithPage(int currentPageNum,
int pageNum) {
//TODO
}
@Override
public boolean checkOrderWithOrg(Info orderInfo) {
//TODO
}
}).checkOrderBusiness();
} catch (InterruptedException e) {
logger.error("----check thread was stoped, {}----", e);
}
}